Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:04:22 UTC
svn commit: r749202 [2/6] - in /incubator/cassandra/src: ./ org/ org/apache/
org/apache/cassandra/ org/apache/cassandra/db/
Added: incubator/cassandra/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/ColumnFamilyStore.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/ColumnFamilyStore.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,1556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+import java.math.BigInteger;
+import java.nio.file.Path;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IndexHelper;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.PartitionerType;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ColumnFamilyStore
+{
+ private static int threshHold_ = 4;
+ private static final int bufSize_ = 128*1024*1024;
+ private static int compactionMemoryThreshold_ = 1 << 30;
+ private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
+
+ private String table_;
+ public String columnFamily_;
+
+ /* This is used to generate the next index for a SSTable */
+ private AtomicInteger fileIndexGenerator_ = new AtomicInteger(0);
+
+ /* memtable associated with this ColumnFamilyStore. */
+ private AtomicReference<Memtable> memtable_;
+ private AtomicReference<BinaryMemtable> binaryMemtable_;
+
+ /* SSTables on disk for this column family */
+ private Set<String> ssTables_ = new HashSet<String>();
+
+ /* Modification lock used for protecting reads from compactions. */
+ private ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
+
+ /* Flag indicates if a compaction is in process */
+ public AtomicBoolean isCompacting_ = new AtomicBoolean(false);
+
+ ColumnFamilyStore(String table, String columnFamily) throws IOException
+ {
+ table_ = table;
+ columnFamily_ = columnFamily;
+ /*
+ * Get all data files associated with old Memtables for this table.
+ * These files are named as follows: <Table>-1.db, ..., <Table>-n.db. Get
+ * the max, which in this case is n, and increment it to use as the next
+ * index.
+ */
+ List<Integer> indices = new ArrayList<Integer>();
+ String[] dataFileDirectories = DatabaseDescriptor.getAllDataFileLocations();
+ for ( String directory : dataFileDirectories )
+ {
+ File fileDir = new File(directory);
+ File[] files = fileDir.listFiles();
+ for (File file : files)
+ {
+ String filename = file.getName();
+ String[] tblCfName = getTableAndColumnFamilyName(filename);
+ if (tblCfName[0].equals(table_)
+ && tblCfName[1].equals(columnFamily))
+ {
+ int index = getIndexFromFileName(filename);
+ indices.add(index);
+ }
+ }
+ }
+ Collections.sort(indices);
+ int value = (indices.size() > 0) ? (indices.get(indices.size() - 1)) : 0;
+ fileIndexGenerator_.set(value);
+ memtable_ = new AtomicReference<Memtable>( new Memtable(table_, columnFamily_) );
+ binaryMemtable_ = new AtomicReference<BinaryMemtable>( new BinaryMemtable(table_, columnFamily_) );
+ }
+
+ void onStart() throws IOException
+ {
+ /* Do major compaction */
+ List<File> ssTables = new ArrayList<File>();
+ String[] dataFileDirectories = DatabaseDescriptor.getAllDataFileLocations();
+ for ( String directory : dataFileDirectories )
+ {
+ File fileDir = new File(directory);
+ File[] files = fileDir.listFiles();
+ for (File file : files)
+ {
+ String filename = file.getName();
+ if(((file.length() == 0) || (filename.indexOf("-" + SSTable.temporaryFile_) != -1) ) && (filename.indexOf(columnFamily_) != -1))
+ {
+ file.delete();
+ continue;
+ }
+ String[] tblCfName = getTableAndColumnFamilyName(filename);
+ if (tblCfName[0].equals(table_)
+ && tblCfName[1].equals(columnFamily_)
+ && filename.indexOf("-Data.db") != -1)
+ {
+ ssTables.add(file.getAbsoluteFile());
+ }
+ }
+ }
+ Collections.sort(ssTables, new FileUtils.FileComparator());
+ List<String> filenames = new ArrayList<String>();
+ for (File ssTable : ssTables)
+ {
+ filenames.add(ssTable.getAbsolutePath());
+ }
+
+ /* There are no files to compact just add to the list of SSTables */
+ ssTables_.addAll(filenames);
+ /* Load the index files and the Bloom Filters associated with them. */
+ SSTable.onStart(filenames);
+ logger_.debug("Submitting a major compaction task ...");
+ MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
+ if(columnFamily_.equals(Table.hints_))
+ {
+ HintedHandOffManager.instance().submit(this);
+ }
+ MinorCompactionManager.instance().submitPeriodicCompaction(this);
+ }
+
+ List<String> getAllSSTablesOnDisk()
+ {
+ return new ArrayList<String>(ssTables_);
+ }
+
+ /*
+ * This method is called to obtain statistics about
+ * the Column Family represented by this Column Family
+ * Store. It will report the total number of files on
+ * disk and the total space occupied by the data files
+ * associated with this Column Family.
+ */
+ public String cfStats(String newLineSeparator, java.text.DecimalFormat df)
+ {
+ StringBuilder sb = new StringBuilder();
+ /*
+ * If there are no files on disk we do not want
+ * to display something ugly on the admin page.
+ */
+ if ( ssTables_.size() == 0 )
+ {
+ return sb.toString();
+ }
+ sb.append(columnFamily_ + " statistics :");
+ sb.append(newLineSeparator);
+ sb.append("Number of files on disk : " + ssTables_.size());
+ sb.append(newLineSeparator);
+ double totalSpace = 0d;
+ for ( String file : ssTables_ )
+ {
+ File f = new File(file);
+ totalSpace += f.length();
+ }
+ String diskSpace = FileUtils.stringifyFileSize(totalSpace);
+ sb.append("Total disk space : " + diskSpace);
+ sb.append(newLineSeparator);
+ sb.append("--------------------------------------");
+ sb.append(newLineSeparator);
+ return sb.toString();
+ }
+
+ /*
+ * This is called after bootstrap to add the files
+ * to the list of files maintained.
+ */
+ void addToList(String file)
+ {
+ lock_.writeLock().lock();
+ try
+ {
+ ssTables_.add(file);
+ }
+ finally
+ {
+ lock_.writeLock().unlock();
+ }
+ }
+
+ void touch(String key, boolean fData) throws IOException
+ {
+ /* Scan the SSTables on disk first */
+ lock_.readLock().lock();
+ try
+ {
+ List<String> files = new ArrayList<String>(ssTables_);
+ for (String file : files)
+ {
+ /*
+ * Get the BloomFilter associated with this file. Check if the key
+ * is present in the BloomFilter. If not continue to the next file.
+ */
+ boolean bVal = SSTable.isKeyInFile(key, file);
+ if ( !bVal )
+ continue;
+ SSTable ssTable = new SSTable(file);
+ ssTable.touch(key, fData);
+ }
+ }
+ finally
+ {
+ lock_.readLock().unlock();
+ }
+ }
+
+ /*
+ * This method forces a compaction of the SSTables on disk. We wait
+ * for the process to complete by waiting on a future pointer.
+ */
+ boolean forceCompaction(List<Range> ranges, EndPoint target, long skip, List<String> fileList)
+ {
+ Future<Boolean> futurePtr = null;
+ if( ranges != null)
+ futurePtr = MinorCompactionManager.instance().submit(ColumnFamilyStore.this, ranges, target, fileList);
+ else
+ MinorCompactionManager.instance().submitMajor(ColumnFamilyStore.this, ranges, skip);
+
+ boolean result = true;
+ try
+ {
+ /* Waiting for the compaction to complete. */
+ if(futurePtr != null)
+ result = futurePtr.get();
+ logger_.debug("Done forcing compaction ...");
+ }
+ catch (ExecutionException ex)
+ {
+ logger_.debug(LogUtil.throwableToString(ex));
+ }
+ catch ( InterruptedException ex2 )
+ {
+ logger_.debug(LogUtil.throwableToString(ex2));
+ }
+ return result;
+ }
+
+ String getColumnFamilyName()
+ {
+ return columnFamily_;
+ }
+
+ private String[] getTableAndColumnFamilyName(String filename)
+ {
+ StringTokenizer st = new StringTokenizer(filename, "-");
+ String[] values = new String[2];
+ int i = 0;
+ while (st.hasMoreElements())
+ {
+ if (i == 0)
+ values[i] = (String) st.nextElement();
+ else if (i == 1)
+ {
+ values[i] = (String) st.nextElement();
+ break;
+ }
+ ++i;
+ }
+ return values;
+ }
+
+ protected static int getIndexFromFileName(String filename)
+ {
+ /*
+ * File name is of the form <table>-<column family>-<index>-Data.db.
+ * This tokenizer will strip the .db portion.
+ */
+ StringTokenizer st = new StringTokenizer(filename, "-");
+ /*
+ * Now I want to get the index portion of the filename. We accumulate
+ * the indices and then sort them to get the max index.
+ */
+ int count = st.countTokens();
+ int i = 0;
+ String index = null;
+ while (st.hasMoreElements())
+ {
+ index = (String) st.nextElement();
+ if (i == (count - 2))
+ break;
+ ++i;
+ }
+ return Integer.parseInt(index);
+ }
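A short worked example of the tokenization above (the filename is hypothetical):

    // "Users-Standard1-12-Data.db" splits on '-' into [Users, Standard1, 12, Data.db];
    // count == 4, so the loop stops at token (count - 2), i.e. "12".
    int generation = getIndexFromFileName("Users-Standard1-12-Data.db");   // returns 12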
+
+ String getNextFileName()
+ {
+ // Pseudo-increment so that we do not generate consecutive numbers
+ fileIndexGenerator_.incrementAndGet();
+ String name = table_ + "-" + columnFamily_ + "-" + fileIndexGenerator_.incrementAndGet();
+ return name;
+ }
+
+ /*
+ * Return a temporary file name.
+ */
+ String getTempFileName()
+ {
+ // Pseudo-increment so that we do not generate consecutive numbers
+ fileIndexGenerator_.incrementAndGet();
+ String name = table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + fileIndexGenerator_.incrementAndGet() ;
+ return name;
+ }
+
+ /*
+ * Return a temporary file name based on the list of files passed in.
+ * This method sorts the list and generates a number between the two lowest
+ * file indices, ensuring uniqueness.
+ * Since we do not generate consecutive numbers, the lowest file index
+ * can simply be incremented to name the next file.
+ */
+ String getTempFileName( List<String> files)
+ {
+ int lowestIndex = 0 ;
+ int index = 0;
+ Collections.sort(files, new FileNameComparator(FileNameComparator.Ascending));
+
+ if( files.size() <= 1)
+ return null;
+ lowestIndex = getIndexFromFileName(files.get(0));
+
+ index = lowestIndex + 1 ;
+
+ String name = table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + index ;
+ return name;
+ }
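A small worked example of the scheme above (indices and names are hypothetical). Because getNextFileName() bumps the generator twice, on-disk indices are never consecutive, so the slot just above the lowest index is free:

    // files sorted ascending: [Users-Standard1-6-Data.db, Users-Standard1-8-Data.db]
    // lowestIndex == 6, so the temporary file gets index 7, e.g.
    //   Users-Standard1-<temporaryFile_>-7   (temporaryFile_ assumed to render as "tmp")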
+
+
+ /*
+ * This version is used only on start up when we are recovering from logs.
+ * In the future we may want to parallelize the log processing for a table
+ * by having a thread per log file present for recovery. Re-visit at that
+ * time.
+ */
+ void switchMemtable(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx) throws IOException
+ {
+ memtable_.set( new Memtable(table_, columnFamily_) );
+ if(!key.equals(Memtable.flushKey_))
+ memtable_.get().put(key, columnFamily, cLogCtx);
+ }
+
+ /*
+ * This version is used when we forceflush.
+ */
+ void switchMemtable() throws IOException
+ {
+ memtable_.set( new Memtable(table_, columnFamily_) );
+ }
+
+ /*
+ * This version is used only on start up when we are recovering from logs.
+ * In the future we may want to parallelize the log processing for a table
+ * by having a thread per log file present for recovery. Re-visit at that
+ * time.
+ */
+ void switchBinaryMemtable(String key, byte[] buffer) throws IOException
+ {
+ binaryMemtable_.set( new BinaryMemtable(table_, columnFamily_) );
+ binaryMemtable_.get().put(key, buffer);
+ }
+
+ void forceFlush(boolean fRecovery) throws IOException
+ {
+ //MemtableManager.instance().submit(getColumnFamilyName(), memtable_.get() , CommitLog.CommitLogContext.NULL);
+ //memtable_.get().flush(true, CommitLog.CommitLogContext.NULL);
+ memtable_.get().forceflush(this, fRecovery);
+ }
+
+ void forceFlushBinary() throws IOException
+ {
+ BinaryMemtableManager.instance().submit(getColumnFamilyName(), binaryMemtable_.get());
+ //binaryMemtable_.get().flush(true);
+ }
+
+ /**
+ * Insert/Update the column family for this key.
+ * @param key key for update/insert
+ * @param columnFamily columnFamily changes
+ * @param cLogCtx commit log context for this write
+ */
+ void apply(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx)
+ throws IOException
+ {
+ memtable_.get().put(key, columnFamily, cLogCtx);
+ }
+
+ /*
+ * Insert/Update the binary data for this key.
+ * @param key key for update/insert
+ * @param buffer raw bytes to store for this key
+ */
+ void applyBinary(String key, byte[] buffer)
+ throws IOException
+ {
+ binaryMemtable_.get().put(key, buffer);
+ }
+
+ /**
+ *
+ * Get the column family in the most efficient order.
+ * 1. Memtable
+ * 2. Sorted list of files
+ */
+ public ColumnFamily getColumnFamily(String key, String cf, IFilter filter) throws IOException
+ {
+ List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+ ColumnFamily columnFamily = null;
+ long start = System.currentTimeMillis();
+ /* Get the ColumnFamily from Memtable */
+ getColumnFamilyFromCurrentMemtable(key, cf, filter, columnFamilies);
+ if(columnFamilies.size() != 0)
+ {
+ if(filter.isDone())
+ return columnFamilies.get(0);
+ }
+ /* Check if MemtableManager has any historical information */
+ MemtableManager.instance().getColumnFamily(key, columnFamily_, cf, filter, columnFamilies);
+ if(columnFamilies.size() != 0)
+ {
+ columnFamily = resolve(columnFamilies);
+ if(filter.isDone())
+ return columnFamily;
+ columnFamilies.clear();
+ columnFamilies.add(columnFamily);
+ }
+ getColumnFamilyFromDisk(key, cf, columnFamilies, filter);
+ logger_.debug("DISK TIME: " + (System.currentTimeMillis() - start)
+ + " ms.");
+ columnFamily = resolve(columnFamilies);
+
+ return columnFamily;
+ }
+
+ public ColumnFamily getColumnFamilyFromMemory(String key, String cf, IFilter filter)
+ {
+ List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+ ColumnFamily columnFamily = null;
+ long start = System.currentTimeMillis();
+ /* Get the ColumnFamily from Memtable */
+ getColumnFamilyFromCurrentMemtable(key, cf, filter, columnFamilies);
+ if(columnFamilies.size() != 0)
+ {
+ if(filter.isDone())
+ return columnFamilies.get(0);
+ }
+ /* Check if MemtableManager has any historical information */
+ MemtableManager.instance().getColumnFamily(key, columnFamily_, cf, filter, columnFamilies);
+ if(columnFamilies.size() != 0)
+ {
+ columnFamily = resolve(columnFamilies);
+ if(filter.isDone())
+ return columnFamily;
+ columnFamilies.clear();
+ columnFamilies.add(columnFamily);
+ }
+ columnFamily = resolve(columnFamilies);
+ return columnFamily;
+ }
+
+ /**
+ * Fetch from the disk files, going in sorted order to be efficient.
+ * This method exits as soon as the required data is found.
+ * @param key
+ * @param cf
+ * @param columnFamilies
+ * @param filter
+ * @throws IOException
+ */
+ private void getColumnFamilyFromDisk(String key, String cf, List<ColumnFamily> columnFamilies, IFilter filter) throws IOException
+ {
+ /* Scan the SSTables on disk first */
+ List<String> files = new ArrayList<String>();
+ lock_.readLock().lock();
+ try
+ {
+ files.addAll(ssTables_);
+ Collections.sort(files, new FileNameComparator(FileNameComparator.Descending));
+ }
+ finally
+ {
+ lock_.readLock().unlock();
+ }
+
+ for (String file : files)
+ {
+ /*
+ * Get the BloomFilter associated with this file. Check if the key
+ * is present in the BloomFilter. If not continue to the next file.
+ */
+ boolean bVal = SSTable.isKeyInFile(key, file);
+ if ( !bVal )
+ continue;
+ ColumnFamily columnFamily = fetchColumnFamily(key, cf, filter, file);
+ long start = System.currentTimeMillis();
+ if (columnFamily != null)
+ {
+ /*
+ * TODO
+ * By using the filter before removing deleted columns
+ * we have an efficient implementation of the time filter,
+ * but for the count filter this can return wrong results;
+ * we need to take care of that later.
+ */
+ /* suppress columns marked for delete */
+ Map<String, IColumn> columns = columnFamily.getColumns();
+ Set<String> cNames = columns.keySet();
+
+ for (String cName : cNames)
+ {
+ IColumn column = columns.get(cName);
+ if (column.isMarkedForDelete())
+ columns.remove(cName);
+ }
+ columnFamilies.add(columnFamily);
+ if(filter.isDone())
+ {
+ break;
+ }
+ }
+ logger_.debug("DISK Data structure population TIME: " + (System.currentTimeMillis() - start)
+ + " ms.");
+ }
+ files.clear();
+ }
+
+
+ private ColumnFamily fetchColumnFamily(String key, String cf, IFilter filter, String ssTableFile) throws IOException
+ {
+ SSTable ssTable = new SSTable(ssTableFile);
+ long start = System.currentTimeMillis();
+ DataInputBuffer bufIn = null;
+ bufIn = filter.next(key, cf, ssTable);
+ logger_.debug("DISK ssTable.next TIME: " + (System.currentTimeMillis() - start) + " ms.");
+ if (bufIn.getLength() == 0)
+ return null;
+ start = System.currentTimeMillis();
+ ColumnFamily columnFamily = null;
+ columnFamily = ColumnFamily.serializer().deserialize(bufIn, cf, filter);
+ logger_.debug("DISK Deserialize TIME: " + (System.currentTimeMillis() - start) + " ms.");
+ if (columnFamily == null)
+ return columnFamily;
+ return (!columnFamily.isMarkedForDelete()) ? columnFamily : null;
+ }
+
+
+
+ private void getColumnFamilyFromCurrentMemtable(String key, String cf, IFilter filter, List<ColumnFamily> columnFamilies)
+ {
+ /* Get the ColumnFamily from Memtable */
+ ColumnFamily columnFamily = memtable_.get().get(key, cf, filter);
+ if (columnFamily != null)
+ {
+ if (!columnFamily.isMarkedForDelete())
+ columnFamilies.add(columnFamily);
+ }
+ }
+
+ private ColumnFamily resolve(List<ColumnFamily> columnFamilies)
+ {
+ int size = columnFamilies.size();
+ if (size == 0)
+ return null;
+ ColumnFamily cf = columnFamilies.get(0);
+ for ( int i = 1; i < size ; ++i )
+ {
+ cf.addColumns(columnFamilies.get(i));
+ }
+ return cf;
+ }
+
+
+ /*
+ * This version is used only on start up when we are recovering from logs.
+ * Hence no locking is required since we process logs on the main thread. In
+ * the future we may want to parallelize the log processing for a table by
+ * having a thread per log file present for recovery. Re-visit at that time.
+ */
+ void applyNow(String key, ColumnFamily columnFamily) throws IOException
+ {
+ if (!columnFamily.isMarkedForDelete())
+ memtable_.get().putOnRecovery(key, columnFamily);
+ }
+
+ /*
+ * Delete doesn't mean we can blindly delete. We need to write this to disk
+ * as being marked for delete. This is to prevent a previous value from
+ * resuscitating a column family that has been deleted.
+ */
+ void delete(String key, ColumnFamily columnFamily)
+ throws IOException
+ {
+ memtable_.get().remove(key, columnFamily);
+ }
+
+ /*
+ * This method is called when the Memtable is frozen and ready to be flushed
+ * to disk. This method informs the CommitLog that a particular ColumnFamily
+ * is being flushed to disk.
+ */
+ void onMemtableFlush(CommitLog.CommitLogContext cLogCtx) throws IOException
+ {
+ if ( cLogCtx.isValidContext() )
+ CommitLog.open(table_).onMemtableFlush(columnFamily_, cLogCtx);
+ }
+
+ /*
+ * Called after the Memtable flushes its in-memory data. This information is
+ * cached in the ColumnFamilyStore. This is useful for reads because the
+ * ColumnFamilyStore first looks in the in-memory store and then on the
+ * disk to find the key. If invoked during recoveryMode the
+ * onMemtableFlush() need not be invoked.
+ *
+ * param @ filename - filename just flushed to disk
+ * param @ bf - bloom filter which indicates the keys that are in this file.
+ */
+ void storeLocation(String filename, BloomFilter bf) throws IOException
+ {
+ boolean doCompaction = false;
+ int ssTableSize = 0;
+ lock_.writeLock().lock();
+ try
+ {
+ ssTables_.add(filename);
+ SSTable.storeBloomFilter(filename, bf);
+ ssTableSize = ssTables_.size();
+ }
+ finally
+ {
+ lock_.writeLock().unlock();
+ }
+ if (ssTableSize >= threshHold_ && !isCompacting_.get())
+ {
+ doCompaction = true;
+ }
+
+ if (isCompacting_.get())
+ {
+ if ( ssTableSize % threshHold_ == 0 )
+ {
+ doCompaction = true;
+ }
+ }
+ if ( doCompaction )
+ {
+ logger_.debug("Submitting for compaction ...");
+ MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
+ logger_.debug("Submitted for compaction ...");
+ }
+ }
+
+ PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize) throws IOException
+ {
+ PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
+ if (files.size() > 1 || (ranges != null && files.size() > 0))
+ {
+ int bufferSize = Math.min( (ColumnFamilyStore.compactionMemoryThreshold_ / files.size()), minBufferSize ) ;
+ FileStruct fs = null;
+ for (String file : files)
+ {
+ try
+ {
+ fs = new FileStruct();
+ fs.bufIn_ = new DataInputBuffer();
+ fs.bufOut_ = new DataOutputBuffer();
+ fs.reader_ = SequenceFile.bufferedReader(file, bufferSize);
+ fs.key_ = null;
+ fs = getNextKey(fs);
+ if(fs == null)
+ continue;
+ pq.add(fs);
+ }
+ catch ( Exception ex)
+ {
+ ex.printStackTrace();
+ try
+ {
+ if(fs != null)
+ {
+ fs.reader_.close();
+ }
+ }
+ catch(Exception e)
+ {
+ logger_.warn("Unable to close file :" + file);
+ }
+ continue;
+ }
+ }
+ }
+ return pq;
+ }
+
+
+ /*
+ * Stage the compactions: group files of similar size into buckets.
+ * This method figures out which files are close enough in size; buckets
+ * that grow past the threshold are then compacted.
+ */
+ Map<Integer, List<String>> stageOrderedCompaction(List<String> files)
+ {
+ // Sort the files based on the generation ID
+ Collections.sort(files, new FileNameComparator(FileNameComparator.Ascending));
+ Map<Integer, List<String>> buckets = new HashMap<Integer, List<String>>();
+ int maxBuckets = 1000;
+ long averages[] = new long[maxBuckets];
+ long min = 50L*1024L*1024L;
+ Integer i = 0;
+ for(String file : files)
+ {
+ File f = new File(file);
+ long size = f.length();
+ if ( (size > averages[i]/2 && size < 3*averages[i]/2) || ( size < min && averages[i] < min ))
+ {
+ averages[i] = (averages[i] + size) / 2 ;
+ List<String> fileList = buckets.get(i);
+ if(fileList == null)
+ {
+ fileList = new ArrayList<String>();
+ buckets.put(i, fileList);
+ }
+ fileList.add(file);
+ }
+ else
+ {
+ if( i >= maxBuckets )
+ break;
+ i++;
+ List<String> fileList = new ArrayList<String>();
+ buckets.put(i, fileList);
+ fileList.add(file);
+ averages[i] = size;
+ }
+ }
+ return buckets;
+ }
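A worked example of the bucketing above (file sizes are hypothetical; the 50 MB floor is the `min` constant in the method):

    // files in generation order, sizes: 10 MB, 20 MB, 15 MB, 200 MB, 190 MB, 45 MB
    //   10 MB : both size and averages[0] are under the 50 MB floor  -> bucket 0, averages[0] = 5 MB
    //   20 MB : still under the 50 MB floor                          -> bucket 0, averages[0] = 12.5 MB
    //   15 MB : between averages[0]/2 and 3*averages[0]/2            -> bucket 0, averages[0] = 13.75 MB
    //   200 MB: neither condition holds                              -> bucket 1, averages[1] = 200 MB
    //   190 MB: between averages[1]/2 and 3*averages[1]/2            -> bucket 1, averages[1] = 195 MB
    //   45 MB : not near 195 MB, and averages[1] is above the floor  -> bucket 2 (the walk never
    //           returns to an earlier bucket, so a late small file starts a new bucket)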
+
+
+
+ /*
+ * Break the files into buckets and then compact.
+ */
+ void doCompaction() throws IOException
+ {
+ isCompacting_.set(true);
+ List<String> files = new ArrayList<String>(ssTables_);
+ try
+ {
+ int count = 0;
+ Map<Integer, List<String>> buckets = stageOrderedCompaction(files);
+ Set<Integer> keySet = buckets.keySet();
+ for(Integer key : keySet)
+ {
+ List<String> fileList = buckets.get(key);
+ Collections.sort( fileList , new FileNameComparator( FileNameComparator.Ascending));
+ if(fileList.size() >= threshHold_ )
+ {
+ files.clear();
+ count = 0;
+ for(String file : fileList)
+ {
+ files.add(file);
+ count++;
+ if( count == threshHold_ )
+ break;
+ }
+ try
+ {
+ // For each bucket, if it has crossed the threshold, do the compaction
+ // In case of range compaction merge the counting bloom filters also.
+ if( count == threshHold_)
+ doFileCompaction(files, bufSize_);
+ }
+ catch ( Exception ex)
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ }
+ }
+ }
+ }
+ finally
+ {
+ isCompacting_.set(false);
+ }
+ return;
+ }
+
+ void doMajorCompaction(long skip) throws IOException
+ {
+ doMajorCompactionInternal( skip );
+ }
+
+ void doMajorCompaction() throws IOException
+ {
+ doMajorCompactionInternal( 0 );
+ }
+ /*
+ * Compact all the files irrespective of their size.
+ * skip : the size threshold, in GB, of files to be skipped;
+ * all files larger than skip GB are excluded from this compaction,
+ * except when skip is 0, in which case all files are taken.
+ */
+ void doMajorCompactionInternal(long skip) throws IOException
+ {
+ isCompacting_.set(true);
+ List<String> filesInternal = new ArrayList<String>(ssTables_);
+ List<String> files = null;
+ try
+ {
+ if( skip > 0L )
+ {
+ files = new ArrayList<String>();
+ for ( String file : filesInternal )
+ {
+ File f = new File(file);
+ if( f.length() < skip*1024L*1024L*1024L )
+ {
+ files.add(file);
+ }
+ }
+ }
+ else
+ {
+ files = filesInternal;
+ }
+ doFileCompaction(files, bufSize_);
+ }
+ catch ( Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ finally
+ {
+ isCompacting_.set(false);
+ }
+ return ;
+ }
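A quick worked example of the skip threshold (sizes are hypothetical):

    // skip == 10 : SSTables of 2 GB and 6 GB are compacted, a 40 GB file is left out
    //              (only files with length < 10 GB are taken)
    // skip == 0  : the size check is bypassed and all files are compacted together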
+
+ /*
+ * Add up all the file sizes; this is the worst-case output file
+ * size for a compaction of the given list of files.
+ */
+ long getExpectedCompactedFileSize(List<String> files)
+ {
+ long expectedFileSize = 0;
+ for(String file : files)
+ {
+ File f = new File(file);
+ long size = f.length();
+ expectedFileSize = expectedFileSize + size;
+ }
+ return expectedFileSize;
+ }
+
+
+ /*
+ * Find the maximum size file in the list.
+ */
+ String getMaxSizeFile( List<String> files )
+ {
+ long maxSize = 0L;
+ String maxFile = null;
+ for ( String file : files )
+ {
+ File f = new File(file);
+ if(f.length() > maxSize )
+ {
+ maxSize = f.length();
+ maxFile = file;
+ }
+ }
+ return maxFile;
+ }
+
+
+ Range getMaxRange( List<Range> ranges )
+ {
+ Range maxRange = new Range( BigInteger.ZERO, BigInteger.ZERO );
+ for( Range range : ranges)
+ {
+ if( range.left().compareTo(maxRange.left()) > 0 )
+ {
+ maxRange = range;
+ }
+ }
+ return maxRange;
+ }
+
+ boolean isLoopAround ( List<Range> ranges )
+ {
+ boolean isLoop = false;
+ for( Range range : ranges)
+ {
+ if( range.left().compareTo(range.right()) > 0 )
+ {
+ isLoop = true;
+ break;
+ }
+ }
+ return isLoop;
+ }
+
+ boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
+ {
+ isCompacting_.set(true);
+ List<String> files = new ArrayList<String>(ssTables_);
+ boolean result = true;
+ try
+ {
+ result = doFileAntiCompaction(files, ranges, target, bufSize_, fileList, null);
+ }
+ catch ( Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ finally
+ {
+ isCompacting_.set(false);
+ }
+ return result;
+
+ }
+
+ /*
+ * Read the next key from the data file. This method skips the block index
+ * and reads the next available key into the filestruct that is passed in.
+ * If it cannot read, or end of file is reached, it returns null.
+ */
+ FileStruct getNextKey(FileStruct filestruct) throws IOException
+ {
+ filestruct.bufOut_.reset();
+ if (filestruct.reader_.isEOF())
+ {
+ filestruct.reader_.close();
+ return null;
+ }
+
+ long bytesread = filestruct.reader_.next(filestruct.bufOut_);
+ if (bytesread == -1)
+ {
+ filestruct.reader_.close();
+ return null;
+ }
+
+ filestruct.bufIn_.reset(filestruct.bufOut_.getData(), filestruct.bufOut_.getLength());
+ filestruct.key_ = filestruct.bufIn_.readUTF();
+ /* If the key we read is the Block Index Key then we are done reading the keys so exit */
+ if ( filestruct.key_.equals(SSTable.blockIndexKey_) )
+ {
+ filestruct.reader_.close();
+ return null;
+ }
+ return filestruct;
+ }
+
+ void forceCleanup()
+ {
+ MinorCompactionManager.instance().submitCleanup(ColumnFamilyStore.this);
+ }
+
+ /**
+ * This function goes over each file and removes the keys that this node is not
+ * responsible for, keeping only the keys it is responsible for.
+ * @throws IOException
+ */
+ void doCleanupCompaction() throws IOException
+ {
+ isCompacting_.set(true);
+ List<String> files = new ArrayList<String>(ssTables_);
+ for(String file: files)
+ {
+ try
+ {
+ doCleanup(file);
+ }
+ catch ( Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ isCompacting_.set(false);
+ }
+ /**
+ * cleans up one particular file by removing keys that this node is not responsible for.
+ * @param file
+ * @throws IOException
+ */
+ /* TODO: Take care of the comments later. */
+ void doCleanup(String file) throws IOException
+ {
+ if(file == null )
+ return;
+ List<Range> myRanges = null;
+ List<String> files = new ArrayList<String>();
+ files.add(file);
+ List<String> newFiles = new ArrayList<String>();
+ Map<EndPoint, List<Range>> endPointtoRangeMap = StorageService.instance().constructEndPointToRangesMap();
+ myRanges = endPointtoRangeMap.get(StorageService.getLocalStorageEndPoint());
+ List<BloomFilter> compactedBloomFilters = new ArrayList<BloomFilter>();
+ doFileAntiCompaction(files, myRanges, null, bufSize_, newFiles, compactedBloomFilters);
+ logger_.debug("Original file : " + file + " of size " + new File(file).length());
+ lock_.writeLock().lock();
+ try
+ {
+ ssTables_.remove(file);
+ SSTable.removeAssociatedBloomFilter(file);
+ for (String newfile : newFiles)
+ {
+ logger_.debug("New file : " + newfile + " of size " + new File(newfile).length());
+ if ( newfile != null )
+ {
+ ssTables_.add(newfile);
+ logger_.debug("Inserting bloom filter for file " + newfile);
+ SSTable.storeBloomFilter(newfile, compactedBloomFilters.get(0));
+ }
+ }
+ SSTable.delete(file);
+ }
+ finally
+ {
+ lock_.writeLock().unlock();
+ }
+ }
+
+ /**
+ * This function performs the anti-compaction process: it produces a file containing only the keys that belong to the given ranges.
+ * If the target is not specified it produces a normal compacted file, with the unnecessary ranges wiped out.
+ * @param files
+ * @param ranges
+ * @param target
+ * @param minBufferSize
+ * @param fileList
+ * @return
+ * @throws IOException
+ */
+ boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, int minBufferSize, List<String> fileList, List<BloomFilter> compactedBloomFilters) throws IOException
+ {
+ boolean result = false;
+ long startTime = System.currentTimeMillis();
+ long totalBytesRead = 0;
+ long totalBytesWritten = 0;
+ long totalkeysRead = 0;
+ long totalkeysWritten = 0;
+ String rangeFileLocation = null;
+ String mergedFileName = null;
+ try
+ {
+ // Calculate the expected compacted filesize
+ long expectedRangeFileSize = getExpectedCompactedFileSize(files);
+ /* in the worst case a node will be giving out half of its data so we take a chance */
+ expectedRangeFileSize = expectedRangeFileSize / 2;
+ rangeFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedRangeFileSize);
+// boolean isLoop = isLoopAround( ranges );
+// Range maxRange = getMaxRange( ranges );
+ // If the compaction file path is null that means we have no space left for this compaction.
+ if( rangeFileLocation == null )
+ {
+ logger_.warn("Total bytes to be written for range compaction ..."
+ + expectedRangeFileSize + " is greater than the safe limit of the disk space available.");
+ return result;
+ }
+ PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, minBufferSize);
+ if (pq.size() > 0)
+ {
+ mergedFileName = getTempFileName();
+ SSTable ssTableRange = null ;
+ String lastkey = null;
+ List<FileStruct> lfs = new ArrayList<FileStruct>();
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
+ expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
+ logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+ /* Create the bloom filter for the compacted file. */
+ BloomFilter compactedRangeBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
+ List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+
+ while (pq.size() > 0 || lfs.size() > 0)
+ {
+ FileStruct fs = null;
+ if (pq.size() > 0)
+ {
+ fs = pq.poll();
+ }
+ if (fs != null
+ && (lastkey == null || lastkey.compareTo(fs.key_) == 0))
+ {
+ // The keys are the same so we need to add this to the
+ // lfs list
+ lastkey = fs.key_;
+ lfs.add(fs);
+ }
+ else
+ {
+ Collections.sort(lfs, new FileStructComparator());
+ ColumnFamily columnFamily = null;
+ bufOut.reset();
+ if(lfs.size() > 1)
+ {
+ for (FileStruct filestruct : lfs)
+ {
+ try
+ {
+ /* read the length although we don't need it */
+ filestruct.bufIn_.readInt();
+ // Skip the Index
+ IndexHelper.skipBloomFilterAndIndex(filestruct.bufIn_);
+ // We want to add only 2 and resolve them right there in order to save on memory footprint
+ if(columnFamilies.size() > 1)
+ {
+ // Now merge the 2 column families
+ columnFamily = resolve(columnFamilies);
+ columnFamilies.clear();
+ if( columnFamily != null)
+ {
+ // add the merged columnfamily back to the list
+ columnFamilies.add(columnFamily);
+ }
+
+ }
+ // deserialize into column families
+ columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
+ }
+ catch ( Exception ex)
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ continue;
+ }
+ }
+ // Now that everything is merged, append to the sstable
+ columnFamily = resolve(columnFamilies);
+ columnFamilies.clear();
+ if( columnFamily != null )
+ {
+ /* serialize the cf with column indexes */
+ ColumnFamily.serializer2().serialize(columnFamily, bufOut);
+ }
+ }
+ else
+ {
+ FileStruct filestruct = lfs.get(0);
+ try
+ {
+ /* read the length although we don't need it */
+ int size = filestruct.bufIn_.readInt();
+ bufOut.write(filestruct.bufIn_, size);
+ }
+ catch ( Exception ex)
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ filestruct.reader_.close();
+ continue;
+ }
+ }
+ if ( Range.isKeyInRanges(ranges, lastkey) )
+ {
+ if(ssTableRange == null )
+ {
+ if( target != null )
+ rangeFileLocation = rangeFileLocation + System.getProperty("file.separator") + "bootstrap";
+ FileUtils.createDirectory(rangeFileLocation);
+ ssTableRange = new SSTable(rangeFileLocation, mergedFileName);
+ }
+ try
+ {
+ ssTableRange.append(lastkey, bufOut);
+ compactedRangeBloomFilter.fill(lastkey);
+ }
+ catch(Exception ex)
+ {
+ logger_.warn( LogUtil.throwableToString(ex) );
+ }
+ }
+ totalkeysWritten++;
+ for (FileStruct filestruct : lfs)
+ {
+ try
+ {
+ filestruct = getNextKey ( filestruct );
+ if(filestruct == null)
+ {
+ continue;
+ }
+ /* keep on looping until we find a key in the range */
+ while ( !Range.isKeyInRanges(ranges, filestruct.key_ ) )
+ {
+ filestruct = getNextKey ( filestruct );
+ if(filestruct == null)
+ {
+ break;
+ }
+ /* check if we need to continue , if we are done with ranges empty the queue and close all file handles and exit */
+ //if( !isLoop && StorageService.hash(filestruct.key).compareTo(maxRange.right()) > 0 && !filestruct.key.equals(""))
+ //{
+ //filestruct.reader.close();
+ //filestruct = null;
+ //break;
+ //}
+ }
+ if ( filestruct != null)
+ {
+ pq.add(filestruct);
+ }
+ totalkeysRead++;
+ }
+ catch ( Exception ex )
+ {
+ // Ignore the exception as it might be a corrupted file
+ // in any case we have read as far as possible from it
+ // and it will be deleted after compaction.
+ logger_.warn(LogUtil.throwableToString(ex));
+ filestruct.reader_.close();
+ continue;
+ }
+ }
+ lfs.clear();
+ lastkey = null;
+ if (fs != null)
+ {
+ // Add back the fs since we processed the rest of
+ // filestructs
+ pq.add(fs);
+ }
+ }
+ }
+ if( ssTableRange != null )
+ {
+ if ( fileList == null )
+ fileList = new ArrayList<String>();
+ ssTableRange.closeRename(compactedRangeBloomFilter, fileList);
+ if(compactedBloomFilters != null)
+ compactedBloomFilters.add(compactedRangeBloomFilter);
+ }
+ }
+ }
+ catch ( Exception ex)
+ {
+ logger_.warn( LogUtil.throwableToString(ex) );
+ }
+ logger_.debug("Total time taken for range split ..."
+ + (System.currentTimeMillis() - startTime));
+ logger_.debug("Total bytes Read for range split ..." + totalBytesRead);
+ logger_.debug("Total bytes written for range split ..."
+ + totalBytesWritten + " Total keys read ..." + totalkeysRead);
+ return result;
+ }
+
+ private void doWrite(SSTable ssTable, String key, DataOutputBuffer bufOut) throws IOException
+ {
+ PartitionerType pType = StorageService.getPartitionerType();
+ switch ( pType )
+ {
+ case OPHF:
+ ssTable.append(key, bufOut);
+ break;
+
+ default:
+ String[] pieces = key.split(":");
+ key = pieces[1];
+ BigInteger hash = new BigInteger(pieces[0]);
+ ssTable.append(key, hash, bufOut);
+ break;
+ }
+ }
+
+ private void doFill(BloomFilter bf, String key)
+ {
+ PartitionerType pType = StorageService.getPartitionerType();
+ switch ( pType )
+ {
+ case OPHF:
+ bf.fill(key);
+ break;
+
+ default:
+ String[] pieces = key.split(":");
+ bf.fill(pieces[1]);
+ break;
+ }
+ }
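An illustrative key (made up for this example) for the non-OPHF case, which both doWrite() and doFill() assume arrives in the form "<hash>:<key>":

    // key == "1234567:user42"  ->  pieces[0] == "1234567" (hash), pieces[1] == "user42"
    // doWrite() then appends ("user42", new BigInteger("1234567"), bufOut)
    // and doFill() adds only "user42" to the bloom filter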
+
+ /*
+ * This function does the actual compaction for files.
+ * It maintains a priority queue seeded with the first key from each file,
+ * repeatedly removes the top of the queue, appends it to the SSTable, and
+ * reads the next key from that file, until all the files are exhausted.
+ * The SSTable to which the keys are written
+ * represents the new compacted file. Before writing, if the same key
+ * occurs in multiple files, a resolution is done
+ * to get the latest data.
+ *
+ */
+ void doFileCompaction(List<String> files, int minBufferSize) throws IOException
+ {
+ String newfile = null;
+ long startTime = System.currentTimeMillis();
+ long totalBytesRead = 0;
+ long totalBytesWritten = 0;
+ long totalkeysRead = 0;
+ long totalkeysWritten = 0;
+ try
+ {
+ // Calculate the expected compacted filesize
+ long expectedCompactedFileSize = getExpectedCompactedFileSize(files);
+ String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedCompactedFileSize);
+ // If the compaction file path is null that means we have no space left for this compaction.
+ if( compactionFileLocation == null )
+ {
+ String maxFile = getMaxSizeFile( files );
+ files.remove( maxFile );
+ doFileCompaction(files , minBufferSize);
+ return;
+ }
+ PriorityQueue<FileStruct> pq = initializePriorityQueue(files, null, minBufferSize);
+ if (pq.size() > 0)
+ {
+ String mergedFileName = getTempFileName( files );
+ SSTable ssTable = null;
+ String lastkey = null;
+ List<FileStruct> lfs = new ArrayList<FileStruct>();
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
+ expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
+ logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+ /* Create the bloom filter for the compacted file. */
+ BloomFilter compactedBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
+ List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+
+ while (pq.size() > 0 || lfs.size() > 0)
+ {
+ FileStruct fs = null;
+ if (pq.size() > 0)
+ {
+ fs = pq.poll();
+ }
+ if (fs != null
+ && (lastkey == null || lastkey.compareTo(fs.key_) == 0))
+ {
+ // The keys are the same so we need to add this to the
+ // lfs list
+ lastkey = fs.key_;
+ lfs.add(fs);
+ }
+ else
+ {
+ Collections.sort(lfs, new FileStructComparator());
+ ColumnFamily columnFamily = null;
+ bufOut.reset();
+ if(lfs.size() > 1)
+ {
+ for (FileStruct filestruct : lfs)
+ {
+ try
+ {
+ /* read the length although we don't need it */
+ filestruct.bufIn_.readInt();
+ // Skip the Index
+ IndexHelper.skipBloomFilterAndIndex(filestruct.bufIn_);
+ // We want to add only 2 and resolve them right there in order to save on memory footprint
+ if(columnFamilies.size() > 1)
+ {
+ // Now merge the 2 column families
+ columnFamily = resolve(columnFamilies);
+ columnFamilies.clear();
+ if( columnFamily != null)
+ {
+ // add the merged columnfamily back to the list
+ columnFamilies.add(columnFamily);
+ }
+
+ }
+ // deserialize into column families
+ columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
+ }
+ catch ( Exception ex)
+ {
+ continue;
+ }
+ }
+ // Now that everything is merged, append to the sstable
+ columnFamily = resolve(columnFamilies);
+ columnFamilies.clear();
+ if( columnFamily != null )
+ {
+ /* serialize the cf with column indexes */
+ ColumnFamily.serializer2().serialize(columnFamily, bufOut);
+ }
+ }
+ else
+ {
+ FileStruct filestruct = lfs.get(0);
+ try
+ {
+ /* read the length although we don't need it */
+ int size = filestruct.bufIn_.readInt();
+ bufOut.write(filestruct.bufIn_, size);
+ }
+ catch ( Exception ex)
+ {
+ ex.printStackTrace();
+ filestruct.reader_.close();
+ continue;
+ }
+ }
+
+ if ( ssTable == null )
+ {
+ PartitionerType pType = StorageService.getPartitionerType();
+ ssTable = new SSTable(compactionFileLocation, mergedFileName, pType);
+ }
+ doWrite(ssTable, lastkey, bufOut);
+
+ /* Fill the bloom filter with the key */
+ doFill(compactedBloomFilter, lastkey);
+ totalkeysWritten++;
+ for (FileStruct filestruct : lfs)
+ {
+ try
+ {
+ filestruct = getNextKey(filestruct);
+ if(filestruct == null)
+ {
+ continue;
+ }
+ pq.add(filestruct);
+ totalkeysRead++;
+ }
+ catch ( Throwable ex )
+ {
+ // Ignore the exception as it might be a corrupted file
+ // in any case we have read as far as possible from it
+ // and it will be deleted after compaction.
+ filestruct.reader_.close();
+ continue;
+ }
+ }
+ lfs.clear();
+ lastkey = null;
+ if (fs != null)
+ {
+ /* Add back the fs since we processed the rest of filestructs */
+ pq.add(fs);
+ }
+ }
+ }
+ if ( ssTable != null )
+ {
+ ssTable.closeRename(compactedBloomFilter);
+ newfile = ssTable.getDataFileLocation();
+ }
+ lock_.writeLock().lock();
+ try
+ {
+ for (String file : files)
+ {
+ ssTables_.remove(file);
+ SSTable.removeAssociatedBloomFilter(file);
+ }
+ if ( newfile != null )
+ {
+ ssTables_.add(newfile);
+ logger_.debug("Inserting bloom filter for file " + newfile);
+ SSTable.storeBloomFilter(newfile, compactedBloomFilter);
+ totalBytesWritten = (new File(newfile)).length();
+ }
+ }
+ finally
+ {
+ lock_.writeLock().unlock();
+ }
+ for (String file : files)
+ {
+ SSTable.delete(file);
+ }
+ }
+ }
+ catch ( Exception ex)
+ {
+ logger_.warn( LogUtil.throwableToString(ex) );
+ }
+ logger_.debug("Total time taken for compaction ..."
+ + (System.currentTimeMillis() - startTime));
+ logger_.debug("Total bytes Read for compaction ..." + totalBytesRead);
+ logger_.debug("Total bytes written for compaction ..."
+ + totalBytesWritten + " Total keys read ..." + totalkeysRead);
+ return;
+ }
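The merge loop above is easier to see in isolation. The sketch below uses illustrative names and plain sorted key iterators rather than the classes in this file; the real method additionally deserializes the column families, resolves duplicate rows, fills a bloom filter and renames the resulting SSTable.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    final class CompactionMergeSketch
    {
        /* One input SSTable, reduced to a sorted iterator of row keys. */
        private static final class Src implements Comparable<Src>
        {
            final Iterator<String> keys;
            String current;

            Src(Iterator<String> keys)
            {
                this.keys = keys;
                this.current = keys.next();
            }

            boolean advance()
            {
                if (!keys.hasNext())
                    return false;
                current = keys.next();
                return true;
            }

            public int compareTo(Src other)
            {
                return current.compareTo(other.current);
            }
        }

        static List<String> merge(List<Iterator<String>> sortedSources)
        {
            /* seed the queue with the first key of every non-empty source */
            PriorityQueue<Src> pq = new PriorityQueue<Src>();
            for (Iterator<String> it : sortedSources)
            {
                if (it.hasNext())
                    pq.add(new Src(it));
            }
            List<String> merged = new ArrayList<String>();
            while (!pq.isEmpty())
            {
                Src first = pq.poll();
                String key = first.current;
                List<Src> sameKey = new ArrayList<Src>();
                sameKey.add(first);
                /* gather every source positioned on the same key -- the "resolve" step */
                while (!pq.isEmpty() && pq.peek().current.equals(key))
                    sameKey.add(pq.poll());
                merged.add(key);       /* stands in for appending the resolved row to the new SSTable */
                for (Src src : sameKey)
                {
                    if (src.advance()) /* like getNextKey(): read that source's next key, if any */
                        pq.add(src);
                }
            }
            return merged;
        }
    }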
+
+ /*
+ * Take a snapshot of this column family store.
+ */
+ public void snapshot( String snapshotDirectory ) throws IOException
+ {
+ File snapshotDir = new File(snapshotDirectory);
+ if( !snapshotDir.exists() )
+ snapshotDir.mkdir();
+ lock_.writeLock().lock();
+ List<String> files = new ArrayList<String>(ssTables_);
+ try
+ {
+ for (String file : files)
+ {
+ File f = new File(file);
+ Path existingLink = f.toPath();
+ File hardLinkFile = new File(snapshotDirectory + System.getProperty("file.separator") + f.getName());
+ Path hardLink = hardLinkFile.toPath();
+ hardLink.createLink(existingLink);
+ }
+ }
+ finally
+ {
+ lock_.writeLock().unlock();
+ }
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/db/ColumnIndexer.java?rev=749202&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/db/ColumnIndexer.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/db/ColumnIndexer.java Mon Mar 2 06:04:20 2009
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IndexHelper;
+import org.apache.cassandra.io.SSTable.KeyPositionInfo;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
+
+
+/**
+ * Helps to create an index for a column family based on the size of its columns.
+ * Author : Karthik Ranganathan ( kranganathan@facebook.com )
+ */
+
+public class ColumnIndexer
+{
+ /**
+ * Given a column family, this function creates an in-memory structure that represents the
+ * column index for the column family, and subsequently writes it to disk.
+ * @param columnFamily Column family to create index for
+ * @param dos data output stream
+ * @throws IOException
+ */
+ public static void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
+ {
+ Collection<IColumn> columns = columnFamily.getAllColumns();
+ BloomFilter bf = createColumnBloomFilter(columns);
+ /* Write out the bloom filter. */
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ BloomFilter.serializer().serialize(bf, bufOut);
+ /* write the length of the serialized bloom filter. */
+ dos.writeInt(bufOut.getLength());
+ /* write out the serialized bytes. */
+ dos.write(bufOut.getData(), 0, bufOut.getLength());
+
+ TypeInfo typeInfo = DatabaseDescriptor.getTypeInfo(columnFamily.name());
+ doIndexing(typeInfo, columns, dos);
+ }
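Sketch of the layout written by serialize() for one row's column block (the exact encoding of the index entries is whatever IndexHelper.serialize() produces); readers that do not need these parts can skip them, as ColumnFamilyStore does with IndexHelper.skipBloomFilterAndIndex() during compaction:

    /* per row's column block:
     *   [int: bloom filter length][bloom filter bytes][column index from IndexHelper.serialize(...)]
     */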
+
+ /**
+ * Create a bloom filter that contains the subcolumns and the columns that
+ * make up this Column Family.
+ * @param columns columns of the ColumnFamily
+ * @return BloomFilter with the summarized information.
+ */
+ private static BloomFilter createColumnBloomFilter(Collection<IColumn> columns)
+ {
+ int columnCount = 0;
+ for ( IColumn column : columns )
+ {
+ columnCount += column.getObjectCount();
+ }
+
+ BloomFilter bf = new BloomFilter(columnCount, 4);
+ for ( IColumn column : columns )
+ {
+ bf.fill(column.name());
+ /* If this is SuperColumn type Column Family we need to get the subColumns too. */
+ if ( column instanceof SuperColumn )
+ {
+ Collection<IColumn> subColumns = column.getSubColumns();
+ for ( IColumn subColumn : subColumns )
+ {
+ bf.fill(subColumn.name());
+ }
+ }
+ }
+ return bf;
+ }
+
+ private static IndexHelper.ColumnIndexInfo getColumnIndexInfo(TypeInfo typeInfo, IColumn column)
+ {
+ IndexHelper.ColumnIndexInfo cIndexInfo = null;
+
+ if ( column instanceof SuperColumn )
+ {
+ cIndexInfo = IndexHelper.ColumnIndexFactory.instance(TypeInfo.STRING);
+ cIndexInfo.set(column.name());
+ }
+ else
+ {
+ cIndexInfo = IndexHelper.ColumnIndexFactory.instance(typeInfo);
+ switch(typeInfo)
+ {
+ case STRING:
+ cIndexInfo.set(column.name());
+ break;
+
+ case LONG:
+ cIndexInfo.set(column.timestamp());
+ break;
+ }
+ }
+
+ return cIndexInfo;
+ }
+
+ /**
+ * Given the collection of columns in the Column Family,
+ * the name index is generated and written into the provided
+ * stream.
+ * @param typeInfo type of index entry to generate (column name for STRING,
+ * timestamp for LONG)
+ * @param columns columns for which the name index needs to be generated
+ * @param dos stream into which the serialized name index needs
+ * to be written.
+ * @throws IOException
+ */
+ private static void doIndexing(TypeInfo typeInfo, Collection<IColumn> columns, DataOutputStream dos) throws IOException
+ {
+ /* we are going to write column indexes */
+ int numColumns = 0;
+ int position = 0;
+ int indexSizeInBytes = 0;
+ int sizeSummarized = 0;
+
+ /*
+ * Maintains a list of ColumnIndexInfo objects for the columns in this
+ * column family. The key is the column name and the position is the
+ * relative offset of that column name from the start of the list.
+ * We do this so that we don't read all the columns into memory.
+ */
+
+ List<IndexHelper.ColumnIndexInfo> columnIndexList = new ArrayList<IndexHelper.ColumnIndexInfo>();
+
+ /* record column offsets into the index list at the right size thresholds. */
+ for ( IColumn column : columns )
+ {
+ /* if we hit the column index size that we have to index after, go ahead and index it */
+ if(position - sizeSummarized >= DatabaseDescriptor.getColumnIndexSize())
+ {
+ /*
+ * ColumnSort applies only to columns. So in case of
+ * SuperColumn always use the name indexing scheme for
+ * the SuperColumns. We will fix this later.
+ */
+ IndexHelper.ColumnIndexInfo cIndexInfo = getColumnIndexInfo(typeInfo, column);
+ cIndexInfo.position(position);
+ cIndexInfo.count(numColumns);
+ columnIndexList.add(cIndexInfo);
+ /*
+ * we will be writing this object as a UTF8 string and two ints,
+ * so calculate the size accordingly. Note that we store the string
+ * as UTF-8 encoded, so when we calculate the length, it should be
+ * converted to UTF-8.
+ */
+ indexSizeInBytes += cIndexInfo.size();
+ sizeSummarized = position;
+ numColumns = 0;
+ }
+ position += column.serializedSize();
+ ++numColumns;
+ }
+ /* write the column index list */
+ IndexHelper.serialize(indexSizeInBytes, columnIndexList, dos);
+ }
+}
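A hypothetical walk-through of the threshold logic in doIndexing() (the configured index size and the column sizes below are made up):

    // DatabaseDescriptor.getColumnIndexSize() == 64, four columns of 40 serialized bytes each:
    //   column 1: position - sizeSummarized = 0   <  64 -> no entry; position = 40,  numColumns = 1
    //   column 2: position - sizeSummarized = 40  <  64 -> no entry; position = 80,  numColumns = 2
    //   column 3: position - sizeSummarized = 80  >= 64 -> emit {position = 80, count = 2},
    //             sizeSummarized = 80, numColumns resets; then position = 120, numColumns = 1
    //   column 4: position - sizeSummarized = 40  <  64 -> no entry; position = 160, numColumns = 2
    // One ColumnIndexInfo entry is written, letting a reader seek past the first
    // 80 bytes (two columns) without deserializing them.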