Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:39:42 UTC
svn commit: r759026 [2/3] - /incubator/cassandra/trunk/src/org/apache/cassandra/db/
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java Fri Mar 27 05:39:40 2009
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Collection;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.SSTable;
@@ -36,19 +37,19 @@
{
private long countLimit_;
private boolean isDone_;
-
+
CountFilter(int countLimit)
{
- countLimit_ = countLimit;
+ countLimit_ = countLimit;
isDone_ = false;
}
-
+
public ColumnFamily filter(String cfNameParam, ColumnFamily columnFamily)
{
String[] values = RowMutation.getColumnAndColumnFamily(cfNameParam);
if ( columnFamily == null )
return columnFamily;
-
+
String cfName = columnFamily.name();
ColumnFamily filteredCf = new ColumnFamily(cfName);
if( countLimit_ <= 0 )
@@ -61,7 +62,7 @@
Collection<IColumn> columns = columnFamily.getAllColumns();
for(IColumn column : columns)
{
- filteredCf.addColumn(column);
+ filteredCf.addColumn(column.name(), column);
countLimit_--;
if( countLimit_ <= 0 )
{
@@ -70,14 +71,14 @@
}
}
}
- else if(values.length == 2 && columnFamily.isSuper())
+ else if(values.length == 2 && DatabaseDescriptor.getColumnType(cfName).equals("Super"))
{
Collection<IColumn> columns = columnFamily.getAllColumns();
for(IColumn column : columns)
{
SuperColumn superColumn = (SuperColumn)column;
SuperColumn filteredSuperColumn = new SuperColumn(superColumn.name());
- filteredCf.addColumn(filteredSuperColumn);
+ filteredCf.addColumn(filteredSuperColumn.name(), filteredSuperColumn);
Collection<IColumn> subColumns = superColumn.getSubColumns();
for(IColumn subColumn : subColumns)
{
@@ -90,14 +91,14 @@
}
}
}
- }
- else
+ }
+ else
{
throw new UnsupportedOperationException();
}
return filteredCf;
}
-
+
public IColumn filter(IColumn column, DataInputStream dis) throws IOException
{
countLimit_--;
@@ -107,7 +108,7 @@
}
return column;
}
-
+
public boolean isDone()
{
return isDone_;
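
The CountFilter hunks above switch to the two-argument addColumn(name, column) overload and look up the column type through DatabaseDescriptor instead of asking the ColumnFamily whether it is super. A rough standalone sketch of the same count-limited copy, using plain Java collections rather than Cassandra's classes (all names below are illustrative only):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class CountLimitedCopySketch
    {
        // Keep at most 'limit' entries, re-inserting each survivor under its
        // own key -- the same shape as CountFilter re-adding each column via
        // addColumn(column.name(), column) in the hunk above.
        static <V> Map<String, V> firstN(Map<String, V> source, int limit)
        {
            Map<String, V> filtered = new LinkedHashMap<String, V>();
            for (Map.Entry<String, V> e : source.entrySet())
            {
                if (limit-- <= 0)
                    break;
                filtered.put(e.getKey(), e.getValue());
            }
            return filtered;
        }
    }
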
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java Fri Mar 27 05:39:40 2009
@@ -18,18 +18,27 @@
package org.apache.cassandra.db;
+import java.io.File;
import java.io.IOException;
+import java.math.BigInteger;
+import java.net.InetAddress;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SequenceFile;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BasicUtilities;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.HashingSchemes;
/**
@@ -61,23 +70,23 @@
public static class StorageMetadata
{
- private Token myToken;
+ private BigInteger storageId_;
private int generation_;
- StorageMetadata(Token storageId, int generation)
+ StorageMetadata(BigInteger storageId, int generation)
{
- myToken = storageId;
+ storageId_ = storageId;
generation_ = generation;
}
- public Token getStorageId()
+ public BigInteger getStorageId()
{
- return myToken;
+ return storageId_;
}
- public void setStorageId(Token storageId)
+ public void setStorageId(BigInteger storageId)
{
- myToken = storageId;
+ storageId_ = storageId;
}
public int getGeneration()
@@ -88,7 +97,10 @@
public DBManager() throws Throwable
{
- Set<String> tables = DatabaseDescriptor.getTableToColumnFamilyMap().keySet();
+ /* Read the configuration file */
+ Map<String, Map<String, CFMetaData>> tableToColumnFamilyMap = DatabaseDescriptor.init();
+ storeMetadata(tableToColumnFamilyMap);
+ Set<String> tables = tableToColumnFamilyMap.keySet();
for (String table : tables)
{
@@ -101,6 +113,47 @@
}
/*
+ * Create the metadata tables. This table has information about
+ * the table name and the column families that make up the table.
+ * Each column family also has an associated ID which is an int.
+ */
+ private static void storeMetadata(Map<String, Map<String, CFMetaData>> tableToColumnFamilyMap) throws Throwable
+ {
+ AtomicInteger idGenerator = new AtomicInteger(0);
+ Set<String> tables = tableToColumnFamilyMap.keySet();
+
+ for ( String table : tables )
+ {
+ Table.TableMetadata tmetadata = Table.TableMetadata.instance();
+ if ( tmetadata.isEmpty() )
+ {
+ tmetadata = Table.TableMetadata.instance();
+ /* Column families associated with this table */
+ Map<String, CFMetaData> columnFamilies = tableToColumnFamilyMap.get(table);
+
+ for (String columnFamily : columnFamilies.keySet())
+ {
+ tmetadata.add(columnFamily, idGenerator.getAndIncrement(), DatabaseDescriptor.getColumnType(columnFamily));
+ }
+
+ /*
+ * Here we add all the system related column families.
+ */
+ /* Add the TableMetadata column family to this map. */
+ tmetadata.add(Table.TableMetadata.cfName_, idGenerator.getAndIncrement());
+ /* Add the LocationInfo column family to this map. */
+ tmetadata.add(SystemTable.cfName_, idGenerator.getAndIncrement());
+ /* Add the recycle column family to this map. */
+ tmetadata.add(Table.recycleBin_, idGenerator.getAndIncrement());
+ /* Add the Hints column family to this map. */
+ tmetadata.add(Table.hints_, idGenerator.getAndIncrement(), ColumnFamily.getColumnType("Super"));
+ tmetadata.apply();
+ idGenerator.set(0);
+ }
+ }
+ }
+
+ /*
* This method reads the system table and retrieves the metadata
* associated with this storage instance. Currently we store the
* metadata in a Column Family called LocatioInfo which has two
@@ -114,17 +167,22 @@
SystemTable sysTable = SystemTable.openSystemTable(SystemTable.name_);
Row row = sysTable.get(FBUtilities.getHostName());
- IPartitioner p = StorageService.getPartitioner();
+ Random random = new Random();
if ( row == null )
{
- Token token = p.getDefaultToken();
+ /* Generate a token for this Storage node */
+ String guid = GuidGenerator.guid();
+ BigInteger token = StorageService.hash(guid);
+ if ( token.signum() == -1 )
+ token = token.multiply(BigInteger.valueOf(-1L));
+
int generation = 1;
String key = FBUtilities.getHostName();
row = new Row(key);
ColumnFamily cf = new ColumnFamily(SystemTable.cfName_);
- cf.addColumn(new Column(SystemTable.token_, p.getTokenFactory().toByteArray(token)));
- cf.addColumn(new Column(SystemTable.generation_, BasicUtilities.intToByteArray(generation)) );
+ cf.addColumn(SystemTable.token_, new Column(SystemTable.token_, token.toByteArray()) );
+ cf.addColumn(SystemTable.generation_, new Column(SystemTable.generation_, BasicUtilities.intToByteArray(generation)) );
row.addColumnFamily(cf);
sysTable.apply(row);
storageMetadata = new StorageMetadata( token, generation);
@@ -132,22 +190,22 @@
else
{
/* we crashed and came back up need to bump generation # */
- Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
Set<String> cfNames = columnFamilies.keySet();
for ( String cfName : cfNames )
{
ColumnFamily columnFamily = columnFamilies.get(cfName);
- IColumn tokenColumn = columnFamily.getColumn(SystemTable.token_);
- Token token = p.getTokenFactory().fromByteArray(tokenColumn.value());
+ IColumn token = columnFamily.getColumn(SystemTable.token_);
+ BigInteger bi = new BigInteger( token.value() );
IColumn generation = columnFamily.getColumn(SystemTable.generation_);
int gen = BasicUtilities.byteArrayToInt(generation.value()) + 1;
Column generation2 = new Column("Generation", BasicUtilities.intToByteArray(gen), generation.timestamp() + 1);
- columnFamily.addColumn(generation2);
- storageMetadata = new StorageMetadata(token, gen);
+ columnFamily.addColumn("Generation", generation2);
+ storageMetadata = new StorageMetadata( bi, gen );
break;
}
sysTable.reset(row);
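
The DBManager change above replaces the partitioner-supplied default token with a token derived from a freshly generated GUID, forced non-negative via the signum()/multiply(-1) step. A self-contained approximation of that bootstrap step; MD5 and UUID.randomUUID() are assumptions here, since GuidGenerator.guid() and StorageService.hash() are not part of this patch:

    import java.math.BigInteger;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.UUID;

    public class TokenSketch
    {
        // Hash a GUID into a BigInteger and flip its sign if negative,
        // mirroring the token bootstrap in the hunk above.
        static BigInteger newToken() throws NoSuchAlgorithmException
        {
            String guid = UUID.randomUUID().toString();
            byte[] digest = MessageDigest.getInstance("MD5").digest(guid.getBytes());
            BigInteger token = new BigInteger(digest);   // two's complement, may be negative
            if (token.signum() == -1)
                token = token.multiply(BigInteger.valueOf(-1L));
            return token;
        }

        public static void main(String[] args) throws Exception
        {
            System.out.println(newToken());
        }
    }
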
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java Fri Mar 27 05:39:40 2009
@@ -19,189 +19,126 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.util.Iterator;
+import java.math.BigInteger;
-import org.apache.cassandra.io.Coordinate;
+import org.apache.cassandra.continuations.Suspendable;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.IFileReader;
import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.service.PartitionerType;
+import org.apache.cassandra.service.StorageService;
-public class FileStruct implements Comparable<FileStruct>, Iterable<String>
+public class FileStruct implements Comparable<FileStruct>
{
+ IFileReader reader_;
+ String key_;
+ DataInputBuffer bufIn_;
+ DataOutputBuffer bufOut_;
- private String key = null;
- private boolean exhausted = false;
- private IFileReader reader;
- private DataInputBuffer bufIn;
- private DataOutputBuffer bufOut;
-
- public FileStruct(IFileReader reader)
- {
- this.reader = reader;
- bufIn = new DataInputBuffer();
- bufOut = new DataOutputBuffer();
- }
-
- public String getFileName()
- {
- return reader.getFileName();
- }
-
- public void close() throws IOException
- {
- reader.close();
- }
-
- public boolean isExhausted()
+ public FileStruct()
{
- return exhausted;
}
-
- public DataInputBuffer getBufIn()
+
+ public FileStruct(String file, int bufSize) throws IOException
{
- return bufIn;
+ bufIn_ = new DataInputBuffer();
+ bufOut_ = new DataOutputBuffer();
+ reader_ = SequenceFile.bufferedReader(file, bufSize);
+ long bytesRead = advance();
+ if ( bytesRead == -1L )
+ throw new IOException("Either the file is empty or EOF has been reached.");
}
-
+
public String getKey()
{
- return key;
- }
-
- public int compareTo(FileStruct f)
- {
- return key.compareTo(f.key);
- }
-
- // we don't use SequenceReader.seekTo, since that (sometimes) throws an exception
- // if the key is not found. unsure if this behavior is desired.
- public void seekTo(String seekKey)
- {
- try
+ String key = key_;
+ if ( !key.equals(SSTable.blockIndexKey_) )
{
- Coordinate range = SSTable.getCoordinates(seekKey, reader);
- reader.seek(range.end_);
- long position = reader.getPositionFromBlockIndex(seekKey);
- if (position == -1)
+ PartitionerType pType = StorageService.getPartitionerType();
+ switch ( pType )
{
- reader.seek(range.start_);
- }
- else
- {
- reader.seek(position);
- }
-
- while (!exhausted)
- {
- getNextKey();
- if (key.compareTo(seekKey) >= 0)
- {
+ case OPHF:
+ break;
+
+ default:
+ String[] peices = key.split(":");
+ key = peices[1];
break;
- }
- }
- }
- catch (IOException e)
- {
- throw new RuntimeException("corrupt sstable", e);
- }
- }
-
- /*
- * Read the next key from the data file, skipping block indexes.
- * Caller must check isExhausted after each call to see if further
- * reads are valid.
- */
- public void getNextKey()
- {
- if (exhausted)
- {
- throw new IndexOutOfBoundsException();
- }
-
- try
- {
- bufOut.reset();
- if (reader.isEOF())
- {
- reader.close();
- exhausted = true;
- return;
- }
-
- long bytesread = reader.next(bufOut);
- if (bytesread == -1)
- {
- reader.close();
- exhausted = true;
- return;
- }
-
- bufIn.reset(bufOut.getData(), bufOut.getLength());
- key = bufIn.readUTF();
- /* If the key we read is the Block Index Key then omit and read the next key. */
- if (key.equals(SSTable.blockIndexKey_))
- {
- bufOut.reset();
- bytesread = reader.next(bufOut);
- if (bytesread == -1)
- {
- reader.close();
- exhausted = true;
- return;
- }
- bufIn.reset(bufOut.getData(), bufOut.getLength());
- key = bufIn.readUTF();
}
}
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
+ return key;
}
-
- public Iterator<String> iterator()
+
+ public DataOutputBuffer getBuffer()
{
- return new FileStructIterator();
+ return bufOut_;
}
-
- private class FileStructIterator implements Iterator<String>
- {
- String saved;
-
- public FileStructIterator()
- {
- if (getKey() == null && !isExhausted())
- {
- forward();
- }
- }
-
- private void forward()
+
+ public long advance() throws IOException
+ {
+ long bytesRead = -1L;
+ bufOut_.reset();
+ /* advance and read the next key in the file. */
+ if (reader_.isEOF())
{
- getNextKey();
- saved = isExhausted() ? null : getKey();
+ reader_.close();
+ return bytesRead;
}
-
- public boolean hasNext()
+
+ bytesRead = reader_.next(bufOut_);
+ if (bytesRead == -1)
{
- return saved != null;
+ reader_.close();
+ return bytesRead;
}
- public String next()
+ bufIn_.reset(bufOut_.getData(), bufOut_.getLength());
+ key_ = bufIn_.readUTF();
+ /* If the key we read is the Block Index Key then omit and read the next key. */
+ if ( key_.equals(SSTable.blockIndexKey_) )
{
- if (saved == null)
+ bufOut_.reset();
+ bytesRead = reader_.next(bufOut_);
+ if (bytesRead == -1)
{
- throw new IndexOutOfBoundsException();
+ reader_.close();
+ return bytesRead;
}
- String key = saved;
- forward();
- return key;
+ bufIn_.reset(bufOut_.getData(), bufOut_.getLength());
+ key_ = bufIn_.readUTF();
}
+
+ return bytesRead;
+ }
- public void remove()
- {
- throw new UnsupportedOperationException();
+ public int compareTo(FileStruct f)
+ {
+ int value = 0;
+ PartitionerType pType = StorageService.getPartitionerType();
+ switch( pType )
+ {
+ case OPHF:
+ value = key_.compareTo(f.key_);
+ break;
+
+ default:
+ String lhs = key_.split(":")[0];
+ BigInteger b = new BigInteger(lhs);
+ String rhs = f.key_.split(":")[0];
+ BigInteger b2 = new BigInteger(rhs);
+ value = b.compareTo(b2);
+ break;
}
+ return value;
+ }
+
+ public void close() throws IOException
+ {
+ bufIn_.close();
+ bufOut_.close();
+ reader_.close();
}
}
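
FileStruct.compareTo now branches on the partitioner type: order-preserving keys compare as plain strings, while random-partitioner keys of the form <token>:<key> compare by the numeric token. A small plain-Java illustration of that decorated-key ordering (the key format is taken from the hunk above; everything else is made up):

    import java.math.BigInteger;
    import java.util.Arrays;
    import java.util.Comparator;

    public class KeyOrderSketch
    {
        public static void main(String[] args)
        {
            String[] keys = { "90:carol", "5:alice", "42:bob" };
            // Order by the numeric token that precedes the ':' in each key,
            // as the random-partitioner branch of compareTo does above.
            Arrays.sort(keys, new Comparator<String>()
            {
                public int compare(String a, String b)
                {
                    BigInteger ta = new BigInteger(a.split(":")[0]);
                    BigInteger tb = new BigInteger(b.split(":")[0]);
                    return ta.compareTo(tb);
                }
            });
            System.out.println(Arrays.toString(keys)); // [5:alice, 42:bob, 90:carol]
        }
    }
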
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java Fri Mar 27 05:39:40 2009
@@ -6,6 +6,13 @@
{
public int compare(FileStruct f, FileStruct f2)
{
- return f.getFileName().compareTo(f2.getFileName());
+ return f.reader_.getFileName().compareTo(f2.reader_.getFileName());
+ }
+
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof FileStructComparator))
+ return false;
+ return true;
}
}
\ No newline at end of file
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java Fri Mar 27 05:39:40 2009
@@ -24,8 +24,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.log4j.Logger;
-
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -34,7 +32,11 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.IResponseResolver;
+import org.apache.cassandra.service.QuorumResponseHandler;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.WriteResponseResolver;
+import org.apache.log4j.Logger;
/**
@@ -108,14 +110,14 @@
private void deleteEndPoint(String endpointAddress, String key) throws Exception
{
RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
- rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress, System.currentTimeMillis());
+ rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress);
rm.apply();
}
private void deleteKey(String key) throws Exception
{
RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
- rm.delete(Table.hints_ + ":" + key, System.currentTimeMillis());
+ rm.delete(Table.hints_ + ":" + key);
rm.apply();
}
@@ -140,7 +142,7 @@
if(hintedColumnFamily == null)
{
// Force flush now
- columnFamilyStore_.forceFlush();
+ columnFamilyStore_.forceFlush(false);
return;
}
Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
@@ -175,7 +177,7 @@
}
}
// Force flush now
- columnFamilyStore_.forceFlush();
+ columnFamilyStore_.forceFlush(false);
// Now do a major compaction
columnFamilyStore_.forceCompaction(null, null, 0, null);
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java Fri Mar 27 05:39:40 2009
@@ -29,7 +29,6 @@
{
public static short UtfPrefix_ = 2;
public boolean isMarkedForDelete();
- public long getMarkedForDeleteAt();
public String name();
public int size();
public int serializedSize();
@@ -40,7 +39,10 @@
public Collection<IColumn> getSubColumns();
public IColumn getSubColumn(String columnName);
public void addColumn(String name, IColumn column);
+ public void delete();
+ public void repair(IColumn column);
public IColumn diff(IColumn column);
+ public boolean putColumn(IColumn column);
public int getObjectCount();
- public byte[] digest();
+ public byte[] digest();
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Fri Mar 27 05:39:40 2009
@@ -20,14 +20,7 @@
import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Iterator;
-import java.util.PriorityQueue;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -38,16 +31,19 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.log4j.Logger;
-
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.PartitionerType;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.utils.DestructivePQIterator;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -82,6 +78,7 @@
private Map<String, ColumnFamily> columnFamilies_ = new HashMap<String, ColumnFamily>();
/* Lock and Condition for notifying new clients about Memtable switches */
Lock lock_ = new ReentrantLock();
+ Condition condition_;
Memtable(String table, String cfName) throws IOException
{
@@ -96,6 +93,7 @@
));
}
+ condition_ = lock_.newCondition();
table_ = table;
cfName_ = cfName;
creationTime_ = System.currentTimeMillis();
@@ -129,7 +127,7 @@
key_ = key;
columnFamilyName_ = cfName;
}
-
+
Getter(String key, String cfName, IFilter filter)
{
this(key, cfName);
@@ -138,11 +136,29 @@
public ColumnFamily call()
{
- ColumnFamily cf = getLocalCopy(key_, columnFamilyName_, filter_);
+ ColumnFamily cf = getLocalCopy(key_, columnFamilyName_, filter_);
return cf;
}
}
+ class Remover implements Runnable
+ {
+ private String key_;
+ private ColumnFamily columnFamily_;
+
+ Remover(String key, ColumnFamily columnFamily)
+ {
+ key_ = key;
+ columnFamily_ = columnFamily;
+ }
+
+ public void run()
+ {
+ columnFamily_.delete();
+ columnFamilies_.put(key_, columnFamily_);
+ }
+ }
+
/**
* Flushes the current memtable to disk.
*
@@ -166,7 +182,7 @@
}
/**
- * Compares two Memtable based on creation time.
+ * Compares two Memtable based on creation time.
* @param rhs
* @return
*/
@@ -196,6 +212,13 @@
currentObjectCount_.addAndGet(newCount - oldCount);
}
+ private boolean isLifetimeViolated()
+ {
+ /* Memtable lifetime in terms of milliseconds */
+ long lifetimeInMillis = DatabaseDescriptor.getMemtableLifetime() * 3600 * 1000;
+ return ( ( System.currentTimeMillis() - creationTime_ ) >= lifetimeInMillis );
+ }
+
boolean isThresholdViolated(String key)
{
boolean bVal = false;//isLifetimeViolated();
@@ -263,30 +286,27 @@
/*
* This version is used to switch memtable and force flush.
*/
- public void forceflush(ColumnFamilyStore cfStore) throws IOException
+ void forceflush(ColumnFamilyStore cfStore, boolean fRecovery) throws IOException
{
- RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), flushKey_);
-
- try
+ if(!fRecovery)
{
- if (cfStore.isSuper())
- {
- rm.add(cfStore.getColumnFamilyName() + ":SC1:Column", "0".getBytes(), 0);
- } else {
- rm.add(cfStore.getColumnFamilyName() + ":Column", "0".getBytes(), 0);
- }
- rm.apply();
+ RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), flushKey_);
+ try
+ {
+ rm.add(cfStore.columnFamily_ + ":Column","0".getBytes());
+ rm.apply();
+ }
+ catch(ColumnFamilyNotDefinedException ex)
+ {
+ logger_.debug(LogUtil.throwableToString(ex));
+ }
}
- catch(ColumnFamilyNotDefinedException ex)
+ else
{
- logger_.debug(LogUtil.throwableToString(ex));
+ flush(CommitLog.CommitLogContext.NULL);
}
}
- void flushOnRecovery() throws IOException {
- flush(CommitLog.CommitLogContext.NULL);
- }
-
private void resolve(String key, ColumnFamily columnFamily)
{
ColumnFamily oldCf = columnFamilies_.get(key);
@@ -299,7 +319,6 @@
int newObjectCount = oldCf.getColumnCount();
resolveSize(oldSize, newSize);
resolveCount(oldObjectCount, newObjectCount);
- oldCf.delete(Math.max(oldCf.getMarkedForDeleteAt(), columnFamily.getMarkedForDeleteAt()));
}
else
{
@@ -320,46 +339,68 @@
resolve(key, columnFamily);
}
- ColumnFamily getLocalCopy(String key, String columnFamilyColumn, IFilter filter)
+ ColumnFamily getLocalCopy(String key, String cfName, IFilter filter)
{
- String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
+ String[] values = RowMutation.getColumnAndColumnFamily(cfName);
ColumnFamily columnFamily = null;
if(values.length == 1 )
{
- columnFamily = columnFamilies_.get(key);
+ columnFamily = columnFamilies_.get(key);
}
else
{
ColumnFamily cFamily = columnFamilies_.get(key);
- if (cFamily == null) return null;
-
- if (values.length == 2) {
- IColumn column = cFamily.getColumn(values[1]); // super or normal column
- if (column != null )
- {
- columnFamily = new ColumnFamily(cfName_);
- columnFamily.addColumn(column);
- }
+ if(cFamily == null)
+ return null;
+ IColumn column = null;
+ if(values.length == 2)
+ {
+ column = cFamily.getColumn(values[1]);
+ if(column != null )
+ {
+ columnFamily = new ColumnFamily(cfName_);
+ columnFamily.addColumn(column.name(), column);
+ }
}
- else
- {
- assert values.length == 3;
- SuperColumn superColumn = (SuperColumn)cFamily.getColumn(values[1]);
- if (superColumn != null)
- {
- IColumn subColumn = superColumn.getSubColumn(values[2]);
- if (subColumn != null)
- {
- columnFamily = new ColumnFamily(cfName_);
- columnFamily.addColumn(values[1] + ":" + values[2], subColumn.value(), subColumn.timestamp(), subColumn.isMarkedForDelete());
- }
- }
+ else
+ {
+ column = cFamily.getColumn(values[1]);
+ if(column != null )
+ {
+
+ IColumn subColumn = ((SuperColumn)column).getSubColumn(values[2]);
+ if(subColumn != null)
+ {
+ columnFamily = new ColumnFamily(cfName_);
+ columnFamily.createColumn(values[1] + ":" + values[2], subColumn.value(), subColumn.timestamp());
+ }
+ }
}
}
/* Filter unnecessary data from the column based on the provided filter */
- return filter.filter(columnFamilyColumn, columnFamily);
+ return filter.filter(cfName, columnFamily);
}
+ ColumnFamily get(String key, String cfName)
+ {
+ printExecutorStats();
+ Callable<ColumnFamily> call = new Getter(key, cfName);
+ ColumnFamily cf = null;
+ try
+ {
+ cf = apartments_.get(cfName_).submit(call).get();
+ }
+ catch ( ExecutionException ex )
+ {
+ logger_.debug(LogUtil.throwableToString(ex));
+ }
+ catch ( InterruptedException ex2 )
+ {
+ logger_.debug(LogUtil.throwableToString(ex2));
+ }
+ return cf;
+ }
+
ColumnFamily get(String key, String cfName, IFilter filter)
{
printExecutorStats();
@@ -380,6 +421,23 @@
return cf;
}
+ /*
+ * Although the method is named remove() we cannot remove the key
+ * from memtable. We add it to the memtable but mark it as deleted.
+ * The reason for this because we do not want a successive get()
+ * for the same key to scan the ColumnFamilyStore files for this key.
+ */
+ void remove(String key, ColumnFamily columnFamily) throws IOException
+ {
+ printExecutorStats();
+ Runnable deleter = new Remover(key, columnFamily);
+ apartments_.get(cfName_).submit(deleter);
+ }
+
+ /*
+ * param recoveryMode - indicates if this was invoked during
+ * recovery.
+ */
void flush(CommitLog.CommitLogContext cLogCtx) throws IOException
{
ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
@@ -392,9 +450,51 @@
return;
}
+ PartitionerType pType = StorageService.getPartitionerType();
String directory = DatabaseDescriptor.getDataFileLocation();
String filename = cfStore.getNextFileName();
- SSTable ssTable = new SSTable(directory, filename);
+ SSTable ssTable = new SSTable(directory, filename, pType);
+ switch (pType)
+ {
+ case OPHF:
+ flushForOrderPreservingPartitioner(ssTable, cfStore, cLogCtx);
+ break;
+
+ default:
+ flushForRandomPartitioner(ssTable, cfStore, cLogCtx);
+ break;
+ }
+ }
+
+ private void flushForRandomPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
+ {
+ /* List of primary keys in sorted order */
+ List<PrimaryKey> pKeys = PrimaryKey.create( columnFamilies_.keySet() );
+ DataOutputBuffer buffer = new DataOutputBuffer();
+ /* Use this BloomFilter to decide if a key exists in a SSTable */
+ BloomFilter bf = new BloomFilter(pKeys.size(), 15);
+ for ( PrimaryKey pKey : pKeys )
+ {
+ buffer.reset();
+ ColumnFamily columnFamily = columnFamilies_.get(pKey.key());
+ if ( columnFamily != null )
+ {
+ /* serialize the cf with column indexes */
+ ColumnFamily.serializer2().serialize( columnFamily, buffer );
+ /* Now write the key and value to disk */
+ ssTable.append(pKey.key(), pKey.hash(), buffer);
+ bf.fill(pKey.key());
+ columnFamily.clear();
+ }
+ }
+ ssTable.close(bf);
+ cfStore.onMemtableFlush(cLogCtx);
+ cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
+ buffer.close();
+ }
+
+ private void flushForOrderPreservingPartitioner(SSTable ssTable, ColumnFamilyStore cfStore, CommitLog.CommitLogContext cLogCtx) throws IOException
+ {
List<String> keys = new ArrayList<String>( columnFamilies_.keySet() );
Collections.sort(keys);
DataOutputBuffer buffer = new DataOutputBuffer();
@@ -407,7 +507,7 @@
if ( columnFamily != null )
{
/* serialize the cf with column indexes */
- ColumnFamily.serializerWithIndexes().serialize( columnFamily, buffer );
+ ColumnFamily.serializer2().serialize( columnFamily, buffer );
/* Now write the key and value to disk */
ssTable.append(key, buffer);
bf.fill(key);
@@ -418,13 +518,5 @@
cfStore.onMemtableFlush(cLogCtx);
cfStore.storeLocation( ssTable.getDataFileLocation(), bf );
buffer.close();
-
- columnFamilies_.clear();
}
-
- public Iterator<String> sortedKeyIterator()
- {
- return new DestructivePQIterator<String>(new PriorityQueue<String>(columnFamilies_.keySet()));
- }
-
}
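
The new Memtable.remove() above deliberately keeps the key in the memtable and only marks its column family deleted, so a later get() for that key can stop there instead of scanning the ColumnFamilyStore files. A toy standalone version of that delete-marker idea, with illustrative types standing in for ColumnFamily:

    import java.util.HashMap;
    import java.util.Map;

    public class TombstoneSketch
    {
        static class Value
        {
            final String data;
            final boolean deleted;
            Value(String data, boolean deleted) { this.data = data; this.deleted = deleted; }
        }

        private final Map<String, Value> rows = new HashMap<String, Value>();

        void put(String key, String data) { rows.put(key, new Value(data, false)); }

        // "remove" keeps the key but marks it deleted, like Memtable.remove() above.
        void remove(String key)           { rows.put(key, new Value(null, true)); }

        // null means "not present here, check disk"; a deleted marker means "gone".
        Value get(String key)             { return rows.get(key); }
    }
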
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java Fri Mar 27 05:39:40 2009
@@ -27,8 +27,10 @@
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -40,7 +42,7 @@
private static Lock lock_ = new ReentrantLock();
private static Logger logger_ = Logger.getLogger(MemtableManager.class);
private ReentrantReadWriteLock rwLock_ = new ReentrantReadWriteLock(true);
- public static MemtableManager instance()
+ static MemtableManager instance()
{
if ( instance_ == null )
{
@@ -157,22 +159,7 @@
}
}
- public List<Memtable> getUnflushedMemtables(String cfName)
- {
- rwLock_.readLock().lock();
- try
- {
- List<Memtable> memtables = history_.get(cfName);
- if (memtables != null)
- {
- return new ArrayList<Memtable>(memtables);
- }
- return Arrays.asList(new Memtable[0]);
- }
- finally
- {
- rwLock_.readLock().unlock();
- }
- }
+
+
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java Fri Mar 27 05:39:40 2009
@@ -87,6 +87,10 @@
columnFamilyStore_.doCompaction();
logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
}
+ catch (IOException e)
+ {
+ logger_.debug( LogUtil.throwableToString(e) );
+ }
catch (Throwable th)
{
logger_.error( LogUtil.throwableToString(th) );
@@ -118,9 +122,16 @@
public Boolean call()
{
boolean result = true;
- logger_.debug("Started compaction ..."+columnFamilyStore_.columnFamily_);
- result = columnFamilyStore_.doAntiCompaction(ranges_, target_,fileList_);
- logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+ try
+ {
+ logger_.debug("Started compaction ..."+columnFamilyStore_.columnFamily_);
+ result = columnFamilyStore_.doAntiCompaction(ranges_, target_,fileList_);
+ logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+ }
+ catch (IOException e)
+ {
+ logger_.debug( LogUtil.throwableToString(e) );
+ }
return result;
}
}
@@ -169,6 +180,10 @@
columnFamilyStore_.doCleanupCompaction();
logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
}
+ catch (IOException e)
+ {
+ logger_.debug( LogUtil.throwableToString(e) );
+ }
catch (Throwable th)
{
logger_.error( LogUtil.throwableToString(th) );
@@ -208,9 +223,14 @@
public Future<Boolean> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint target, List<String> fileList)
{
return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges, target, fileList) );
+ }
+
+ public Future<Boolean> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges)
+ {
+ return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges) );
}
- public void submitMajor(ColumnFamilyStore columnFamilyStore, long skip)
+ public void submitMajor(ColumnFamilyStore columnFamilyStore, List<Range> ranges, long skip)
{
compactor_.submit( new OnDemandCompactor(columnFamilyStore, skip) );
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/NamesFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/NamesFilter.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/NamesFilter.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/NamesFilter.java Fri Mar 27 05:39:40 2009
@@ -32,13 +32,13 @@
public class NamesFilter implements IFilter
{
/* list of column names to filter against. */
- private List<String> names_;
-
+ private List<String> names_ = new ArrayList<String>();
+
NamesFilter(List<String> names)
{
- names_ = new ArrayList<String>(names);
+ names_ = names;
}
-
+
public ColumnFamily filter(String cf, ColumnFamily columnFamily)
{
if ( columnFamily == null )
@@ -55,8 +55,8 @@
{
if ( names_.contains(column.name()) )
{
- names_.remove(column.name());
- filteredCf.addColumn(column);
+ names_.remove(column.name());
+ filteredCf.addColumn(column.name(), column);
}
if( isDone() )
{
@@ -64,20 +64,20 @@
}
}
}
- else if ( values.length == 2 && DatabaseDescriptor.getColumnType(cfName).equals("Super"))
+ else if ( values.length == 2 && DatabaseDescriptor.getColumnType(cfName).equals("Super") )
{
Collection<IColumn> columns = columnFamily.getAllColumns();
for(IColumn column : columns)
{
SuperColumn superColumn = (SuperColumn)column;
SuperColumn filteredSuperColumn = new SuperColumn(superColumn.name());
- filteredCf.addColumn(filteredSuperColumn);
+ filteredCf.addColumn(filteredSuperColumn.name(), filteredSuperColumn);
Collection<IColumn> subColumns = superColumn.getSubColumns();
for(IColumn subColumn : subColumns)
{
if ( names_.contains(subColumn.name()) )
{
- names_.remove(subColumn.name());
+ names_.remove(subColumn.name());
filteredSuperColumn.addColumn(subColumn.name(), subColumn);
}
if( isDone() )
@@ -87,28 +87,28 @@
}
}
}
- else
+ else
{
throw new UnsupportedOperationException();
}
return filteredCf;
}
-
+
public IColumn filter(IColumn column, DataInputStream dis) throws IOException
- {
+ {
String columnName = column.name();
if ( names_.contains(columnName) )
{
- names_.remove(columnName);
+ names_.remove(columnName);
}
else
{
column = null;
}
-
+
return column;
}
-
+
public boolean isDone()
{
return names_.isEmpty();
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RecoveryManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RecoveryManager.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RecoveryManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RecoveryManager.java Fri Mar 27 05:39:40 2009
@@ -93,6 +93,7 @@
public static void main(String[] args) throws Throwable
{
+ DatabaseDescriptor.init();
long start = System.currentTimeMillis();
RecoveryManager rm = RecoveryManager.instance();
rm.doRecovery();
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java Fri Mar 27 05:39:40 2009
@@ -18,109 +18,132 @@
package org.apache.cassandra.db;
+import java.io.DataInput;
import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
-public class Row
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+public class Row implements Serializable
{
- private static RowSerializer serializer_ = new RowSerializer();
- private static Logger logger_ = Logger.getLogger(Row.class);
+ private static ICompactSerializer<Row> serializer_;
+ private static Logger logger_ = Logger.getLogger(Row.class);
- static RowSerializer serializer()
+ static
{
- return serializer_;
+ serializer_ = new RowSerializer();
}
- private String key_;
+ static ICompactSerializer<Row> serializer()
+ {
+ return serializer_;
+ }
+ private String key_;
private Map<String, ColumnFamily> columnFamilies_ = new Hashtable<String, ColumnFamily>();
+ private transient AtomicInteger size_ = new AtomicInteger(0);
+ /* Ctor for JAXB */
protected Row()
{
}
public Row(String key)
{
- key_ = key;
+ key_ = key;
}
-
+
public String key()
{
return key_;
}
-
+
void key(String key)
{
key_ = key;
}
-
- public Set<String> getColumnFamilyNames()
- {
- return columnFamilies_.keySet();
- }
-
- public Collection<ColumnFamily> getColumnFamilies()
+
+ public ColumnFamily getColumnFamily(String cfName)
{
- return columnFamilies_.values();
+ return columnFamilies_.get(cfName);
}
- @Deprecated
- // (use getColumnFamilies or getColumnFamilyNames)
- public Map<String, ColumnFamily> getColumnFamilyMap()
+ public Map<String, ColumnFamily> getColumnFamilies()
{
return columnFamilies_;
}
- public ColumnFamily getColumnFamily(String cfName)
- {
- return columnFamilies_.get(cfName);
- }
-
void addColumnFamily(ColumnFamily columnFamily)
{
columnFamilies_.put(columnFamily.name(), columnFamily);
+ size_.addAndGet(columnFamily.size());
}
void removeColumnFamily(ColumnFamily columnFamily)
{
columnFamilies_.remove(columnFamily.name());
int delta = (-1) * columnFamily.size();
+ size_.addAndGet(delta);
+ }
+
+ public int size()
+ {
+ return size_.get();
}
public boolean isEmpty()
{
- return (columnFamilies_.size() == 0);
+ return ( columnFamilies_.size() == 0 );
}
+
+ /**
+ * This is used as oldRow.merge(newRow). Basically we take the newRow
+ * and merge it into the oldRow.
+ */
+ void merge(Row row)
+ {
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+ Set<String> cfNames = columnFamilies.keySet();
- /*
+ for ( String cfName : cfNames )
+ {
+ ColumnFamily cf = columnFamilies_.get(cfName);
+ if ( cf == null )
+ columnFamilies_.put(cfName, columnFamilies.get(cfName));
+ else
+ {
+ cf.merge(columnFamilies.get(cfName));
+ }
+ }
+ }
+
+ /**
* This function will repair the current row with the input row
* what that means is that if there are any differences between the 2 rows then
* this fn will make the current row take the latest changes .
*/
public void repair(Row row)
{
- Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
Set<String> cfNames = columnFamilies.keySet();
- for (String cfName : cfNames)
+ for ( String cfName : cfNames )
{
ColumnFamily cf = columnFamilies_.get(cfName);
- if (cf == null)
+ if ( cf == null )
{
- cf = new ColumnFamily(cfName);
+ cf = new ColumnFamily(cfName);
columnFamilies_.put(cfName, cf);
}
cf.repair(columnFamilies.get(cfName));
@@ -128,7 +151,7 @@
}
- /*
+ /**
* This function will calculate the difference between 2 rows
* and return the resultant row. This assumes that the row that
* is being submitted is a super set of the current row so
@@ -139,82 +162,78 @@
public Row diff(Row row)
{
Row rowDiff = new Row(key_);
- Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
Set<String> cfNames = columnFamilies.keySet();
- for (String cfName : cfNames)
+ for ( String cfName : cfNames )
{
ColumnFamily cf = columnFamilies_.get(cfName);
ColumnFamily cfDiff = null;
- if (cf == null)
- rowDiff.getColumnFamilyMap().put(cfName, columnFamilies.get(cfName));
+ if ( cf == null )
+ rowDiff.getColumnFamilies().put(cfName, columnFamilies.get(cfName));
else
{
- cfDiff = cf.diff(columnFamilies.get(cfName));
- if (cfDiff != null)
- rowDiff.getColumnFamilyMap().put(cfName, cfDiff);
+ cfDiff = cf.diff(columnFamilies.get(cfName));
+ if(cfDiff != null)
+ rowDiff.getColumnFamilies().put(cfName, cfDiff);
}
}
- if (rowDiff.getColumnFamilyMap().size() != 0)
- return rowDiff;
+ if(rowDiff.getColumnFamilies().size() != 0)
+ return rowDiff;
else
- return null;
+ return null;
}
-
+
public Row cloneMe()
{
- Row row = new Row(key_);
- row.columnFamilies_ = new HashMap<String, ColumnFamily>(columnFamilies_);
- return row;
+ Row row = new Row(key_);
+ row.columnFamilies_ = new HashMap<String, ColumnFamily>(columnFamilies_);
+ return row;
}
public byte[] digest()
{
long start = System.currentTimeMillis();
- Set<String> cfamilies = columnFamilies_.keySet();
- byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
- for (String cFamily : cfamilies)
- {
- if (xorHash.length == 0)
- {
- xorHash = columnFamilies_.get(cFamily).digest();
- }
- else
- {
- byte[] tmpHash = columnFamilies_.get(cFamily).digest();
- xorHash = FBUtilities.xor(xorHash, tmpHash);
- }
- }
+ Set<String> cfamilies = columnFamilies_.keySet();
+ byte[] xorHash = new byte[0];
+ byte[] tmpHash = new byte[0];
+ for(String cFamily : cfamilies)
+ {
+ if(xorHash.length == 0)
+ {
+ xorHash = columnFamilies_.get(cFamily).digest();
+ }
+ else
+ {
+ tmpHash = columnFamilies_.get(cFamily).digest();
+ xorHash = FBUtilities.xor(xorHash, tmpHash);
+ }
+ }
logger_.info("DIGEST TIME: " + (System.currentTimeMillis() - start)
- + " ms.");
- return xorHash;
+ + " ms.");
+ return xorHash;
}
-
+
void clear()
- {
+ {
columnFamilies_.clear();
}
-
- public String toString()
- {
- return "Row(" + key_ + " [" + StringUtils.join(columnFamilies_.values(), ", ") + ")]";
- }
}
class RowSerializer implements ICompactSerializer<Row>
{
public void serialize(Row row, DataOutputStream dos) throws IOException
{
- dos.writeUTF(row.key());
- Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
- int size = columnFamilies.size();
+ dos.writeUTF(row.key());
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+ int size = columnFamilies.size();
dos.writeInt(size);
-
- if (size > 0)
- {
- Set<String> cNames = columnFamilies.keySet();
- for (String cName : cNames)
- {
+
+ if ( size > 0 )
+ {
+ Set<String> cNames = columnFamilies.keySet();
+ for ( String cName : cNames )
+ {
ColumnFamily.serializer().serialize(columnFamilies.get(cName), dos);
}
}
@@ -222,13 +241,13 @@
public Row deserialize(DataInputStream dis) throws IOException
{
- String key = dis.readUTF();
- Row row = new Row(key);
+ String key = dis.readUTF();
+ Row row = new Row(key);
int size = dis.readInt();
-
- if (size > 0)
- {
- for (int i = 0; i < size; ++i)
+
+ if ( size > 0 )
+ {
+ for ( int i = 0; i < size; ++i )
{
ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
row.addColumnFamily(cf);
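
Row gains a merge() that folds a new row into an existing one: column families the old row lacks are copied across, the rest are merged in place. A plain-map sketch of the same shape; how ColumnFamily.merge() resolves conflicting columns is not shown in this patch, so the last-write-wins behavior of putAll() below is only an assumption:

    import java.util.HashMap;
    import java.util.Map;

    public class RowMergeSketch
    {
        // oldRow.merge(newRow): copy over missing column families, merge the rest.
        static void merge(Map<String, Map<String, String>> oldRow,
                          Map<String, Map<String, String>> newRow)
        {
            for (Map.Entry<String, Map<String, String>> e : newRow.entrySet())
            {
                Map<String, String> existing = oldRow.get(e.getKey());
                if (existing == null)
                    oldRow.put(e.getKey(), e.getValue());   // whole CF taken from the new row
                else
                    existing.putAll(e.getValue());          // per-column merge, new values win here
            }
        }
    }
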
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java Fri Mar 27 05:39:40 2009
@@ -18,30 +18,13 @@
package org.apache.cassandra.db;
-import java.io.ByteArrayOutputStream;
+import java.util.*;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.batch_mutation_super_t;
-import org.apache.cassandra.service.batch_mutation_t;
-import org.apache.cassandra.service.column_t;
-import org.apache.cassandra.service.superColumn_t;
-import org.apache.cassandra.utils.FBUtilities;
/**
@@ -49,107 +32,121 @@
*/
public class RowMutation implements Serializable
-{
- private static ICompactSerializer<RowMutation> serializer_;
- public static final String HINT = "HINT";
-
+{
+ private static ICompactSerializer<RowMutation> serializer_;
+
static
{
serializer_ = new RowMutationSerializer();
- }
+ }
static ICompactSerializer<RowMutation> serializer()
{
return serializer_;
}
-
+
private String table_;
- private String key_;
- protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();
-
+ private String key_;
+ protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();
+ protected Map<String, ColumnFamily> deletions_ = new HashMap<String, ColumnFamily>();
+
/* Ctor for JAXB */
private RowMutation()
{
}
-
+
public RowMutation(String table, String key)
{
table_ = table;
key_ = key;
}
-
+
public RowMutation(String table, Row row)
{
table_ = table;
key_ = row.key();
- for (ColumnFamily cf : row.getColumnFamilies())
+ Map<String, ColumnFamily> cfSet = row.getColumnFamilies();
+ Set<String> keyset = cfSet.keySet();
+ for(String cfName : keyset)
{
- add(cf);
+ add(cfName, cfSet.get(cfName));
}
}
- protected RowMutation(String table, String key, Map<String, ColumnFamily> modifications)
+ protected RowMutation(String table, String key, Map<String, ColumnFamily> modifications, Map<String, ColumnFamily> deletions)
{
- table_ = table;
- key_ = key;
- modifications_ = modifications;
+ table_ = table;
+ key_ = key;
+ modifications_ = modifications;
+ deletions_ = deletions;
}
-
+
public static String[] getColumnAndColumnFamily(String cf)
{
return cf.split(":");
}
-
+
String table()
{
return table_;
}
-
+
public String key()
{
return key_;
}
-
+
void addHints(String hint) throws IOException, ColumnFamilyNotDefinedException
- {
+ {
String cfName = Table.hints_ + ":" + hint;
- add(cfName, ArrayUtils.EMPTY_BYTE_ARRAY, 0);
+ add(cfName, new byte[0]);
}
-
+
/*
* Specify a column family name and the corresponding column
- * family object.
+ * family object.
* param @ cf - column family name
* param @ columnFamily - the column family.
*/
- public void add(ColumnFamily columnFamily)
+ public void add(String cf, ColumnFamily columnFamily)
+ {
+ modifications_.put(cf, columnFamily);
+ }
+
+ /*
+ * Specify a column name and a corresponding value for
+ * the column. Column name is specified as <column family>:column.
+ * This will result in a ColumnFamily associated with
+ * <column family> as name and a Column with <column>
+ * as name.
+ *
+ * param @ cf - column name as <column family>:<column>
+ * param @ value - value associated with the column
+ */
+ public void add(String cf, byte[] value) throws IOException, ColumnFamilyNotDefinedException
{
- if (modifications_.containsKey(columnFamily.name()))
- {
- throw new IllegalArgumentException("ColumnFamily " + columnFamily.name() + " is already being modified");
- }
- modifications_.put(columnFamily.name(), columnFamily);
+ add(cf, value, 0);
}
-
+
/*
* Specify a column name and a corresponding value for
* the column. Column name is specified as <column family>:column.
* This will result in a ColumnFamily associated with
* <column family> as name and a Column with <column>
- * as name. The columan can be further broken up
+ * as name. The columan can be further broken up
* as super column name : columnname in case of super columns
- *
+ *
* param @ cf - column name as <column family>:<column>
* param @ value - value associated with the column
* param @ timestamp - ts associated with this data.
*/
public void add(String cf, byte[] value, long timestamp)
- {
+ {
String[] values = RowMutation.getColumnAndColumnFamily(cf);
-
+
if ( values.length == 0 || values.length == 1 || values.length > 3 )
throw new IllegalArgumentException("Column Family " + cf + " in invalid format. Must be in <column family>:<column> format.");
-
+
ColumnFamily columnFamily = modifications_.get(values[0]);
if( values.length == 2 )
{
@@ -157,7 +154,7 @@
{
columnFamily = new ColumnFamily(values[0], ColumnFamily.getColumnType("Standard"));
}
- columnFamily.addColumn(values[1], value, timestamp);
+ columnFamily.createColumn(values[1], value, timestamp);
}
if( values.length == 3 )
{
@@ -165,203 +162,172 @@
{
columnFamily = new ColumnFamily(values[0], ColumnFamily.getColumnType("Super"));
}
- columnFamily.addColumn(values[1]+ ":" + values[2], value, timestamp);
+ columnFamily.createColumn(values[1]+ ":" + values[2], value, timestamp);
}
modifications_.put(values[0], columnFamily);
}
-
- public void delete(String columnFamilyColumn, long timestamp)
- {
- String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
- String cfName = values[0];
- if (modifications_.containsKey(cfName))
- {
- throw new IllegalArgumentException("ColumnFamily " + cfName + " is already being modified");
- }
-
- if (values.length == 0 || values.length > 3)
- throw new IllegalArgumentException("Column Family " + columnFamilyColumn + " in invalid format. Must be in <column family>:<column> format.");
-
- ColumnFamily columnFamily = modifications_.get(cfName);
- if (columnFamily == null)
- columnFamily = new ColumnFamily(cfName);
- if (values.length == 2)
- {
- columnFamily.addColumn(values[1], ArrayUtils.EMPTY_BYTE_ARRAY, timestamp, true);
- }
- else if (values.length == 3)
+
+ /*
+ * Specify a column name to be deleted. Column name is
+ * specified as <column family>:column. This will result
+ * in a ColumnFamily associated with <column family> as
+ * name and perhaps Column with <column> as name being
+ * marked as deleted.
+ * TODO : Delete is NOT correct as we do not know
+ * the CF type so we need to fix that.
+ * param @ cf - column name as <column family>:<column>
+ */
+ public void delete(String cf)
+ {
+ String[] values = RowMutation.getColumnAndColumnFamily(cf);
+
+ if ( values.length == 0 || values.length > 3 )
+ throw new IllegalArgumentException("Column Family " + cf + " in invalid format. Must be in <column family>:<column> format.");
+
+ ColumnFamily columnFamily = modifications_.get(values[0]);
+ if ( columnFamily == null )
+ columnFamily = new ColumnFamily(values[0]);
+ if(values.length == 2 )
{
- columnFamily.addColumn(values[1] + ":" + values[2], ArrayUtils.EMPTY_BYTE_ARRAY, timestamp, true);
+ columnFamily.createColumn( values[1]);
}
- else
+ if(values.length == 3 )
{
- assert values.length == 1;
- columnFamily.delete(timestamp);
+ columnFamily.createColumn( values[1] + ":" + values[2]);
}
- modifications_.put(cfName, columnFamily);
+ deletions_.put(values[0], columnFamily);
}
-
- /*
+
+ /*
* This is equivalent to calling commit. Applies the changes to
* to the table that is obtained by calling Table.open().
*/
public void apply() throws IOException, ColumnFamilyNotDefinedException
- {
+ {
Row row = new Row(key_);
- apply(row);
- }
-
- /*
- * Allows RowMutationVerbHandler to optimize by re-using a single Row object.
- */
- void apply(Row emptyRow) throws IOException, ColumnFamilyNotDefinedException
- {
- assert emptyRow.getColumnFamilyMap().size() == 0;
Table table = Table.open(table_);
- for (String cfName : modifications_.keySet())
- {
- if (!table.isValidColumnFamily(cfName))
+ Set<String> cfNames = modifications_.keySet();
+ for (String cfName : cfNames )
+ {
+ if ( !table.isValidColumnFamily(cfName) )
+ throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+ row.addColumnFamily( modifications_.get(cfName) );
+ }
+ table.apply(row);
+
+ Set<String> cfNames2 = deletions_.keySet();
+ for (String cfName : cfNames2 )
+ {
+ if ( !table.isValidColumnFamily(cfName) )
throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
- emptyRow.addColumnFamily(modifications_.get(cfName));
+ row.addColumnFamily( deletions_.get(cfName) );
}
- table.apply(emptyRow);
+ if ( deletions_.size() > 0 )
+ table.delete(row);
}
-
- /*
+
+ /*
* This is equivalent to calling commit. Applies the changes to
* to the table that is obtained by calling Table.open().
*/
- void load(Row row) throws IOException, ColumnFamilyNotDefinedException, ExecutionException, InterruptedException
- {
- Table table = Table.open(table_);
+ void apply(Row row) throws IOException, ColumnFamilyNotDefinedException
+ {
+ Table table = Table.open(table_);
Set<String> cfNames = modifications_.keySet();
- for (String cfName : cfNames)
- {
- if (!table.isValidColumnFamily(cfName))
+ for (String cfName : cfNames )
+ {
+ if ( !table.isValidColumnFamily(cfName) )
throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
- row.addColumnFamily(modifications_.get(cfName));
+ row.addColumnFamily( modifications_.get(cfName) );
}
- table.load(row);
- }
-
- public Message makeRowMutationMessage() throws IOException
- {
- return makeRowMutationMessage(StorageService.mutationVerbHandler_);
- }
-
- public Message makeRowMutationMessage(String verbHandlerName) throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- serializer().serialize(this, dos);
- EndPoint local = StorageService.getLocalStorageEndPoint();
- EndPoint from = (local != null) ? local : new EndPoint(FBUtilities.getHostName(), 7000);
- return new Message(from, StorageService.mutationStage_, verbHandlerName, bos.toByteArray());
- }
-
- public static RowMutation getRowMutation(batch_mutation_t batchMutation)
- {
- RowMutation rm = new RowMutation(batchMutation.table,
- batchMutation.key.trim());
- for (String cfname : batchMutation.cfmap.keySet())
- {
- List<column_t> list = batchMutation.cfmap.get(cfname);
- for (column_t columnData : list)
- {
- rm.add(cfname + ":" + columnData.columnName,
- columnData.value.getBytes(), columnData.timestamp);
-
- }
+ table.apply(row);
+
+ Set<String> cfNames2 = deletions_.keySet();
+ for (String cfName : cfNames2 )
+ {
+ if ( !table.isValidColumnFamily(cfName) )
+ throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+ row.addColumnFamily( deletions_.get(cfName) );
}
- return rm;
+ if ( deletions_.size() > 0 )
+ table.delete(row);
}
-
- public static RowMutation getRowMutation(batch_mutation_super_t batchMutationSuper)
- {
- RowMutation rm = new RowMutation(batchMutationSuper.table,
- batchMutationSuper.key.trim());
- Set keys = batchMutationSuper.cfmap.keySet();
- Iterator keyIter = keys.iterator();
- while (keyIter.hasNext())
- {
- Object key = keyIter.next(); // Get the next key.
- List<superColumn_t> list = batchMutationSuper.cfmap.get(key);
- for (superColumn_t superColumnData : list)
- {
- if (superColumnData.columns.size() != 0)
- {
- for (column_t columnData : superColumnData.columns)
- {
- rm.add(key.toString() + ":" + superColumnData.name + ":" + columnData.columnName,
- columnData.value.getBytes(), columnData.timestamp);
- }
- }
- else
- {
- rm.add(key.toString() + ":" + superColumnData.name, ArrayUtils.EMPTY_BYTE_ARRAY, 0);
- }
- }
+
+ /*
+ * This is equivalent to calling commit. Applies the changes to
+ * the table that is obtained by calling Table.open().
+ */
+ void load(Row row) throws IOException, ColumnFamilyNotDefinedException
+ {
+ Table table = Table.open(table_);
+ Set<String> cfNames = modifications_.keySet();
+ for (String cfName : cfNames )
+ {
+ if ( !table.isValidColumnFamily(cfName) )
+ throw new ColumnFamilyNotDefinedException("Column Family " + cfName + " has not been defined.");
+ row.addColumnFamily( modifications_.get(cfName) );
}
- return rm;
- }
-
- public String toString()
- {
- return "RowMutation(" +
- "key='" + key_ + '\'' +
- ", modifications=[" + StringUtils.join(modifications_.values(), ", ") + "]" +
- ')';
- }
+ table.load(row);
+ }
}
class RowMutationSerializer implements ICompactSerializer<RowMutation>
{
- private void freezeTheMaps(Map<String, ColumnFamily> map, DataOutputStream dos) throws IOException
- {
- int size = map.size();
+ private void freezeTheMaps(Map<String, ColumnFamily> map, DataOutputStream dos) throws IOException
+ {
+ int size = map.size();
dos.writeInt(size);
- if (size > 0)
- {
+ if ( size > 0 )
+ {
Set<String> keys = map.keySet();
- for (String key : keys)
- {
- dos.writeUTF(key);
+ for( String key : keys )
+ {
+ dos.writeUTF(key);
ColumnFamily cf = map.get(key);
- if (cf != null)
+ if ( cf != null )
{
ColumnFamily.serializer().serialize(cf, dos);
- }
+ }
}
}
- }
-
- public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
- {
- dos.writeUTF(rm.table());
- dos.writeUTF(rm.key());
-
- /* serialize the modifications_ in the mutation */
+ }
+
+ public void serialize(RowMutation rm, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(rm.table());
+ dos.writeUTF(rm.key());
+
+ /* serialize the modifications_ in the mutation */
freezeTheMaps(rm.modifications_, dos);
- }
-
- private Map<String, ColumnFamily> defreezeTheMaps(DataInputStream dis) throws IOException
- {
- Map<String, ColumnFamily> map = new HashMap<String, ColumnFamily>();
+
+ /* serialize the deletions_ in the mutation */
+ freezeTheMaps(rm.deletions_, dos);
+ }
+
+ private Map<String, ColumnFamily> defreezeTheMaps(DataInputStream dis) throws IOException
+ {
+ Map<String, ColumnFamily> map = new HashMap<String, ColumnFamily>();
int size = dis.readInt();
- for (int i = 0; i < size; ++i)
+ for ( int i = 0; i < size; ++i )
{
- String key = dis.readUTF();
+ String key = dis.readUTF();
ColumnFamily cf = ColumnFamily.serializer().deserialize(dis);
- map.put(key, cf);
+ map.put(key, cf);
}
return map;
- }
-
+ }
+
public RowMutation deserialize(DataInputStream dis) throws IOException
{
- String table = dis.readUTF();
- String key = dis.readUTF();
- Map<String, ColumnFamily> modifications = defreezeTheMaps(dis);
- return new RowMutation(table, key, modifications);
+ String table = dis.readUTF();
+ String key = dis.readUTF();
+
+ /* Defreeze the modifications_ map */
+ Map<String, ColumnFamily> modifications = defreezeTheMaps(dis);
+
+ /* Defreeze the deletions_ map */
+ Map<String, ColumnFamily> deletions = defreezeTheMaps(dis);
+
+ return new RowMutation(table, key, modifications, deletions);
}
}
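
The serializer above now writes the deletions_ map immediately after modifications_. As a rough illustration only, here is a minimal standalone sketch of that wire layout; the byte[] stand-in for an encoded ColumnFamily and the helper names are assumptions, since the committed code delegates to ColumnFamily.serializer():

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;

// Sketch of the layout RowMutationSerializer.serialize() produces:
// table (UTF), key (UTF), then the modifications_ map, then the deletions_ map,
// each frozen as: count (int) followed by <cfName (UTF), serialized ColumnFamily>.
public final class RowMutationWireSketch
{
    static void freeze(Map<String, byte[]> cfMap, DataOutputStream dos) throws IOException
    {
        dos.writeInt(cfMap.size());
        for (Map.Entry<String, byte[]> e : cfMap.entrySet())
        {
            dos.writeUTF(e.getKey());   // column family name
            dos.write(e.getValue());    // ColumnFamily.serializer().serialize(cf, dos) in the real code
        }
    }

    static byte[] serialize(String table, String key,
                            Map<String, byte[]> modifications,
                            Map<String, byte[]> deletions) throws IOException
    {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        dos.writeUTF(table);
        dos.writeUTF(key);
        freeze(modifications, dos);   // serialize the modifications_ in the mutation
        freeze(deletions, dos);       // serialize the deletions_ in the mutation (new in this revision)
        return bos.toByteArray();
    }
}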
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java Fri Mar 27 05:39:40 2009
@@ -46,14 +46,18 @@
protected Row row_ = new Row();
protected DataInputBuffer buffer_ = new DataInputBuffer();
}
-
- private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);
+
+ private static Logger logger_ = Logger.getLogger(RowMutationVerbHandler.class);
/* We use this so that we can reuse the same row mutation context for the mutation. */
private static ThreadLocal<RowMutationContext> tls_ = new InheritableThreadLocal<RowMutationContext>();
-
+
public void doVerb(Message message)
{
- byte[] bytes = (byte[]) message.getMessageBody()[0];
+ /* For DEBUG only. Printing queue length */
+ logger_.info( "ROW MUTATION STAGE: " + StageManager.getStageTaskCount(StorageService.mutationStage_) );
+ /* END DEBUG */
+
+ byte[] bytes = (byte[])message.getMessageBody()[0];
/* Obtain a Row Mutation Context from TLS */
RowMutationContext rowMutationCtx = tls_.get();
if ( rowMutationCtx == null )
@@ -61,47 +65,51 @@
rowMutationCtx = new RowMutationContext();
tls_.set(rowMutationCtx);
}
-
- rowMutationCtx.buffer_.reset(bytes, bytes.length);
-
+
+ rowMutationCtx.buffer_.reset(bytes, bytes.length);
+
try
{
- RowMutation rm = RowMutation.serializer().deserialize(rowMutationCtx.buffer_);
- logger_.debug("Applying " + rm);
-
+ RowMutationMessage rmMsg = RowMutationMessage.serializer().deserialize(rowMutationCtx.buffer_);
+ RowMutation rm = rmMsg.getRowMutation();
/* Check if there were any hints in this message */
- byte[] hintedBytes = message.getHeader(RowMutation.HINT);
+ byte[] hintedBytes = message.getHeader(RowMutationMessage.hint_);
if ( hintedBytes != null && hintedBytes.length > 0 )
{
EndPoint hint = EndPoint.fromBytes(hintedBytes);
- logger_.debug("Adding hint for " + hint);
/* add necessary hints to this mutation */
- RowMutation hintedMutation = new RowMutation(rm.table(), HintedHandOffManager.key_);
- hintedMutation.addHints(rm.key() + ":" + hint.getHost());
- hintedMutation.apply();
+ try
+ {
+ RowMutation hintedMutation = new RowMutation(rm.table(), HintedHandOffManager.key_);
+ hintedMutation.addHints(rm.key() + ":" + hint.getHost());
+ hintedMutation.apply();
+ }
+ catch ( ColumnFamilyNotDefinedException ex )
+ {
+ logger_.debug(LogUtil.throwableToString(ex));
+ }
}
-
- long start = System.currentTimeMillis();
-
- rowMutationCtx.row_.clear();
+
+ long start = System.currentTimeMillis();
+
rowMutationCtx.row_.key(rm.key());
rm.apply(rowMutationCtx.row_);
-
- long end = System.currentTimeMillis();
-
- WriteResponse response = new WriteResponse(rm.table(), rm.key(), true);
- Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
- logger_.debug("Mutation applied in " + (end - start) + "ms. Sending response to " + message.getFrom() + " for key :" + rm.key());
- MessagingService.getMessagingInstance().sendOneWay(responseMessage, message.getFrom());
- }
- catch(ColumnFamilyNotDefinedException ex)
+
+ long end = System.currentTimeMillis();
+ logger_.info("ROW MUTATION APPLY: " + (end - start) + " ms.");
+
+ /*WriteResponseMessage writeResponseMessage = new WriteResponseMessage(rm.table(), rm.key(), true);
+ Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{writeResponseMessage} );
+ logger_.debug("Sending teh response to " + message.getFrom() + " for key :" + rm.key());
+ MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom()); */
+ }
+ catch( ColumnFamilyNotDefinedException ex )
{
- // TODO shouldn't this be checked before it's sent to us?
- logger_.warn("column family not defined, and no way to tell the client", ex);
- }
- catch (IOException e)
+ logger_.debug(LogUtil.throwableToString(ex));
+ }
+ catch ( IOException e )
{
- logger_.error("Error in row mutation", e);
- }
+ logger_.debug(LogUtil.throwableToString(e));
+ }
}
}
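
The reworked doVerb() above keeps a per-thread RowMutationContext so the deserialization buffer and row are reused across messages on a mutation-stage thread. A minimal sketch of that reuse pattern follows, with simplified stand-in types rather than the committed classes; note that the real DataInputBuffer reuses its internal buffer on reset(), whereas this stand-in simply rewraps the incoming bytes:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;

public final class VerbHandlerTlsSketch
{
    static final class MutationContext
    {
        DataInputStream input;

        void reset(byte[] messageBody)
        {
            input = new DataInputStream(new ByteArrayInputStream(messageBody));
        }
    }

    // Mirrors the tls_ field in the handler above: one context per thread.
    private static final ThreadLocal<MutationContext> tls = new InheritableThreadLocal<MutationContext>();

    static void handle(byte[] messageBody)
    {
        MutationContext ctx = tls.get();
        if (ctx == null)
        {
            ctx = new MutationContext();
            tls.set(ctx);
        }
        ctx.reset(messageBody);
        // ... deserialize the mutation from ctx.input and apply it ...
    }
}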
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/SequentialScanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SequentialScanner.java?rev=759026&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SequentialScanner.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SequentialScanner.java Fri Mar 27 05:39:40 2009
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.continuations.Suspendable;
+import org.apache.cassandra.db.ColumnFamilyNotDefinedException;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.FileStruct;
+import org.apache.cassandra.db.IScanner;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncResult;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+
+/**
+ * This class is used to scan through all the keys in disk
+ * in Iterator style. Usage is as follows:
+ * SequentialScanner scanner = new SequentialScanner("table");
+ *
+ * while ( scanner.hasNext() )
+ * {
+ * Row row = scanner.next();
+ * // Do something with the row
+ * }
+ *
+ * @author alakshman
+ *
+ */
+
+public class SequentialScanner implements IScanner<Row>
+{
+ private static Logger logger_ = Logger.getLogger( SequentialScanner.class );
+ private final static int bufSize_ = 1024*1024;
+
+ /* Table over which we want to perform a sequential scan. */
+ private String table_;
+ private Queue<FileStruct> fsQueue_ = new PriorityQueue<FileStruct>();
+
+ public SequentialScanner(String table) throws IOException
+ {
+ table_ = table;
+ List<String> allFiles = Table.open(table_).getAllSSTablesOnDisk();
+
+ for (String file : allFiles)
+ {
+ FileStruct fs = new FileStruct(file, SequentialScanner.bufSize_);
+ fsQueue_.add(fs);
+ }
+ }
+
+ /**
+ * Determines if there is anything more to be
+ * scanned.
+ * @return true if more elements are remaining
+ * else false.
+ */
+ public boolean hasNext() throws IOException
+ {
+ boolean hasNext = ( fsQueue_.size() > 0 ) ? true : false;
+ return hasNext;
+ }
+
+ /**
+ * Returns the next row associated with the smallest key
+ * on disk.
+ *
+ * @return row of the next smallest key on disk.
+ */
+ public Row next() throws IOException
+ {
+ if ( fsQueue_.size() == 0 )
+ throw new IllegalStateException("Nothing in the stream to scan.");
+
+ Row row = null;
+ FileStruct fs = fsQueue_.poll();
+
+ // Process the key only if it is in the primary range and not a block index.
+ if ( StorageService.instance().isPrimary(fs.getKey()) && !fs.getKey().equals(SSTable.blockIndexKey_) )
+ {
+ row = Table.open(table_).get(fs.getKey());
+ }
+
+ doCorrections(fs.getKey());
+ long bytesRead = fs.advance();
+ if ( bytesRead != -1L )
+ fsQueue_.add(fs);
+ return row;
+ }
+
+ /**
+ * This method advances the pointer in the file struct
+ * in the event the same key occurs in multiple files.
+ *
+ * @param key key we are interested in.
+ * @throws IOException
+ */
+ private void doCorrections(String key) throws IOException
+ {
+ List<FileStruct> lfs = new ArrayList<FileStruct>();
+ Iterator<FileStruct> it = fsQueue_.iterator();
+
+ while ( it.hasNext() )
+ {
+ FileStruct fs = it.next();
+ /*
+ * We encountered a key that is greater
+ * than the key we are currently serving
+ * so scram.
+ */
+ if ( fs.getKey().compareTo(key) != 0 )
+ {
+ break;
+ }
+ else
+ {
+ lfs.add(fs);
+ }
+ }
+
+ for ( FileStruct fs : lfs )
+ {
+ /* discard duplicate entries. */
+ fsQueue_.poll();
+ long bytesRead = fs.advance();
+ if ( bytesRead != -1L )
+ {
+ fsQueue_.add(fs);
+ }
+ }
+ }
+
+ public void close() throws IOException
+ {
+ if ( fsQueue_.size() > 0 )
+ {
+ for ( int i = 0; i < fsQueue_.size(); ++i )
+ {
+ FileStruct fs = fsQueue_.poll();
+ fs.close();
+ }
+ }
+ }
+
+ public void fetch(String key, String cf) throws IOException, ColumnFamilyNotDefinedException
+ {
+ throw new UnsupportedOperationException("This operation does not make sense in the SequentialScanner");
+ }
+}
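
SequentialScanner merges the sorted key streams of all SSTables through a PriorityQueue: next() serves the file whose current key is smallest, and doCorrections() advances every other file positioned on that same key so duplicates across files are skipped. A toy model of that merge, using a stand-in Stream type in place of FileStruct (sample keys are illustrative only):

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public final class SequentialScanSketch
{
    // One sorted key stream per "file"; ordering in the queue is by current key.
    static final class Stream implements Comparable<Stream>
    {
        private final Iterator<String> it;
        String current;

        Stream(List<String> sortedKeys) { it = sortedKeys.iterator(); advance(); }
        boolean advance() { current = it.hasNext() ? it.next() : null; return current != null; }
        public int compareTo(Stream other) { return current.compareTo(other.current); }
    }

    public static void main(String[] args)
    {
        PriorityQueue<Stream> queue = new PriorityQueue<Stream>();
        queue.add(new Stream(Arrays.asList("a", "c", "d")));
        queue.add(new Stream(Arrays.asList("b", "c")));

        while (!queue.isEmpty())
        {
            Stream smallest = queue.poll();
            String key = smallest.current;
            System.out.println(key);                   // serve the smallest key exactly once
            if (smallest.advance()) queue.add(smallest);
            // advance any other stream also positioned on this key (duplicate entries)
            while (!queue.isEmpty() && queue.peek().current.equals(key))
            {
                Stream dup = queue.poll();
                if (dup.advance()) queue.add(dup);
            }
        }
        // prints: a, b, c, d — each key once, even though "c" appears in two streams
    }
}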