Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC
svn commit: r749218 [13/34] - in /incubator/cassandra: branches/ dist/
nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/
trunk/src/org/apache/ trunk/src/org/apache/cassandra/
trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnIndexer.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnIndexer.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnIndexer.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IndexHelper;
+import org.apache.cassandra.io.SSTable.KeyPositionInfo;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
+
+
+/**
+ * Helps create an index for a column family based on the size of its columns.
+ * Author : Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+
+public class ColumnIndexer
+{
+ /**
+ * Given a column family, this function creates an in-memory structure that represents the
+ * column index for the column family, and subsequently writes it to disk.
+ * @param columnFamily Column family to create index for
+ * @param dos data output stream
+ * @throws IOException
+ */
+ public static void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
+ {
+ Collection<IColumn> columns = columnFamily.getAllColumns();
+ BloomFilter bf = createColumnBloomFilter(columns);
+ /* Write out the bloom filter. */
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ BloomFilter.serializer().serialize(bf, bufOut);
+ /* write the length of the serialized bloom filter. */
+ dos.writeInt(bufOut.getLength());
+ /* write out the serialized bytes. */
+ dos.write(bufOut.getData(), 0, bufOut.getLength());
+
+ TypeInfo typeInfo = DatabaseDescriptor.getTypeInfo(columnFamily.name());
+ doIndexing(typeInfo, columns, dos);
+ }
+
+ /**
+ * Create a bloom filter that contains the subcolumns and the columns that
+ * make up this Column Family.
+ * @param columns columns of the ColumnFamily
+ * @return BloomFilter with the summarized information.
+ */
+ private static BloomFilter createColumnBloomFilter(Collection<IColumn> columns)
+ {
+ int columnCount = 0;
+ for ( IColumn column : columns )
+ {
+ columnCount += column.getObjectCount();
+ }
+
+ BloomFilter bf = new BloomFilter(columnCount, 4);
+ for ( IColumn column : columns )
+ {
+ bf.fill(column.name());
+ /* If this is a SuperColumn type Column Family we need to get the subColumns too. */
+ if ( column instanceof SuperColumn )
+ {
+ Collection<IColumn> subColumns = column.getSubColumns();
+ for ( IColumn subColumn : subColumns )
+ {
+ bf.fill(subColumn.name());
+ }
+ }
+ }
+ return bf;
+ }
+
+ private static IndexHelper.ColumnIndexInfo getColumnIndexInfo(TypeInfo typeInfo, IColumn column)
+ {
+ IndexHelper.ColumnIndexInfo cIndexInfo = null;
+
+ if ( column instanceof SuperColumn )
+ {
+ cIndexInfo = IndexHelper.ColumnIndexFactory.instance(TypeInfo.STRING);
+ cIndexInfo.set(column.name());
+ }
+ else
+ {
+ cIndexInfo = IndexHelper.ColumnIndexFactory.instance(typeInfo);
+ switch(typeInfo)
+ {
+ case STRING:
+ cIndexInfo.set(column.name());
+ break;
+
+ case LONG:
+ cIndexInfo.set(column.timestamp());
+ break;
+ }
+ }
+
+ return cIndexInfo;
+ }
+
+ /**
+ * Given the collection of columns in the Column Family,
+ * the column index is generated and written into the provided
+ * stream.
+ * @param typeInfo type of the column names, which determines the
+ * indexing scheme used.
+ * @param columns columns for which the index needs to be generated.
+ * @param dos stream into which the serialized index needs
+ * to be written.
+ * @throws IOException
+ */
+ private static void doIndexing(TypeInfo typeInfo, Collection<IColumn> columns, DataOutputStream dos) throws IOException
+ {
+ /* we are going to write column indexes */
+ int numColumns = 0;
+ int position = 0;
+ int indexSizeInBytes = 0;
+ int sizeSummarized = 0;
+
+ /*
+ * Maintains a list of ColumnIndexInfo objects for the columns in this
+ * column family. The key is the column name and the position is the
+ * relative offset of that column from the start of the serialized columns.
+ * We do this so that we don't have to read all the columns into memory.
+ */
+
+ List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+
+ /* Record column offsets at the configured thresholds into the index list. */
+ for ( IColumn column : columns )
+ {
+ /* if the data accumulated since the last index entry exceeds the configured index size, record an entry */
+ if(position - sizeSummarized >= DatabaseDescriptor.getColumnIndexSize())
+ {
+ /*
+ * Column sorting applies only to simple columns. So in the case
+ * of a SuperColumn, always use the name indexing scheme for
+ * the SuperColumns. We will fix this later.
+ */
+ IndexHelper.ColumnIndexInfo cIndexInfo = getColumnIndexInfo(typeInfo, column);
+ cIndexInfo.position(position);
+ cIndexInfo.count(numColumns);
+ columnIndexList.add(cIndexInfo);
+ /*
+ * we will be writing this object as a UTF8 string and two ints,
+ * so calculate the size accordingly. Note that we store the string
+ * as UTF-8 encoded, so when we calculate the length, it should be
+ * converted to UTF-8.
+ */
+ indexSizeInBytes += cIndexInfo.size();
+ sizeSummarized = position;
+ numColumns = 0;
+ }
+ position += column.serializedSize();
+ ++numColumns;
+ }
+ /* write the column index list */
+ IndexHelper.serialize(indexSizeInBytes, columnIndexList, dos);
+ }
+}
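The stream that serialize() produces above is an int length, the serialized bloom filter bytes, and then the column index written by IndexHelper.serialize(). A minimal sketch of reading the bloom filter frame back with plain java.io (the matching IndexHelper deserializer is not part of this file, so only the framing is shown):

    import java.io.DataInputStream;
    import java.io.IOException;

    class ColumnIndexFramingSketch
    {
        /* Reads the length-prefixed bloom filter frame written by ColumnIndexer.serialize(). */
        static byte[] readBloomFilterFrame(DataInputStream dis) throws IOException
        {
            int length = dis.readInt();        // length of the serialized bloom filter
            byte[] bfBytes = new byte[length]; // serialized bloom filter bytes
            dis.readFully(bfBytes);            // the column index entries follow this frame
            return bfBytes;
        }
    }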
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,684 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+import java.nio.file.Path;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/*
+ * The Commit Log tracks every write operation into the system. The aim
+ * of the commit log is to be able to successfully recover data that was
+ * not stored to disk via the Memtable. Every Commit Log maintains a
+ * header represented by the abstraction CommitLogHeader. The header
+ * contains a bit array and an array of longs; both arrays have one
+ * entry per column family of the Table the Commit Log represents.
+ * Whenever a ColumnFamily is written to for the first time, its bit flag
+ * is set to one in the CommitLogHeader. When it is flushed to disk by the
+ * Memtable its corresponding bit in the header is set to zero. This helps
+ * track which CommitLogs can be thrown away as a result of Memtable flushes.
+ * However, if a ColumnFamily is flushed and then written to again, its
+ * entry in the array of longs is updated with the offset in the Commit Log
+ * file where it was written. This helps speed up recovery since we can seek
+ * to these offsets and start processing the commit log.
+ * Every Commit Log is rolled over every time it reaches its size threshold.
+ * Over time a number of commit logs could be generated.
+ * However, whenever we flush a column family to disk and update its bit flag,
+ * we take this bit array and bitwise AND it with the headers of the older
+ * commit logs.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+class CommitLog
+{
+ private static final int bufSize_ = 128*1024*1024;
+ private static Map<String, CommitLog> instances_ = new HashMap<String, CommitLog>();
+ private static Lock lock_ = new ReentrantLock();
+ private static Logger logger_ = Logger.getLogger(CommitLog.class);
+ private static Map<String, CommitLogHeader> clHeaders_ = new HashMap<String, CommitLogHeader>();
+
+ public static final class CommitLogContext
+ {
+ static CommitLogContext NULL = new CommitLogContext(null, -1L);
+ /* Commit Log associated with this operation */
+ private String file_;
+ /* Offset within the Commit Log where this row was added */
+ private long position_;
+
+ public CommitLogContext(String file, long position)
+ {
+ file_ = file;
+ position_ = position;
+ }
+
+ boolean isValidContext()
+ {
+ return (position_ != -1L);
+ }
+
+ String file()
+ {
+ return file_;
+ }
+
+ long position()
+ {
+ return position_;
+ }
+ }
+
+ public static class CommitLogFileComparator implements Comparator<String>
+ {
+ public int compare(String f, String f2)
+ {
+ long t = getCreationTime(f);
+ long t2 = getCreationTime(f2);
+ /* compare explicitly to avoid overflow when casting a long difference to int */
+ return (t < t2) ? -1 : ((t == t2) ? 0 : 1);
+ }
+
+ public boolean equals(Object o)
+ {
+ if ( !(o instanceof CommitLogFileComparator) )
+ return false;
+ return true;
+ }
+ }
+
+ static long getCreationTime(String file)
+ {
+ String[] entries = FBUtilities.strip(file, "-.");
+ return Long.parseLong(entries[entries.length - 2]);
+ }
+
+ /*
+ * Write the serialized commit log header into the specified commit log.
+ */
+ private static void writeCommitLogHeader(String commitLogFileName, byte[] bytes) throws IOException
+ {
+ IFileWriter logWriter = CommitLog.createWriter(commitLogFileName);
+ logWriter.seek(0L);
+ /* write the commit log header */
+ logWriter.writeDirect(bytes);
+ logWriter.close();
+ }
+
+ private static IFileWriter createWriter(String file) throws IOException
+ {
+ if ( DatabaseDescriptor.isFastSync() )
+ {
+ /* add some headroom to the log size threshold for the write buffer */
+ int bufSize = 4*1024*1024;
+ return SequenceFile.fastWriter(file, CommitLog.bufSize_ + bufSize);
+ }
+ else
+ return SequenceFile.writer(file);
+ }
+
+ static CommitLog open(String table) throws IOException
+ {
+ CommitLog commitLog = instances_.get(table);
+ if ( commitLog == null )
+ {
+ CommitLog.lock_.lock();
+ try
+ {
+ commitLog = instances_.get(table);
+ if ( commitLog == null )
+ {
+ commitLog = new CommitLog(table, false);
+ instances_.put(table, commitLog);
+ }
+ }
+ finally
+ {
+ CommitLog.lock_.unlock();
+ }
+ }
+ return commitLog;
+ }
+
+ static String getTableName(String file)
+ {
+ String[] values = file.split("-");
+ return values[1];
+ }
+
+ private String table_;
+ /* Current commit log file */
+ private String logFile_;
+ /* header for current commit log */
+ private CommitLogHeader clHeader_;
+ private IFileWriter logWriter_;
+ private long commitHeaderStartPos_;
+ /* Force rollover the commit log on the next insert */
+ private boolean forcedRollOver_ = false;
+
+
+ /*
+ * Generates a file name of the format CommitLog-<table>-<timestamp>.log in the
+ * directory specified by the Database Descriptor.
+ */
+ private void setNextFileName()
+ {
+ logFile_ = DatabaseDescriptor.getLogFileLocation() +
+ System.getProperty("file.separator") +
+ "CommitLog-" +
+ table_ +
+ "-" +
+ System.currentTimeMillis() +
+ ".log";
+ }
+
+ /*
+ * param @ table - name of table for which we are maintaining
+ * this commit log.
+ * param @ recoverymode - is the commit log being instantiated
+ * in recovery mode.
+ */
+ CommitLog(String table, boolean recoveryMode) throws IOException
+ {
+ table_ = table;
+ if ( !recoveryMode )
+ {
+ setNextFileName();
+ logWriter_ = CommitLog.createWriter(logFile_);
+ writeCommitLogHeader();
+ }
+ }
+
+ /*
+ * This ctor is currently used only for debugging. We
+ * are now using it to modify the header so that recovery
+ * can be tested in as many scenarios as we could imagine.
+ *
+ * param @ logFile - logfile which we wish to modify.
+ */
+ CommitLog(File logFile) throws IOException
+ {
+ table_ = CommitLog.getTableName(logFile.getName());
+ logFile_ = logFile.getAbsolutePath();
+ logWriter_ = CommitLog.createWriter(logFile_);
+ commitHeaderStartPos_ = 0L;
+ }
+
+ String getLogFile()
+ {
+ return logFile_;
+ }
+
+ void readCommitLogHeader(String logFile, byte[] bytes) throws IOException
+ {
+ IFileReader logReader = SequenceFile.reader(logFile);
+ try
+ {
+ logReader.readDirect(bytes);
+ }
+ finally
+ {
+ logReader.close();
+ }
+ }
+
+ /*
+ * This is invoked on startup via the ctor. It basically
+ * writes a header with all bits set to zero.
+ */
+ private void writeCommitLogHeader() throws IOException
+ {
+ Table table = Table.open(table_);
+ int cfSize = table.getNumberOfColumnFamilies();
+ /* record the beginning of the commit header */
+ commitHeaderStartPos_ = logWriter_.getCurrentPosition();
+ /* write the commit log header */
+ clHeader_ = new CommitLogHeader(cfSize);
+ writeCommitLogHeader(clHeader_.toByteArray(), false);
+ }
+
+ private void writeCommitLogHeader(byte[] bytes, boolean reset) throws IOException
+ {
+ /* record the current position */
+ long currentPos = logWriter_.getCurrentPosition();
+ logWriter_.seek(commitHeaderStartPos_);
+ /* write the commit log header */
+ logWriter_.writeDirect(bytes);
+ if ( reset )
+ {
+ /* seek back to the old position */
+ logWriter_.seek(currentPos);
+ }
+ }
+
+ void recover(List<File> clogs) throws IOException
+ {
+ Table table = Table.open(table_);
+ int cfSize = table.getNumberOfColumnFamilies();
+ int size = CommitLogHeader.size(cfSize);
+ byte[] header = new byte[size];
+ byte[] header2 = new byte[size];
+ int index = clogs.size() - 1;
+
+ File file = clogs.get(index);
+ readCommitLogHeader(file.getAbsolutePath(), header);
+
+ Stack<File> filesNeeded = new Stack<File>();
+ filesNeeded.push(file);
+
+ /*
+ * Identify files that we need for processing. This can be done
+ * using the information in the header of each file. Simply AND
+ * the byte[] headers together and stop at the file where
+ * the result is all zeros.
+ */
+ for ( int i = (index - 1); i >= 0; --i )
+ {
+ file = clogs.get(i);
+ readCommitLogHeader(file.getAbsolutePath(), header2);
+ byte[] result = CommitLogHeader.and(header, header2);
+ if ( !CommitLogHeader.isZero(result) )
+ {
+ filesNeeded.push(file);
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ doRecovery(filesNeeded, header);
+ }
+
+ private void printHeader(byte[] header)
+ {
+ StringBuilder sb = new StringBuilder("");
+ for ( byte b : header )
+ {
+ sb.append(b);
+ sb.append(" ");
+ }
+ logger_.debug(sb.toString());
+ }
+
+ private void doRecovery(Stack<File> filesNeeded, byte[] header) throws IOException
+ {
+ Table table = Table.open(table_);
+
+ DataInputBuffer bufIn = new DataInputBuffer();
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+
+ while ( !filesNeeded.isEmpty() )
+ {
+ File file = filesNeeded.pop();
+ // IFileReader reader = SequenceFile.bufferedReader(file.getAbsolutePath(), DatabaseDescriptor.getLogFileSizeThreshold());
+ IFileReader reader = SequenceFile.reader(file.getAbsolutePath());
+ try
+ {
+ Map<String, Row> rows = new HashMap<String, Row>();
+ reader.readDirect(header);
+ /* deserialize the commit log header */
+ bufIn.reset(header, 0, header.length);
+ CommitLogHeader clHeader = CommitLogHeader.serializer().deserialize(bufIn);
+ /* seek to the lowest position */
+ int lowPos = CommitLogHeader.getLowestPosition(clHeader);
+ /*
+ * If lowPos == 0 then we need to skip the processing of this
+ * file.
+ */
+ if (lowPos == 0)
+ break;
+ else
+ reader.seek(lowPos);
+
+ /* read the logs populate RowMutation and apply */
+ while ( !reader.isEOF() )
+ {
+ bufOut.reset();
+ long bytesRead = reader.next(bufOut);
+ if ( bytesRead == -1 )
+ break;
+
+ bufIn.reset(bufOut.getData(), bufOut.getLength());
+ /* Skip over the commit log key portion */
+ bufIn.readUTF();
+ /* Skip over data size */
+ bufIn.readInt();
+
+ /* read the commit log entry */
+ try
+ {
+ Row row = Row.serializer().deserialize(bufIn);
+ Map<String, ColumnFamily> columnFamilies = new HashMap<String, ColumnFamily>(row.getColumnFamilies());
+ /* remove column families that have already been flushed */
+ Set<String> cNames = columnFamilies.keySet();
+
+ for ( String cName : cNames )
+ {
+ ColumnFamily columnFamily = columnFamilies.get(cName);
+ /* TODO: Remove this to not process Hints */
+ if ( !DatabaseDescriptor.isApplicationColumnFamily(cName) )
+ {
+ row.removeColumnFamily(columnFamily);
+ continue;
+ }
+ int id = table.getColumnFamilyId(columnFamily.name());
+ if ( clHeader.get(id) == 0 || reader.getCurrentPosition() < clHeader.getPosition(id) )
+ row.removeColumnFamily(columnFamily);
+ }
+ if ( !row.isEmpty() )
+ {
+ table.applyNow(row);
+ }
+ }
+ catch ( IOException e )
+ {
+ logger_.debug( LogUtil.throwableToString(e) );
+ }
+ }
+ reader.close();
+ /* apply the rows read */
+ table.flush(true);
+ }
+ catch ( Throwable th )
+ {
+ logger_.info( LogUtil.throwableToString(th) );
+ /* close the reader and delete this commit log. */
+ reader.close();
+ FileUtils.delete( new File[]{file} );
+ }
+ }
+ }
+
+ /*
+ * Update the header of the commit log if a column family
+ * is encountered for the first time.
+ */
+ private void updateHeader(Row row) throws IOException
+ {
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+ Table table = Table.open(table_);
+ Set<String> cNames = columnFamilies.keySet();
+ for ( String cName : cNames )
+ {
+ ColumnFamily columnFamily = columnFamilies.get(cName);
+ int id = table.getColumnFamilyId(columnFamily.name());
+ if ( clHeader_.get(id) == 0 || ( clHeader_.get(id) == 1 && clHeader_.getPosition(id) == 0 ) )
+ {
+ clHeader_.turnOn( id, logWriter_.getCurrentPosition() );
+ writeCommitLogHeader(clHeader_.toByteArray(), true);
+ }
+ }
+ }
+
+ /*
+ * Adds the specified row to the commit log. This method will reset the
+ * file offset to what it was before the start of the operation in case
+ * of any problems. This way we can assume that the subsequent commit log
+ * entry will override the garbage left over by the previous write.
+ */
+ synchronized CommitLogContext add(Row row) throws IOException
+ {
+ long currentPosition = -1L;
+ CommitLogContext cLogCtx = null;
+ DataOutputBuffer cfBuffer = new DataOutputBuffer();
+ long fileSize = 0L;
+
+ try
+ {
+ /* serialize the row */
+ cfBuffer.reset();
+ Row.serializer().serialize(row, cfBuffer);
+ currentPosition = logWriter_.getCurrentPosition();
+ cLogCtx = new CommitLogContext(logFile_, currentPosition);
+ /* Update the header */
+ updateHeader(row);
+ logWriter_.append(table_, cfBuffer);
+ fileSize = logWriter_.getFileSize();
+ checkThresholdAndRollLog(fileSize);
+ }
+ catch (IOException e)
+ {
+ if ( currentPosition != -1 )
+ logWriter_.seek(currentPosition);
+ throw e;
+ }
+ finally
+ {
+ cfBuffer.close();
+ }
+ return cLogCtx;
+ }
+
+ /*
+ * This is called on Memtable flush to add to the commit log
+ * a token indicating that this column family has been flushed.
+ * The bit flag associated with this column family is set in the
+ * header and this is used to decide if the log file can be deleted.
+ */
+ synchronized void onMemtableFlush(String cf, CommitLog.CommitLogContext cLogCtx) throws IOException
+ {
+ Table table = Table.open(table_);
+ int id = table.getColumnFamilyId(cf);
+ /* try discarding old commit log files */
+ discard(cLogCtx, id);
+ }
+
+
+ /*
+ * Check if old commit logs can be deleted. However we cannot
+ * do this anymore in Fast Sync mode, and hence I think we
+ * should get rid of Fast Sync mode altogether. If there is
+ * a pathological event where a few CFs are rarely updated,
+ * their Memtables never get flushed.
+ * This will prevent commit logs from being deleted. WE NEED to
+ * fix this using some heuristic and force flushing such Memtables.
+ *
+ * param @ cLogCtx The commitLog context.
+ * param @ id id of the columnFamily being flushed to disk.
+ *
+ */
+ private void discard(CommitLog.CommitLogContext cLogCtx, int id) throws IOException
+ {
+ /* retrieve the commit log header associated with the file in the context */
+ CommitLogHeader commitLogHeader = clHeaders_.get(cLogCtx.file());
+ if(commitLogHeader == null )
+ {
+ if( logFile_.equals(cLogCtx.file()) )
+ {
+ /* this means we are dealing with the current commit log. */
+ commitLogHeader = clHeader_;
+ clHeaders_.put(cLogCtx.file(), clHeader_);
+ }
+ else
+ return;
+ }
+ /*
+ * We do any processing only if there is a change in the position in the context.
+ * This can happen if an older Memtable's flush comes in after a newer Memtable's
+ * flush. Right now this cannot happen since Memtables are flushed on a single
+ * thread.
+ */
+ if ( cLogCtx.position() < commitLogHeader.getPosition(id) )
+ return;
+ commitLogHeader.turnOff(id);
+ /* Sort the commit logs based on creation time */
+ List<String> oldFiles = new ArrayList<String>(clHeaders_.keySet());
+ Collections.sort(oldFiles, new CommitLogFileComparator());
+ List<String> listOfDeletedFiles = new ArrayList<String>();
+ /*
+ * Loop through all the commit log files in the history. Now process
+ * all files that are older than the one in the context. For each of
+ * these files the header needs to be modified by performing a bitwise &
+ * of the header with the header of the file in the context. If we
+ * encounter the file in the context in our list of old commit log files
+ * then we update the header and write it back to the commit log.
+ */
+ for(String oldFile : oldFiles)
+ {
+ if(oldFile.equals(cLogCtx.file()))
+ {
+ /*
+ * We need to turn the bit on again. This is because we always keep
+ * the bit turned on and the position indicates from where the
+ * commit log needs to be read. When a flush occurs we turn the bit off,
+ * perform the & operation, and then turn it on with the new position.
+ */
+ commitLogHeader.turnOn(id, cLogCtx.position());
+ writeCommitLogHeader(cLogCtx.file(), commitLogHeader.toByteArray());
+ break;
+ }
+ else
+ {
+ CommitLogHeader oldCommitLogHeader = clHeaders_.get(oldFile);
+ oldCommitLogHeader.and(commitLogHeader);
+ if(oldCommitLogHeader.isSafeToDelete())
+ {
+ logger_.debug("Deleting commit log:"+ oldFile);
+ FileUtils.deleteAsync(oldFile);
+ listOfDeletedFiles.add(oldFile);
+ }
+ else
+ {
+ writeCommitLogHeader(oldFile, oldCommitLogHeader.toByteArray());
+ }
+ }
+ }
+
+ for ( String deletedFile : listOfDeletedFiles)
+ {
+ clHeaders_.remove(deletedFile);
+ }
+ }
+
+ private void checkThresholdAndRollLog( long fileSize )
+ {
+ try
+ {
+ if ( fileSize >= DatabaseDescriptor.getLogFileSizeThreshold() || forcedRollOver_ )
+ {
+ if ( logWriter_.getFileSize() >= DatabaseDescriptor.getLogFileSizeThreshold() || forcedRollOver_ )
+ {
+ /* Rolls the current log file over to a new one. */
+ setNextFileName();
+ String oldLogFile = logWriter_.getFileName();
+ //history_.add(oldLogFile);
+ logWriter_.close();
+
+ /* point reader/writer to a new commit log file. */
+ // logWriter_ = SequenceFile.writer(logFile_);
+ logWriter_ = CommitLog.createWriter(logFile_);
+ /* squirrel away the old commit log header */
+ clHeaders_.put(oldLogFile, new CommitLogHeader( clHeader_ ));
+ /*
+ * We need to zero out positions because the positions in
+ * the old file do not make sense in the new one.
+ */
+ clHeader_.zeroPositions();
+ writeCommitLogHeader(clHeader_.toByteArray(), false);
+ // TODO: if the number of files in the commit log directory exceeds a certain threshold,
+ // force flush all the column families, so that a slowly populated column family
+ // does not cause commit logs to accumulate.
+ }
+ }
+ }
+ catch ( IOException e )
+ {
+ logger_.info(LogUtil.throwableToString(e));
+ }
+ finally
+ {
+ forcedRollOver_ = false;
+ }
+ }
+
+ public void setForcedRollOver()
+ {
+ forcedRollOver_ = true;
+ }
+
+ synchronized public void snapshot( String snapshotDirectory ) throws IOException
+ {
+ Map<String, List<File>> tableToCommitLogs = RecoveryManager.getListOFCommitLogsPerTable();
+ List<File> clogs = tableToCommitLogs.get(table_);
+
+ if( clogs != null )
+ {
+ File snapshotDir = new File(snapshotDirectory);
+ if( !snapshotDir.exists() )
+ snapshotDir.mkdir();
+ File commitLogSnapshotDir = new File(snapshotDirectory + System.getProperty("file.separator") + "CommitLogs");
+ if( !commitLogSnapshotDir.exists() )
+ commitLogSnapshotDir.mkdir();
+ for (File file : clogs)
+ {
+ Path existing = file.toPath();
+ File hardLinkFile = new File(commitLogSnapshotDir.getAbsolutePath() + System.getProperty("file.separator") + file.getName());
+ /* Files.createLink(link, existing) creates a hard link pointing at the existing log file */
+ java.nio.file.Files.createLink(hardLinkFile.toPath(), existing);
+ }
+ }
+ }
+ public static void main(String[] args) throws Throwable
+ {
+ LogUtil.init();
+
+ // the return value is not used in this case
+ DatabaseDescriptor.init();
+
+ File logDir = new File(DatabaseDescriptor.getLogFileLocation());
+ File[] files = logDir.listFiles();
+ Arrays.sort( files, new FileUtils.FileComparator() );
+
+ byte[] bytes = new byte[CommitLogHeader.size(Integer.parseInt(args[0]))];
+ for ( File file : files )
+ {
+ CommitLog clog = new CommitLog( file );
+ clog.readCommitLogHeader(file.getAbsolutePath(), bytes);
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(bytes, 0, bytes.length);
+ CommitLogHeader clHeader = CommitLogHeader.serializer().deserialize(bufIn);
+ /*
+ StringBuilder sb = new StringBuilder("");
+ for ( byte b : bytes )
+ {
+ sb.append(b);
+ sb.append(" ");
+ }
+ */
+ System.out.println("FILE:" + file);
+ System.out.println(clHeader.toString());
+ }
+ }
+}
\ No newline at end of file
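A minimal sketch of the header lifecycle described in the class comment above, using the CommitLogHeader API added later in this commit (the sketch assumes it lives in org.apache.cassandra.db, since these classes are package-private; the column family ids and offsets are made up for illustration):

    package org.apache.cassandra.db;

    import java.io.IOException;

    class CommitLogHeaderLifecycleSketch
    {
        public static void main(String[] args) throws IOException
        {
            CommitLogHeader header = new CommitLogHeader(3); // table with three column families
            header.turnOn(0, 128L);  // first write to CF 0, recorded at log offset 128
            header.turnOn(2, 4096L); // first write to CF 2, recorded at log offset 4096
            header.turnOff(0);       // CF 0's Memtable flushed; its log entries are obsolete
            /* prints false: CF 2 is still dirty, so this log must be retained */
            System.out.println(header.isSafeToDelete());
        }
    }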
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogEntry.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogEntry.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogEntry.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogEntry.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+/*
+ * An instance of this class represents an update to a table.
+ * This is written to the CommitLog to be replayed on recovery. It
+ * contains enough information to be written to a SSTable to
+ * capture events that happened before some catastrophe.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+class CommitLogEntry
+{
+ private static AtomicInteger lsnGenerator_ = new AtomicInteger(0);
+ private static ICompactSerializer<CommitLogEntry> serializer_;
+ static
+ {
+ serializer_ = new CommitLogEntrySerializer();
+ }
+
+ static ICompactSerializer<CommitLogEntry> serializer()
+ {
+ return serializer_;
+ }
+
+ private int length_;
+ private byte[] value_ = new byte[0];
+
+ CommitLogEntry()
+ {
+ }
+
+ CommitLogEntry(byte[] value)
+ {
+ this(value, 0);
+ }
+
+ CommitLogEntry(byte[] value, int length)
+ {
+ value_ = value;
+ length_ = length;
+ }
+
+ void value(byte[] bytes)
+ {
+ value_ = bytes;
+ }
+
+ byte[] value()
+ {
+ return value_;
+ }
+
+ void length(int size)
+ {
+ length_ = size;
+ }
+
+ int length()
+ {
+ return length_;
+ }
+}
+
+class CommitLogEntrySerializer implements ICompactSerializer<CommitLogEntry>
+{
+ public void serialize(CommitLogEntry logEntry, DataOutputStream dos) throws IOException
+ {
+ int length = logEntry.length();
+ dos.writeInt(length);
+ dos.write(logEntry.value(), 0, length);
+ }
+
+ public CommitLogEntry deserialize(DataInputStream dis) throws IOException
+ {
+ byte[] value = new byte[dis.readInt()];
+ dis.readFully(value);
+ /* preserve the length so a re-serialized entry writes its payload back out */
+ return new CommitLogEntry(value, value.length);
+ }
+}
+
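A minimal round-trip sketch of the length-prefixed format the serializer above defines (again assuming the sketch lives in org.apache.cassandra.db, since CommitLogEntry is package-private):

    package org.apache.cassandra.db;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class CommitLogEntryRoundTripSketch
    {
        public static void main(String[] args) throws IOException
        {
            byte[] payload = "serialized-row-bytes".getBytes();
            CommitLogEntry entry = new CommitLogEntry(payload, payload.length);

            /* serialize: an int length followed by the payload bytes */
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            CommitLogEntry.serializer().serialize(entry, new DataOutputStream(bos));

            /* deserialize restores the same payload */
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
            CommitLogEntry copy = CommitLogEntry.serializer().deserialize(dis);
            System.out.println(copy.length() == payload.length); // true
        }
    }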
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogHeader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogHeader.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogHeader.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLogHeader.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.ICompactSerializer;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class CommitLogHeader
+{
+ private static ICompactSerializer<CommitLogHeader> serializer_;
+
+ static
+ {
+ serializer_ = new CommitLogHeaderSerializer();
+ }
+
+ static ICompactSerializer<CommitLogHeader> serializer()
+ {
+ return serializer_;
+ }
+
+ static int size(int size)
+ {
+ /*
+ * We serialize the CommitLogHeader as a byte[] and write it
+ * to disk. So we first write an "int" to specify the length
+ * of the byte[] which is why we first have a 4 in the sum.
+ * We then have size bytes (one flag per column family) to track
+ * which families have been flushed, and the rest is the position[].
+ * size = #of column families
+ * +
+ * size of the bitset
+ * +
+ * size of position array
+ */
+ return 4 + size + (4 * size);
+ }
+
+ static int getLowestPosition(CommitLogHeader clHeader)
+ {
+ int[] positions = clHeader.getPositions();
+ int minPosition = Integer.MAX_VALUE;
+ for ( int position : positions )
+ {
+ if ( position < minPosition && position > 0)
+ {
+ minPosition = position;
+ }
+ }
+
+ if(minPosition == Integer.MAX_VALUE)
+ minPosition = 0;
+ return minPosition;
+ }
+
+ /*
+ * Bitwise & of each byte in the two arrays.
+ * Both arrays are of the same length; the
+ * result is returned as a new array.
+ */
+ static byte[] and(byte[] bytes, byte[] bytes2) throws IOException
+ {
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(bytes, 0, bytes.length);
+ CommitLogHeader clHeader = CommitLogHeader.serializer().deserialize(bufIn);
+ byte[] clh = clHeader.getBitSet();
+
+ bufIn.reset(bytes2, 0, bytes2.length);
+ CommitLogHeader clHeader2 = CommitLogHeader.serializer().deserialize(bufIn);
+ byte[] clh2 = clHeader2.getBitSet();
+
+ byte[] result = new byte[clh.length];
+ for ( int i = 0; i < clh.length; ++i )
+ {
+ result[i] = (byte)(clh[i] & clh2[i]);
+ }
+
+ return result;
+ }
+
+ static boolean isZero(byte[] bytes)
+ {
+ for ( byte b : bytes )
+ {
+ if ( b == 1 )
+ return false;
+ }
+ return true;
+ }
+
+ private byte[] header_ = new byte[0];
+ private int[] position_ = new int[0];
+
+ CommitLogHeader(int size)
+ {
+ header_ = new byte[size];
+ position_ = new int[size];
+ }
+
+ /*
+ * This ctor is used while deserializing, reconstructing
+ * the header from its serialized bit flags and the
+ * position of each column family id.
+ */
+ CommitLogHeader(byte[] header, int[] position)
+ {
+ header_ = header;
+ position_ = position;
+ }
+
+ CommitLogHeader(CommitLogHeader clHeader)
+ {
+ header_ = new byte[clHeader.header_.length];
+ System.arraycopy(clHeader.header_, 0, header_, 0, header_.length);
+ position_ = new int[clHeader.position_.length];
+ System.arraycopy(clHeader.position_, 0, position_, 0, position_.length);
+ }
+
+ byte get(int index)
+ {
+ return header_[index];
+ }
+
+ int getPosition(int index)
+ {
+ return position_[index];
+ }
+
+ void turnOn(int index, long position)
+ {
+ turnOn(header_, index, position);
+ }
+
+ void turnOn(byte[] bytes, int index, long position)
+ {
+ bytes[index] = (byte)1;
+ position_[index] = (int)position;
+ }
+
+ void turnOff(int index)
+ {
+ turnOff(header_, index);
+ }
+
+ void turnOff(byte[] bytes, int index)
+ {
+ bytes[index] = (byte)0;
+ position_[index] = 0;
+ }
+
+ boolean isSafeToDelete() throws IOException
+ {
+ return isSafeToDelete(header_);
+ }
+
+ boolean isSafeToDelete(byte[] bytes) throws IOException
+ {
+ for ( byte b : bytes )
+ {
+ if ( b == 1 )
+ return false;
+ }
+ return true;
+ }
+
+ byte[] getBitSet()
+ {
+ return header_;
+ }
+
+ int[] getPositions()
+ {
+ return position_;
+ }
+
+ void zeroPositions()
+ {
+ int size = position_.length;
+ position_ = new int[size];
+ }
+
+ void and (CommitLogHeader commitLogHeader)
+ {
+ byte[] clh2 = commitLogHeader.header_;
+ for ( int i = 0; i < header_.length; ++i )
+ {
+ header_[i] = (byte)(header_[i] & clh2[i]);
+ }
+ }
+
+ byte[] toByteArray() throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ CommitLogHeader.serializer().serialize(this, dos);
+ return bos.toByteArray();
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder("");
+ for ( int i = 0; i < header_.length; ++i )
+ {
+ sb.append(header_[i]);
+ sb.append(":");
+ Table table = Table.open( DatabaseDescriptor.getTables().get(0));
+ sb.append(table.getColumnFamilyName(i));
+ sb.append(" ");
+ }
+ sb.append(" | " );
+ for ( int position : position_ )
+ {
+ sb.append(position);
+ sb.append(" ");
+ }
+ return sb.toString();
+ }
+}
+
+class CommitLogHeaderSerializer implements ICompactSerializer<CommitLogHeader>
+{
+ public void serialize(CommitLogHeader clHeader, DataOutputStream dos) throws IOException
+ {
+ dos.writeInt(clHeader.getBitSet().length);
+ dos.write(clHeader.getBitSet());
+ int[] positions = clHeader.getPositions();
+
+ for ( int position : positions )
+ {
+ dos.writeInt(position);
+ }
+ }
+
+ public CommitLogHeader deserialize(DataInputStream dis) throws IOException
+ {
+ int size = dis.readInt();
+ byte[] bitFlags = new byte[size];
+ dis.readFully(bitFlags);
+
+ int[] position = new int[size];
+ for ( int i = 0; i < size; ++i )
+ {
+ position[i] = dis.readInt();
+ }
+
+ return new CommitLogHeader(bitFlags, position);
+ }
+}
+
+
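A worked example of the size() arithmetic above: for a table with two column families, the serialized header is the 4-byte length prefix, one flag byte per column family, and one 4-byte position per column family, i.e. 4 + 2 + (4 * 2) = 14 bytes. A minimal sketch (assumed to live in org.apache.cassandra.db):

    package org.apache.cassandra.db;

    import java.io.IOException;

    class CommitLogHeaderSizeSketch
    {
        public static void main(String[] args) throws IOException
        {
            CommitLogHeader header = new CommitLogHeader(2); // two column families
            header.turnOn(1, 512L);                          // CF 1 first written at offset 512
            byte[] bytes = header.toByteArray();
            /* 4 (length prefix) + 2 (flag bytes) + 8 (two int positions) = 14 */
            System.out.println(bytes.length == CommitLogHeader.size(2)); // true
        }
    }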
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CompactSerializerInvocationHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CompactSerializerInvocationHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CompactSerializerInvocationHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CompactSerializerInvocationHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+
+import org.apache.cassandra.io.DataOutputBuffer;
+
+
+/*
+ * This is the abstraction that pre-processes calls to implementations
+ * of the ICompactSerializer2 serialize() via dynamic proxies.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class CompactSerializerInvocationHandler<T> implements InvocationHandler
+{
+ private ICompactSerializer2<T> serializer_;
+
+ public CompactSerializerInvocationHandler(ICompactSerializer2<T> serializer)
+ {
+ serializer_ = serializer;
+ }
+
+ /*
+ * This dynamic runtime proxy adds the indexes before the actual columns are serialized.
+ */
+ public Object invoke(Object proxy, Method m, Object[] args) throws Throwable
+ {
+ /* Do the preprocessing here. */
+ ColumnFamily cf = (ColumnFamily)args[0];
+ DataOutputBuffer bufOut = (DataOutputBuffer)args[1];
+ ColumnIndexer.serialize(cf, bufOut);
+ return m.invoke(serializer_, args);
+ }
+}
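The handler above is meant to be installed via java.lang.reflect.Proxy. A minimal wiring sketch, under the assumption that ICompactSerializer2 is an interface (Proxy requires one; the interface itself is not part of this file):

    package org.apache.cassandra.db;

    import java.lang.reflect.Proxy;

    final class SerializerProxyFactorySketch
    {
        /* Wraps a raw serializer so ColumnIndexer.serialize() runs before each serialize() call. */
        @SuppressWarnings("unchecked")
        static <T> ICompactSerializer2<T> wrap(ICompactSerializer2<T> raw)
        {
            return (ICompactSerializer2<T>) Proxy.newProxyInstance(
                    ICompactSerializer2.class.getClassLoader(),
                    new Class<?>[]{ ICompactSerializer2.class },
                    new CompactSerializerInvocationHandler<T>(raw));
        }
    }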
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.SSTable;
+
+
+/**
+ * This class provides a filter for filtering out columns
+ * beyond a certain count.
+ *
+ * @author pmalik
+ *
+ */
+public class CountFilter implements IFilter
+{
+ private long countLimit_;
+ private boolean isDone_;
+
+ CountFilter(int countLimit)
+ {
+ countLimit_ = countLimit;
+ isDone_ = false;
+ }
+
+ public ColumnFamily filter(String cfNameParam, ColumnFamily columnFamily)
+ {
+ String[] values = RowMutation.getColumnAndColumnFamily(cfNameParam);
+ if ( columnFamily == null )
+ return columnFamily;
+
+ String cfName = columnFamily.name();
+ ColumnFamily filteredCf = new ColumnFamily(cfName);
+ if( countLimit_ <= 0 )
+ {
+ isDone_ = true;
+ return filteredCf;
+ }
+ if( values.length == 1)
+ {
+ Collection<IColumn> columns = columnFamily.getAllColumns();
+ for(IColumn column : columns)
+ {
+ filteredCf.addColumn(column.name(), column);
+ countLimit_--;
+ if( countLimit_ <= 0 )
+ {
+ isDone_ = true;
+ return filteredCf;
+ }
+ }
+ }
+ else if(values.length == 2 && DatabaseDescriptor.getColumnType(cfName).equals("Super"))
+ {
+ Collection<IColumn> columns = columnFamily.getAllColumns();
+ for(IColumn column : columns)
+ {
+ SuperColumn superColumn = (SuperColumn)column;
+ SuperColumn filteredSuperColumn = new SuperColumn(superColumn.name());
+ filteredCf.addColumn(filteredSuperColumn.name(), filteredSuperColumn);
+ Collection<IColumn> subColumns = superColumn.getSubColumns();
+ for(IColumn subColumn : subColumns)
+ {
+ filteredSuperColumn.addColumn(subColumn.name(), subColumn);
+ countLimit_--;
+ if( countLimit_ <= 0 )
+ {
+ isDone_ = true;
+ return filteredCf;
+ }
+ }
+ }
+ }
+ else
+ {
+ throw new UnsupportedOperationException();
+ }
+ return filteredCf;
+ }
+
+ public IColumn filter(IColumn column, DataInputStream dis) throws IOException
+ {
+ countLimit_--;
+ if( countLimit_ <= 0 )
+ {
+ isDone_ = true;
+ }
+ return column;
+ }
+
+ public boolean isDone()
+ {
+ return isDone_;
+ }
+
+ public void setDone()
+ {
+ isDone_ = true;
+ }
+
+ public DataInputBuffer next(String key, String cf, SSTable ssTable) throws IOException
+ {
+ return ssTable.next(key, cf);
+ }
+}
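A usage sketch for the filter above. The column family name "Standard1" is hypothetical, and RowMutation.getColumnAndColumnFamily() (not part of this file) is assumed to split a "ColumnFamily:column" style name, so a plain name takes the values.length == 1 branch:

    package org.apache.cassandra.db;

    class CountFilterUsageSketch
    {
        /* Returns at most the first two columns of the given column family. */
        static ColumnFamily firstTwoColumns(ColumnFamily cf)
        {
            CountFilter filter = new CountFilter(2);
            ColumnFamily trimmed = filter.filter("Standard1", cf);
            /* isDone() turns true once the count is exhausted, letting callers
             * stop scanning further Memtables/SSTables early. */
            return trimmed;
        }
    }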
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/DBConstants.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/DBConstants.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/DBConstants.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/DBConstants.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,9 @@
+package org.apache.cassandra.db;
+
+final class DBConstants
+{
+ public static final int boolSize_ = 1;
+ public static final int intSize_ = 4;
+ public static final int longSize_ = 8;
+ public static final int tsSize_ = 8;
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileWriter;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BasicUtilities;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.HashingSchemes;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class DBManager
+{
+ private static DBManager dbMgr_;
+ private static Lock lock_ = new ReentrantLock();
+
+ public static DBManager instance() throws Throwable
+ {
+ if ( dbMgr_ == null )
+ {
+ lock_.lock();
+ try
+ {
+ if ( dbMgr_ == null )
+ dbMgr_ = new DBManager();
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return dbMgr_;
+ }
+
+ public static class StorageMetadata
+ {
+ private BigInteger storageId_;
+ private int generation_;
+
+ StorageMetadata(BigInteger storageId, int generation)
+ {
+ storageId_ = storageId;
+ generation_ = generation;
+ }
+
+ public BigInteger getStorageId()
+ {
+ return storageId_;
+ }
+
+ public void setStorageId(BigInteger storageId)
+ {
+ storageId_ = storageId;
+ }
+
+ public int getGeneration()
+ {
+ return generation_;
+ }
+ }
+
+ public DBManager() throws Throwable
+ {
+ /* Read the configuration file */
+ Map<String, Map<String, CFMetaData>> tableToColumnFamilyMap = DatabaseDescriptor.init();
+ storeMetadata(tableToColumnFamilyMap);
+ Set<String> tables = tableToColumnFamilyMap.keySet();
+
+ for (String table : tables)
+ {
+ Table tbl = Table.open(table);
+ tbl.onStart();
+ }
+ /* Do recovery if need be. */
+ RecoveryManager recoveryMgr = RecoveryManager.instance();
+ recoveryMgr.doRecovery();
+ }
+
+ /*
+ * Create the metadata tables. These tables have information about
+ * the table name and the column families that make up each table.
+ * Each column family also has an associated ID which is an int.
+ */
+ private static void storeMetadata(Map<String, Map<String, CFMetaData>> tableToColumnFamilyMap) throws Throwable
+ {
+ AtomicInteger idGenerator = new AtomicInteger(0);
+ Set<String> tables = tableToColumnFamilyMap.keySet();
+
+ for ( String table : tables )
+ {
+ Table.TableMetadata tmetadata = Table.TableMetadata.instance();
+ if ( tmetadata.isEmpty() )
+ {
+ tmetadata = Table.TableMetadata.instance();
+ /* Column families associated with this table */
+ Map<String, CFMetaData> columnFamilies = tableToColumnFamilyMap.get(table);
+
+ for (String columnFamily : columnFamilies.keySet())
+ {
+ tmetadata.add(columnFamily, idGenerator.getAndIncrement(), DatabaseDescriptor.getColumnType(columnFamily));
+ }
+
+ /*
+ * Here we add all the system related column families.
+ */
+ /* Add the TableMetadata column family to this map. */
+ tmetadata.add(Table.TableMetadata.cfName_, idGenerator.getAndIncrement());
+ /* Add the LocationInfo column family to this map. */
+ tmetadata.add(SystemTable.cfName_, idGenerator.getAndIncrement());
+ /* Add the recycle column family to this map. */
+ tmetadata.add(Table.recycleBin_, idGenerator.getAndIncrement());
+ /* Add the Hints column family to this map. */
+ tmetadata.add(Table.hints_, idGenerator.getAndIncrement(), ColumnFamily.getColumnType("Super"));
+ tmetadata.apply();
+ idGenerator.set(0);
+ }
+ }
+ }
+
+ /*
+ * This method reads the system table and retrieves the metadata
+ * associated with this storage instance. Currently we store the
+ * metadata in a Column Family called LocationInfo which has two
+ * columns namely "Token" and "Generation". This is the token that
+ * gets gossiped around and the generation info is used for failure detection (FD).
+ */
+ public DBManager.StorageMetadata start() throws IOException
+ {
+ StorageMetadata storageMetadata = null;
+ /* Read the system table to retrieve the storage ID and the generation */
+ SystemTable sysTable = SystemTable.openSystemTable(SystemTable.name_);
+ Row row = sysTable.get(FBUtilities.getHostName());
+
+ if ( row == null )
+ {
+ /* Generate a token for this Storage node */
+ String guid = GuidGenerator.guid();
+ BigInteger token = StorageService.hash(guid);
+ if ( token.signum() == -1 )
+ token = token.multiply(BigInteger.valueOf(-1L));
+
+ int generation = 1;
+
+ String key = FBUtilities.getHostName();
+ row = new Row(key);
+ ColumnFamily cf = new ColumnFamily(SystemTable.cfName_);
+ cf.addColumn(SystemTable.token_, new Column(SystemTable.token_, token.toByteArray()) );
+ cf.addColumn(SystemTable.generation_, new Column(SystemTable.generation_, BasicUtilities.intToByteArray(generation)) );
+ row.addColumnFamily(cf);
+ sysTable.apply(row);
+ storageMetadata = new StorageMetadata( token, generation);
+ }
+ else
+ {
+ /* we crashed and came back up, so we need to bump the generation # */
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
+ Set<String> cfNames = columnFamilies.keySet();
+
+ for ( String cfName : cfNames )
+ {
+ ColumnFamily columnFamily = columnFamilies.get(cfName);
+
+ IColumn token = columnFamily.getColumn(SystemTable.token_);
+ BigInteger bi = new BigInteger( token.value() );
+
+ IColumn generation = columnFamily.getColumn(SystemTable.generation_);
+ int gen = BasicUtilities.byteArrayToInt(generation.value()) + 1;
+
+ Column generation2 = new Column("Generation", BasicUtilities.intToByteArray(gen), generation.timestamp() + 1);
+ columnFamily.addColumn("Generation", generation2);
+ storageMetadata = new StorageMetadata( bi, gen );
+ break;
+ }
+ sysTable.reset(row);
+ }
+ return storageMetadata;
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ DBManager.instance().start();
+ }
+}
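A minimal sketch of the startup sequence the class above implements: obtaining the singleton triggers table initialization and commit log recovery, and start() then reads or creates this node's token and generation:

    package org.apache.cassandra.db;

    class NodeStartupSketch
    {
        public static void main(String[] args) throws Throwable
        {
            DBManager.StorageMetadata meta = DBManager.instance().start();
            /* The token is gossiped around the ring; the generation is bumped
             * on every restart and used for failure detection. */
            System.out.println("token=" + meta.getStorageId()
                    + " generation=" + meta.getGeneration());
        }
    }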
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/DataFileVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/DataFileVerbHandler.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/DataFileVerbHandler.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/DataFileVerbHandler.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,46 @@
+package org.apache.cassandra.db;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+
+public class DataFileVerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger( DataFileVerbHandler.class );
+
+ public void doVerb(Message message)
+ {
+ Object[] body = message.getMessageBody();
+ byte[] bytes = (byte[])body[0];
+ String table = new String(bytes);
+ logger_.info("**** Received a request from " + message.getFrom());
+
+ List<String> allFiles = Table.open(table).getAllSSTablesOnDisk();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ try
+ {
+ dos.writeInt(allFiles.size());
+ for ( String file : allFiles )
+ {
+ dos.writeUTF(file);
+ }
+ Message response = message.getReply( StorageService.getLocalStorageEndPoint(), new Object[]{bos.toByteArray()});
+ MessagingService.getMessagingInstance().sendOneWay(response, message.getFrom());
+ }
+ catch ( IOException ex )
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ }
+ }
+}
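The response body above is an int count followed by that many UTF-encoded file names. A minimal sketch of decoding it on the requesting node, using only java.io:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    class DataFileResponseDecoderSketch
    {
        static List<String> decode(byte[] responseBody) throws IOException
        {
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(responseBody));
            int count = dis.readInt();            // number of SSTable file names
            List<String> files = new ArrayList<String>(count);
            for (int i = 0; i < count; i++)
            {
                files.add(dis.readUTF());         // written with writeUTF above
            }
            return files;
        }
    }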
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/EfficientBidiMap.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/EfficientBidiMap.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/EfficientBidiMap.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/EfficientBidiMap.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.Serializable;
+import java.util.*;
+
+import org.apache.cassandra.db.ColumnComparatorFactory.ComparatorType;
+import org.apache.cassandra.service.StorageService;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class EfficientBidiMap implements Serializable
+{
+ private Map<String, IColumn> map_ = new HashMap<String, IColumn>();
+ private SortedSet<IColumn> sortedSet_;
+ private Comparator<IColumn> columnComparator_;
+
+ EfficientBidiMap()
+ {
+ this(ColumnComparatorFactory.getComparator(ComparatorType.TIMESTAMP));
+ }
+
+ EfficientBidiMap(Comparator<IColumn> columnComparator)
+ {
+ columnComparator_ = columnComparator;
+ sortedSet_ = new TreeSet<IColumn>(columnComparator);
+ }
+
+ EfficientBidiMap(Map<String, IColumn> map, SortedSet<IColumn> set, Comparator<IColumn> comparator)
+ {
+ map_ = map;
+ sortedSet_ = set;
+ columnComparator_ = comparator;
+ }
+
+ EfficientBidiMap(Object[] objects, Comparator<IColumn> columnComparator)
+ {
+ columnComparator_ = columnComparator;
+ sortedSet_ = new TreeSet<IColumn>(columnComparator);
+ for ( Object object : objects )
+ {
+ IColumn column = (IColumn)object;
+ sortedSet_.add(column);
+ map_.put(column.name(), column);
+ }
+ }
+
+ public Comparator<IColumn> getComparator()
+ {
+ return columnComparator_;
+ }
+
+ public void put(String key, IColumn column)
+ {
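+ /* Evict any previous column under this name from the sorted set first;
+ otherwise the stale column would linger there after the map entry is overwritten. */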
+ IColumn oldColumn = map_.get(key);
+ if( oldColumn != null )
+ sortedSet_.remove( oldColumn );
+ map_.put(key, column);
+ sortedSet_.add(column);
+ }
+
+ public IColumn get(String key)
+ {
+ return map_.get(key);
+ }
+
+ public SortedSet<IColumn> getSortedColumns()
+ {
+ return sortedSet_;
+ }
+
+ public Map<String, IColumn> getColumns()
+ {
+ return map_;
+ }
+
+ public int size()
+ {
+ return map_.size();
+ }
+
+ public void remove(String columnName)
+ {
+ IColumn column = map_.remove(columnName);
+ /* Guard against a missing column: TreeSet.remove(null) would NPE inside the comparator. */
+ if ( column != null )
+ sortedSet_.remove(column);
+ }
+
+ void clear()
+ {
+ map_.clear();
+ sortedSet_.clear();
+ }
+
+ ColumnComparatorFactory.ComparatorType getComparatorType()
+ {
+ return ((AbstractColumnComparator)columnComparator_).getComparatorType();
+ }
+
+ EfficientBidiMap cloneMe()
+ {
+ Map<String, IColumn> map = new HashMap<String, IColumn>(map_);
+ SortedSet<IColumn> set = new TreeSet<IColumn>(sortedSet_);
+ return new EfficientBidiMap(map, set, columnComparator_);
+ }
+}
+
+
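
The invariant to keep in mind when reading EfficientBidiMap is that every mutation has
to touch both views: the name-keyed HashMap and the comparator-ordered TreeSet. A
self-contained sketch of the same pattern (Col is a stand-in for IColumn, invented for
illustration):

    import java.util.*;

    class BidiSketch
    {
        static class Col
        {
            final String name;
            final long timestamp;
            Col(String name, long timestamp) { this.name = name; this.timestamp = timestamp; }
        }

        public static void main(String[] args)
        {
            Map<String, Col> byName = new HashMap<String, Col>();
            // Order by timestamp, then name, so distinct columns never compare equal.
            SortedSet<Col> byTime = new TreeSet<Col>(new Comparator<Col>()
            {
                public int compare(Col a, Col b)
                {
                    if (a.timestamp != b.timestamp)
                        return a.timestamp < b.timestamp ? -1 : 1;
                    return a.name.compareTo(b.name);
                }
            });

            Col v1 = new Col("c1", 10);
            byName.put(v1.name, v1);
            byTime.add(v1);

            // put(): evict the old column from the set before inserting the new
            // one, exactly as EfficientBidiMap.put() does above.
            Col v2 = new Col("c1", 20);
            Col old = byName.put(v2.name, v2);
            if (old != null)
                byTime.remove(old);
            byTime.add(v2);

            System.out.println(byName.size() + " / " + byTime.size()); // prints "1 / 1"
        }
    }
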
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileNameComparator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileNameComparator.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileNameComparator.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileNameComparator.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,32 @@
+package org.apache.cassandra.db;
+
+import java.util.Comparator;
+
+class FileNameComparator implements Comparator<String>
+{
+ public static final int Ascending = 0;
+ public static final int Descending = 1;
+
+ /* Sort order for data file names; defaults to descending (highest file index first). */
+ private int order_ = Descending;
+
+ FileNameComparator( int order )
+ {
+ order_ = order;
+ }
+
+ public int compare(String f, String f2)
+ {
+ if( order_ == Descending )
+ return ColumnFamilyStore.getIndexFromFileName(f2) - ColumnFamilyStore.getIndexFromFileName(f);
+ else
+ return ColumnFamilyStore.getIndexFromFileName(f) - ColumnFamilyStore.getIndexFromFileName(f2);
+ }
+
+ public boolean equals(Object o)
+ {
+ /* Two instances are interchangeable only if they impose the same ordering. */
+ return ( o instanceof FileNameComparator ) && ((FileNameComparator)o).order_ == order_;
+ }
+}
\ No newline at end of file
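
One caveat on the subtraction idiom in compare() above: using `a - b` as a comparison
result overflows when the operands are far apart. SSTable file indexes are small, so
the patch is safe in practice, but the overflow-proof form of the same method would be:

    // Sketch only; assumes the same ColumnFamilyStore.getIndexFromFileName helper.
    public int compare(String f, String f2)
    {
        int a = ColumnFamilyStore.getIndexFromFileName(f);
        int b = ColumnFamilyStore.getIndexFromFileName(f2);
        int result = (a < b) ? -1 : ((a > b) ? 1 : 0); // never overflows
        return order_ == Descending ? -result : result;
    }
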
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.math.BigInteger;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.service.PartitionerType;
+import org.apache.cassandra.service.StorageService;
+
+
+public class FileStruct implements Comparable<FileStruct>
+{
+ IFileReader reader_;
+ String key_;
+ DataInputBuffer bufIn_;
+ DataOutputBuffer bufOut_;
+
+ public FileStruct()
+ {
+ }
+
+ public FileStruct(String file, int bufSize) throws IOException
+ {
+ bufIn_ = new DataInputBuffer();
+ bufOut_ = new DataOutputBuffer();
+ reader_ = SequenceFile.bufferedReader(file, bufSize);
+ long bytesRead = advance();
+ if ( bytesRead == -1L )
+ throw new IOException("Either the file is empty or EOF has been reached.");
+ }
+
+ public String getKey()
+ {
+ return key_;
+ }
+
+ public DataOutputBuffer getBuffer()
+ {
+ return bufOut_;
+ }
+
+ public long advance() throws IOException
+ {
+ long bytesRead = -1L;
+ bufOut_.reset();
+ /* advance and read the next key in the file. */
+ if (reader_.isEOF())
+ {
+ reader_.close();
+ return bytesRead;
+ }
+
+ bytesRead = reader_.next(bufOut_);
+ if (bytesRead == -1)
+ {
+ reader_.close();
+ return bytesRead;
+ }
+
+ bufIn_.reset(bufOut_.getData(), bufOut_.getLength());
+ key_ = bufIn_.readUTF();
+ /* If the key we read is the block index key, skip it and read the next key. */
+ if ( key_.equals(SSTable.blockIndexKey_) )
+ {
+ bufOut_.reset();
+ bytesRead = reader_.next(bufOut_);
+ if (bytesRead == -1)
+ {
+ reader_.close();
+ return bytesRead;
+ }
+ bufIn_.reset(bufOut_.getData(), bufOut_.getLength());
+ key_ = bufIn_.readUTF();
+ }
+
+ return bytesRead;
+ }
+
+ public int compareTo(FileStruct f)
+ {
+ int value = 0;
+ PartitionerType pType = StorageService.getPartitionerType();
+ switch( pType )
+ {
+ case OPHF:
+ /* Order-preserving partitioner: on-disk keys compare lexically. */
+ value = key_.compareTo(f.key_);
+ break;
+
+ default:
+ /* Random partitioner: keys are stored as "<token>:<key>"; compare the numeric tokens. */
+ String lhs = key_.split(":")[0];
+ BigInteger b = new BigInteger(lhs);
+ String rhs = f.key_.split(":")[0];
+ BigInteger b2 = new BigInteger(rhs);
+ value = b.compareTo(b2);
+ break;
+ }
+ return value;
+ }
+
+ public void close() throws IOException
+ {
+ bufIn_.close();
+ bufOut_.close();
+ reader_.close();
+ }
+}
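
The default branch of compareTo() above relies on the random partitioner storing keys
as "<token>:<key>", where the token is a decimal hash value, so ordering must go
through BigInteger rather than String comparison. A small self-contained illustration
(the key strings are invented for the example):

    import java.math.BigInteger;

    class TokenOrderSketch
    {
        static int compareKeys(String k1, String k2)
        {
            // Strip everything after the first ':' and compare the numeric tokens.
            BigInteger t1 = new BigInteger(k1.split(":")[0]);
            BigInteger t2 = new BigInteger(k2.split(":")[0]);
            return t1.compareTo(t2);
        }

        public static void main(String[] args)
        {
            // Lexically "9:..." sorts after "10:...", but numerically 9 < 10;
            // that is exactly why compareTo() parses the token instead of
            // comparing the raw key strings.
            System.out.println(compareKeys("9:userA", "10:userB"));  // negative
            System.out.println(compareKeys("42:userC", "42:userD")); // 0
        }
    }
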
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,18 @@
+package org.apache.cassandra.db;
+
+import java.util.Comparator;
+
+class FileStructComparator implements Comparator<FileStruct>
+{
+ public int compare(FileStruct f, FileStruct f2)
+ {
+ return f.reader_.getFileName().compareTo(f2.reader_.getFileName());
+ }
+
+ public boolean equals(Object o)
+ {
+ return o instanceof FileStructComparator;
+ }
+}
\ No newline at end of file
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.Collection;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.IResponseResolver;
+import org.apache.cassandra.service.QuorumResponseHandler;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.WriteResponseResolver;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class HintedHandOffManager implements IComponentShutdown
+{
+ private static volatile HintedHandOffManager instance_; /* volatile: required for safe double-checked locking */
+ private static Lock lock_ = new ReentrantLock();
+ private static Logger logger_ = Logger.getLogger(HintedHandOffManager.class);
+ public static final String key_ = "HintedHandOffKey";
+ final static long intervalInMins_ = 20;
+ private ScheduledExecutorService executor_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("HINTED-HANDOFF-POOL"));
+
+
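+ /* Classic double-checked locking: the unsynchronized first read of instance_
+ is only safe because the field is declared volatile above. */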
+ public static HintedHandOffManager instance()
+ {
+ if ( instance_ == null )
+ {
+ lock_.lock();
+ try
+ {
+ if ( instance_ == null )
+ instance_ = new HintedHandOffManager();
+ }
+ finally
+ {
+ lock_.unlock();
+ }
+ }
+ return instance_;
+ }
+
+ class HintedHandOff implements Runnable
+ {
+ private ColumnFamilyStore columnFamilyStore_ = null;
+ private EndPoint endPoint_ = null;
+
+ HintedHandOff(ColumnFamilyStore columnFamilyStore)
+ {
+ columnFamilyStore_ = columnFamilyStore;
+ }
+ HintedHandOff(EndPoint endPoint)
+ {
+ endPoint_ = endPoint;
+ }
+
+ private boolean sendMessage(String endpointAddress, String key) throws Exception
+ {
+ boolean success = false; // TODO: this is a hack; we need to confirm that the data was actually written on the other end.
+ if(FailureDetector.instance().isAlive(new EndPoint(endpointAddress, DatabaseDescriptor.getControlPort())))
+ {
+ success = true;
+ }
+ else
+ {
+ return success;
+ }
+ Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+ Row row = table.get(key);
+ RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), row);
+ RowMutationMessage rmMsg = new RowMutationMessage(rm);
+ Message message = RowMutationMessage.makeRowMutationMessage( rmMsg );
+ EndPoint endPoint = new EndPoint(endpointAddress, DatabaseDescriptor.getStoragePort());
+ MessagingService.getMessagingInstance().sendOneWay(message, endPoint);
+ return success;
+ }
+
+ private void deleteEndPoint(String endpointAddress, String key) throws Exception
+ {
+ RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
+ rm.delete(Table.hints_ + ":" + key + ":" + endpointAddress);
+ rm.apply();
+ }
+
+ private void deleteKey(String key) throws Exception
+ {
+ RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0), key_);
+ rm.delete(Table.hints_ + ":" + key);
+ rm.apply();
+ }
+
+ private void runHints()
+ {
+ logger_.debug("Started hinted handoff " + columnFamilyStore_.columnFamily_);
+
+ // 1. Scan through all the keys we need to hand off.
+ // 2. For each key, read the list of recipients and send the data.
+ // 3. Delete a recipient from the key once its write succeeds.
+ // 4. If all writes for a given key succeeded, delete the key itself.
+ // 5. Force a flush.
+ // 6. Run a major compaction to clean up the deletions.
+ Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+ ColumnFamily hintedColumnFamily = null;
+ boolean success = false;
+ boolean allsuccess = true;
+ try
+ {
+ hintedColumnFamily = table.get(key_, Table.hints_);
+ if(hintedColumnFamily == null)
+ {
+ // Force flush now
+ columnFamilyStore_.forceFlush(false);
+ return;
+ }
+ Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
+ if(keys != null)
+ {
+ for(IColumn key : keys)
+ {
+ // Get all the endpoints for the key
+ Collection<IColumn> endpoints = key.getSubColumns();
+ allsuccess = true;
+ if ( endpoints != null )
+ {
+ for(IColumn endpoint : endpoints )
+ {
+ success = sendMessage(endpoint.name(), key.name());
+ if(success)
+ {
+ // Delete the endpoint from the list
+ deleteEndPoint(endpoint.name(), key.name());
+ }
+ else
+ {
+ allsuccess = false;
+ }
+ }
+ }
+ if(endpoints == null || allsuccess)
+ {
+ // Delete the key itself.
+ deleteKey(key.name());
+ }
+ }
+ }
+ // Force flush now
+ columnFamilyStore_.forceFlush(false);
+
+ // Now do a major compaction
+ columnFamilyStore_.forceCompaction(null, null, 0, null);
+ }
+ catch ( Exception ex )
+ {
+ /* Log the full stack trace; the message alone makes failed handoffs hard to diagnose. */
+ logger_.warn("Hinted handoff failed", ex);
+ }
+ logger_.debug("Finished hinted handoff ..."+columnFamilyStore_.columnFamily_);
+ }
+
+ private void runDeliverHints(EndPoint to)
+ {
+ logger_.debug("Started hinted handoff for endPoint " + endPoint_.getHost());
+
+ // 1. Scan through all the keys we need to hand off.
+ // 2. For each key, read the list of recipients; if the endpoint matches, send the data.
+ // 3. Delete that recipient from the key if the write was successful.
+
+ Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+ ColumnFamily hintedColumnFamily = null;
+ boolean success = false;
+ try
+ {
+ hintedColumnFamily = table.get(key_, Table.hints_);
+ if(hintedColumnFamily == null)
+ return;
+ Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
+ if(keys != null)
+ {
+ for(IColumn key : keys)
+ {
+ // Get all the endpoints for the key
+ Collection<IColumn> endpoints = key.getSubColumns();
+ if ( endpoints != null )
+ {
+ for(IColumn endpoint : endpoints )
+ {
+ if(endpoint.name().equals(endPoint_.getHost()))
+ {
+ success = sendMessage(endpoint.name(), key.name());
+ if(success)
+ {
+ // Delete the endpoint from the list
+ deleteEndPoint(endpoint.name(), key.name());
+ }
+ }
+ }
+ }
+ if(endpoints == null)
+ {
+ // Delete the key itself.
+ deleteKey(key.name());
+ }
+ }
+ }
+ }
+ catch ( Exception ex )
+ {
+ /* Log the full stack trace; the message alone makes failed handoffs hard to diagnose. */
+ logger_.warn("Hinted handoff failed", ex);
+ }
+ logger_.debug("Finished hinted handoff for endpoint ..." + endPoint_.getHost());
+ }
+
+ public void run()
+ {
+ if(endPoint_ == null)
+ {
+ runHints();
+ }
+ else
+ {
+ runDeliverHints(endPoint_);
+ }
+
+ }
+ }
+
+ public HintedHandOffManager()
+ {
+ StorageService.instance().registerComponentForShutdown(this);
+ }
+
+ public void submit(ColumnFamilyStore columnFamilyStore)
+ {
+ executor_.scheduleWithFixedDelay(new HintedHandOff(columnFamilyStore), HintedHandOffManager.intervalInMins_,
+ HintedHandOffManager.intervalInMins_, TimeUnit.MINUTES);
+ }
+
+ /*
+ * Delivers hints to a particular endpoint. When we learn that an
+ * endpoint is back up, we push the hinted data to it via this
+ * event-driven mechanism.
+ */
+ public void deliverHints(EndPoint to)
+ {
+ executor_.submit(new HintedHandOff(to));
+ }
+
+ public void shutdown()
+ {
+ executor_.shutdownNow();
+ }
+}
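
For context, the schema implied by the deleteEndPoint()/deleteKey() paths above is a
single row (key_ = "HintedHandOffKey") in the hints column family, holding one super
column per hinted data key and one subcolumn per endpoint that still owes a write:

    HintedHandOffKey : {
        <data key 1> : { <endpoint A>, <endpoint B> },  // endpoints yet to receive key 1
        <data key 2> : { <endpoint A> }
    }

Recording a hint is the mirror image of deleteEndPoint(). A sketch, assuming RowMutation
exposes an add(columnFamily:superColumn:subColumn, value, timestamp) method symmetric to
the delete(...) calls used above (an assumption; the exact signature should be checked
against RowMutation):

    void recordHint(String key, String endpointAddress) throws Exception
    {
        RowMutation rm = new RowMutation(DatabaseDescriptor.getTables().get(0),
                                         HintedHandOffManager.key_);
        // Hypothetical call: one empty-valued subcolumn per (data key, endpoint) pair.
        rm.add(Table.hints_ + ":" + key + ":" + endpointAddress,
               new byte[0], System.currentTimeMillis());
        rm.apply();
    }
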