You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 08:57:31 UTC
svn commit: r749218 [12/34] - in /incubator/cassandra: branches/ dist/
nightly/ site/ tags/ trunk/ trunk/lib/ trunk/src/ trunk/src/org/
trunk/src/org/apache/ trunk/src/org/apache/cassandra/
trunk/src/org/apache/cassandra/analytics/ trunk/src/org/apache...
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,614 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Proxy;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * In-memory representation of one column family for a row: a named, sorted
+ * collection of IColumn instances (simple Columns or SuperColumns, chosen from
+ * configuration). Identity (equals/hashCode) is based on the name only.
+ * NOTE(review): static maps and per-instance state are not synchronized;
+ * confirm the intended thread-safety contract with callers.
+ */
+public final class ColumnFamily implements Serializable
+{
+ private static ICompactSerializer2<ColumnFamily> serializer_;
+ public static final short utfPrefix_ = 2;
+ /* The column serializer for this Column Family. Create based on config. */
+
+ private static Logger logger_ = Logger.getLogger( ColumnFamily.class );
+ private static Map<String, String> columnTypes_ = new HashMap<String, String>();
+ private static Map<String, String> indexTypes_ = new HashMap<String, String>();
+
+ static
+ {
+ serializer_ = new ColumnFamilySerializer();
+ /* TODO: These are the various column types. Hard coded for now. */
+ columnTypes_.put("Standard", "Standard");
+ columnTypes_.put("Super", "Super");
+
+ indexTypes_.put("Name", "Name");
+ indexTypes_.put("Time", "Time");
+ }
+
+ /* Returns the shared, statically-initialized serializer. */
+ public static ICompactSerializer2<ColumnFamily> serializer()
+ {
+ return serializer_;
+ }
+
+ /*
+ * This method returns the serializer whose methods are
+ * preprocessed by a dynamic proxy.
+ */
+ public static ICompactSerializer2<ColumnFamily> serializer2()
+ {
+ return (ICompactSerializer2<ColumnFamily>)Proxy.newProxyInstance( ColumnFamily.class.getClassLoader(), new Class[]{ICompactSerializer2.class}, new CompactSerializerInvocationHandler<ColumnFamily>(serializer_) );
+ }
+
+ /* Maps a configured column type to its canonical name; null defaults to "Standard". */
+ public static String getColumnType(String key)
+ {
+ if ( key == null )
+ return columnTypes_.get("Standard");
+ return columnTypes_.get(key);
+ }
+
+ /* Maps a configured index property to its canonical name; null defaults to time-sorted. */
+ public static String getColumnSortProperty(String columnIndexProperty)
+ {
+ if ( columnIndexProperty == null )
+ return indexTypes_.get("Time");
+ return indexTypes_.get(columnIndexProperty);
+ }
+
+ private transient AbstractColumnFactory columnFactory_;
+
+ private String name_;
+
+ private transient ICompactSerializer2<IColumn> columnSerializer_;
+ private transient AtomicBoolean isMarkedForDelete_;
+ // Cached serialized size in bytes; 0 means "not yet computed" (see size()).
+ private AtomicInteger size_ = new AtomicInteger(0);
+ private EfficientBidiMap columns_;
+
+ private Comparator<IColumn> columnComparator_;
+
+ /* Lazily chooses and caches the column ordering for this column family. */
+ private Comparator<IColumn> getColumnComparator(String cfName, String columnType)
+ {
+ if(columnComparator_ == null)
+ {
+ /*
+ * if this columnfamily has supercolumns or there is an index on the column name,
+ * then sort by name
+ */
+ if("Super".equals(columnType) || DatabaseDescriptor.isNameSortingEnabled(cfName))
+ {
+ columnComparator_ = ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.NAME);
+ }
+ /* if this columnfamily has simple columns, and no index on name sort by timestamp */
+ else
+ {
+ columnComparator_ = ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.TIMESTAMP);
+ }
+ }
+
+ return columnComparator_;
+ }
+
+ /* Package-private no-arg constructor; used during deserialization. */
+ ColumnFamily()
+ {
+ }
+
+ public ColumnFamily(String cf)
+ {
+ name_ = cf;
+ createColumnFactoryAndColumnSerializer();
+ }
+
+ public ColumnFamily(String cf, String columnType)
+ {
+ name_ = cf;
+ createColumnFactoryAndColumnSerializer(columnType);
+ }
+
+ /* Initializes the column factory, serializer and backing map once for the given type. */
+ void createColumnFactoryAndColumnSerializer(String columnType)
+ {
+ if ( columnFactory_ == null )
+ {
+ columnFactory_ = AbstractColumnFactory.getColumnFactory(columnType);
+ columnSerializer_ = columnFactory_.createColumnSerializer();
+ if(columns_ == null)
+ columns_ = new EfficientBidiMap(getColumnComparator(name_, columnType));
+ }
+ }
+
+ /*
+ * Resolves the column type from configuration; if not configured, falls back
+ * to asking the first open table for this column family's type.
+ */
+ void createColumnFactoryAndColumnSerializer()
+ {
+ String columnType = DatabaseDescriptor.getColumnFamilyType(name_);
+ if ( columnType == null )
+ {
+ List<String> tables = DatabaseDescriptor.getTables();
+ if ( tables.size() > 0 )
+ {
+ String table = tables.get(0);
+ columnType = Table.open(table).getColumnFamilyType(name_);
+ }
+ }
+ createColumnFactoryAndColumnSerializer(columnType);
+ }
+
+ /*
+ * Copies this column family. NOTE(review): the clone shares the same
+ * AtomicBoolean delete-flag instance as the original, so deleting one
+ * marks both; confirm this aliasing is intended.
+ */
+ ColumnFamily cloneMe()
+ {
+ ColumnFamily cf = new ColumnFamily(name_);
+ cf.isMarkedForDelete_ = isMarkedForDelete_;
+ cf.columns_ = columns_.cloneMe();
+ return cf;
+ }
+
+ public String name()
+ {
+ return name_;
+ }
+
+ /**
+ * We need to go through each column
+ * in the column family and resolve it before adding
+ */
+ void addColumns(ColumnFamily cf)
+ {
+ Map<String, IColumn> columns = cf.getColumns();
+ Set<String> cNames = columns.keySet();
+
+ for ( String cName : cNames )
+ {
+ addColumn(cName, columns.get(cName));
+ }
+ }
+
+ public ICompactSerializer2<IColumn> getColumnSerializer()
+ {
+ createColumnFactoryAndColumnSerializer();
+ return columnSerializer_;
+ }
+
+ public void createColumn(String name)
+ {
+ IColumn column = columnFactory_.createColumn(name);
+ addColumn(column.name(), column);
+ }
+
+ /*
+ * Counts columns; for a Super column family each sub-column is counted
+ * via the super column's getObjectCount().
+ */
+ int getColumnCount()
+ {
+ int count = 0;
+ Map<String, IColumn> columns = columns_.getColumns();
+ if( columns != null )
+ {
+ if(!DatabaseDescriptor.getColumnType(name_).equals("Super"))
+ {
+ count = columns.size();
+ }
+ else
+ {
+ Collection<IColumn> values = columns.values();
+ for(IColumn column: values)
+ {
+ count += column.getObjectCount();
+ }
+ }
+ }
+ return count;
+ }
+
+ public void createColumn(String name, byte[] value)
+ {
+ IColumn column = columnFactory_.createColumn(name, value);
+ addColumn(column.name(), column);
+ }
+
+ public void createColumn(String name, byte[] value, long timestamp)
+ {
+ IColumn column = columnFactory_.createColumn(name, value, timestamp);
+ addColumn(column.name(), column);
+ }
+
+ void clear()
+ {
+ columns_.clear();
+ }
+
+ /*
+ * If we find an old column that has the same name,
+ * then ask it to resolve itself, else add the new column.
+ * The cached size is adjusted by the delta of the winning column's size.
+ */
+ void addColumn(String name, IColumn column)
+ {
+ int newSize = 0;
+ IColumn oldColumn = columns_.get(name);
+ if ( oldColumn != null )
+ {
+ int oldSize = oldColumn.size();
+ if( oldColumn.putColumn(column))
+ {
+ // This will never be called for super column as put column always returns false.
+ columns_.put(name, column);
+ newSize = column.size();
+ }
+ else
+ {
+ newSize = oldColumn.size();
+ }
+ size_.addAndGet(newSize - oldSize);
+ }
+ else
+ {
+ newSize = column.size();
+ size_.addAndGet(newSize);
+ columns_.put(name, column);
+ }
+ }
+
+ public IColumn getColumn(String name)
+ {
+ return columns_.get( name );
+ }
+
+ public Collection<IColumn> getAllColumns()
+ {
+ return columns_.getSortedColumns();
+ }
+
+ Map<String, IColumn> getColumns()
+ {
+ return columns_.getColumns();
+ }
+
+ public void remove(String columnName)
+ {
+ columns_.remove(columnName);
+ }
+
+ /* Marks the whole column family as deleted (tombstone); lazily creates the flag. */
+ void delete()
+ {
+ if ( isMarkedForDelete_ == null )
+ isMarkedForDelete_ = new AtomicBoolean(true);
+ else
+ isMarkedForDelete_.set(true);
+ }
+
+ boolean isMarkedForDelete()
+ {
+ return ( ( isMarkedForDelete_ == null ) ? false : isMarkedForDelete_.get() );
+ }
+
+ /*
+ * This is used as oldCf.merge(newCf). Basically we take the newCf
+ * and merge it into the oldCf. Note: unlike addColumns(), this
+ * overwrites without per-column conflict resolution.
+ */
+ void merge(ColumnFamily columnFamily)
+ {
+ Map<String, IColumn> columns = columnFamily.getColumns();
+ Set<String> cNames = columns.keySet();
+
+ for ( String cName : cNames )
+ {
+ columns_.put(cName, columns.get(cName));
+ }
+ }
+
+ /*
+ * This function will repair a list of columns
+ * If there are any columns in the external list which are not present
+ * internally then they are added ( this might have to change depending on
+ * how we implement delete repairs).
+ * Also if there are any columns in the internal and not in the external
+ * they are kept intact.
+ * Else the one with the greatest timestamp is considered latest.
+ */
+ void repair(ColumnFamily columnFamily)
+ {
+ Map<String, IColumn> columns = columnFamily.getColumns();
+ Set<String> cNames = columns.keySet();
+
+ for ( String cName : cNames )
+ {
+ IColumn columnInternal = columns_.get(cName);
+ IColumn columnExternal = columns.get(cName);
+
+ if( columnInternal == null )
+ {
+ if(DatabaseDescriptor.getColumnFamilyType(name_).equals(ColumnFamily.getColumnType("Super")))
+ {
+ columnInternal = new SuperColumn(columnExternal.name());
+ columns_.put(cName, columnInternal);
+ }
+ if(DatabaseDescriptor.getColumnFamilyType(name_).equals(ColumnFamily.getColumnType("Standard")))
+ {
+ columnInternal = columnExternal;
+ columns_.put(cName, columnInternal);
+ }
+ }
+ // NOTE(review): if the configured type matches neither "Super" nor
+ // "Standard", columnInternal is still null here and this throws NPE.
+ columnInternal.repair(columnExternal);
+ }
+ }
+
+
+ /*
+ * This function will calculate the difference between 2 column families
+ * the external input is considered the superset of internal
+ * so there are no deletes in the diff.
+ * Returns null when there is no difference.
+ */
+ ColumnFamily diff(ColumnFamily columnFamily)
+ {
+ ColumnFamily cfDiff = new ColumnFamily(columnFamily.name());
+ Map<String, IColumn> columns = columnFamily.getColumns();
+ Set<String> cNames = columns.keySet();
+
+ for ( String cName : cNames )
+ {
+ IColumn columnInternal = columns_.get(cName);
+ IColumn columnExternal = columns.get(cName);
+ if( columnInternal == null )
+ {
+ cfDiff.addColumn(cName, columnExternal);
+ }
+ else
+ {
+ IColumn columnDiff = columnInternal.diff(columnExternal);
+ if(columnDiff != null)
+ {
+ cfDiff.addColumn(cName, columnDiff);
+ }
+ }
+ }
+ if(cfDiff.getColumns().size() != 0)
+ return cfDiff;
+ else
+ return null;
+ }
+
+ /*
+ * Serialized size in bytes of all columns; lazily recomputed from the
+ * columns whenever the cached value is 0.
+ */
+ int size()
+ {
+ if ( size_.get() == 0 )
+ {
+ Set<String> cNames = columns_.getColumns().keySet();
+ for ( String cName : cNames )
+ {
+ size_.addAndGet(columns_.get(cName).size());
+ }
+ }
+ return size_.get();
+ }
+
+ // Identity is the column family name only; column contents are ignored.
+ public int hashCode()
+ {
+ return name().hashCode();
+ }
+
+ public boolean equals(Object o)
+ {
+ if ( !(o instanceof ColumnFamily) )
+ return false;
+ ColumnFamily cf = (ColumnFamily)o;
+ return name().equals(cf.name());
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(name_);
+ sb.append(":");
+ sb.append(isMarkedForDelete());
+ sb.append(":");
+ Collection<IColumn> columns = getAllColumns();
+ sb.append(columns.size());
+ sb.append(":");
+
+ for ( IColumn column : columns )
+ {
+ sb.append(column.toString());
+ }
+ sb.append(":");
+ return sb.toString();
+ }
+
+ /*
+ * Digest of this column family: the XOR of all column digests.
+ * Returns an empty array when there are no columns.
+ */
+ public byte[] digest()
+ {
+ Set<IColumn> columns = columns_.getSortedColumns();
+ byte[] xorHash = new byte[0];
+ byte[] tmpHash = new byte[0];
+ for(IColumn column : columns)
+ {
+ if(xorHash.length == 0)
+ {
+ xorHash = column.digest();
+ }
+ else
+ {
+ tmpHash = column.digest();
+ xorHash = FBUtilities.xor(xorHash, tmpHash);
+ }
+ }
+ return xorHash;
+ }
+}
+
+/* Serializes/deserializes ColumnFamily instances to/from data streams. */
+class ColumnFamilySerializer implements ICompactSerializer2<ColumnFamily>
+{
+ /*
+ * We are going to create indexes, and write out that information as well. The format
+ * of the data serialized is as follows.
+ *
+ * 1) Without indexes:
+ * // written by the data
+ * <boolean false (index is not present)>
+ * <column family id>
+ * <is marked for delete>
+ * <total number of columns>
+ * <columns data>
+
+ * 2) With indexes:
+ * <boolean true (index is present)>
+ *
+ * This part is written by the column indexer
+ * <size of index in bytes>
+ * <list of column names and their offsets relative to the first column>
+ *
+ * <size of the cf in bytes>
+ * <column family id>
+ * <is marked for delete>
+ * <total number of columns>
+ * <columns data>
+ */
+ public void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
+ {
+ Collection<IColumn> columns = columnFamily.getAllColumns();
+
+ /* write the column family id */
+ dos.writeUTF(columnFamily.name());
+ /* write if this cf is marked for delete */
+ dos.writeBoolean(columnFamily.isMarkedForDelete());
+ /* write the number of columns */
+ dos.writeInt(columns.size());
+
+ /* write the column data */
+ for ( IColumn column : columns )
+ {
+ columnFamily.getColumnSerializer().serialize(column, dos);
+ }
+ }
+
+ /*
+ * Use this method to create a bare bones Column Family. This column family
+ * does not have any of the Column information: it reads only the name and
+ * the delete flag from the stream.
+ */
+ private ColumnFamily defreezeColumnFamily(DataInputStream dis) throws IOException
+ {
+ String name = dis.readUTF();
+ boolean delete = dis.readBoolean();
+ ColumnFamily cf = new ColumnFamily(name);
+ if ( delete )
+ cf.delete();
+ return cf;
+ }
+
+ /*
+ * This method fills the Column Family object with the column information
+ * from the DataInputStream. It reads the column count from the stream and
+ * then deserializes that many columns into the given column family.
+ */
+ private void fillColumnFamily(ColumnFamily cf, DataInputStream dis) throws IOException
+ {
+ int size = dis.readInt();
+ IColumn column = null;
+ for ( int i = 0; i < size; ++i )
+ {
+ column = cf.getColumnSerializer().deserialize(dis);
+ if(column != null)
+ {
+ cf.addColumn(column.name(), column);
+ }
+ }
+ }
+
+ /* Deserializes a complete column family; skips columns when it is a tombstone. */
+ public ColumnFamily deserialize(DataInputStream dis) throws IOException
+ {
+ ColumnFamily cf = defreezeColumnFamily(dis);
+ if ( !cf.isMarkedForDelete() )
+ fillColumnFamily(cf,dis);
+ return cf;
+ }
+
+ /*
+ * This version of deserialize is used when we need a specific set of columns
+ * selected by the given filter; deserialization stops as soon as the filter
+ * reports that it is done.
+ */
+ public ColumnFamily deserialize(DataInputStream dis, IFilter filter) throws IOException
+ {
+ ColumnFamily cf = defreezeColumnFamily(dis);
+ if ( !cf.isMarkedForDelete() )
+ {
+ int size = dis.readInt();
+ IColumn column = null;
+ for ( int i = 0; i < size; ++i )
+ {
+ column = cf.getColumnSerializer().deserialize(dis, filter);
+ if(column != null)
+ {
+ cf.addColumn(column.name(), column);
+ column = null;
+ if(filter.isDone())
+ {
+ break;
+ }
+ }
+ }
+ }
+ return cf;
+ }
+
+ /*
+ * Deserialize a particular column or super column or the entire columnfamily given a : separated name
+ * name could be of the form cf:superColumn:column or cf:column or cf.
+ * Stops after the first column accepted by the column serializer.
+ */
+ public ColumnFamily deserialize(DataInputStream dis, String name, IFilter filter) throws IOException
+ {
+ String[] names = RowMutation.getColumnAndColumnFamily(name);
+ String columnName = "";
+ if ( names.length == 1 )
+ return deserialize(dis, filter);
+ if( names.length == 2 )
+ columnName = names[1];
+ if( names.length == 3 )
+ columnName = names[1]+ ":" + names[2];
+
+ ColumnFamily cf = defreezeColumnFamily(dis);
+ if ( !cf.isMarkedForDelete() )
+ {
+ /* read the number of columns */
+ int size = dis.readInt();
+ for ( int i = 0; i < size; ++i )
+ {
+ IColumn column = cf.getColumnSerializer().deserialize(dis, columnName, filter);
+ if ( column != null )
+ {
+ cf.addColumn(column.name(), column);
+ break;
+ }
+ }
+ }
+ return cf;
+ }
+
+ public void skip(DataInputStream dis) throws IOException
+ {
+ throw new UnsupportedOperationException("This operation is not yet supported.");
+ }
+
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyNotDefinedException.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyNotDefinedException.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyNotDefinedException.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyNotDefinedException.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+/**
+ * Checked exception thrown when an operation references a column family
+ * that is not defined in the configuration.
+ */
+public class ColumnFamilyNotDefinedException extends Exception
+{
+ public ColumnFamilyNotDefinedException(String message)
+ {
+ super(message);
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=749218&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Mon Mar 2 07:57:22 2009
@@ -0,0 +1,1556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.*;
+import java.math.BigInteger;
+import java.nio.file.Path;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.DataOutputBuffer;
+import org.apache.cassandra.io.IndexHelper;
+import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.PartitionerType;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FileUtils;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class ColumnFamilyStore
+{
+ private static int threshHold_ = 4;
+ private static final int bufSize_ = 128*1024*1024;
+ private static int compactionMemoryThreshold_ = 1 << 30;
+ private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
+
+ private String table_;
+ public String columnFamily_;
+
+ /* This is used to generate the next index for a SSTable */
+ private AtomicInteger fileIndexGenerator_ = new AtomicInteger(0);
+
+ /* memtable associated with this ColumnFamilyStore. */
+ private AtomicReference<Memtable> memtable_;
+ private AtomicReference<BinaryMemtable> binaryMemtable_;
+
+ /* SSTables on disk for this column family */
+ private Set<String> ssTables_ = new HashSet<String>();
+
+ /* Modification lock used for protecting reads from compactions. */
+ private ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
+
+ /* Flag indicates if a compaction is in process */
+ public AtomicBoolean isCompacting_ = new AtomicBoolean(false);
+
+ /*
+ * Builds the store for one (table, column family) pair and seeds the SSTable
+ * file index generator from the highest index found on disk.
+ */
+ ColumnFamilyStore(String table, String columnFamily) throws IOException
+ {
+ table_ = table;
+ columnFamily_ = columnFamily;
+ /*
+ * Get all data files associated with old Memtables for this table.
+ * These files are named as follows <Table>-1.db, ..., <Table>-n.db. Get
+ * the max which in this case is n and increment it to use it for next
+ * index.
+ */
+ List<Integer> indices = new ArrayList<Integer>();
+ String[] dataFileDirectories = DatabaseDescriptor.getAllDataFileLocations();
+ for ( String directory : dataFileDirectories )
+ {
+ File fileDir = new File(directory);
+ // NOTE(review): listFiles() returns null for a missing/unreadable
+ // directory; confirm directories are guaranteed to exist here.
+ File[] files = fileDir.listFiles();
+ for (File file : files)
+ {
+ String filename = file.getName();
+ String[] tblCfName = getTableAndColumnFamilyName(filename);
+ if (tblCfName[0].equals(table_)
+ && tblCfName[1].equals(columnFamily))
+ {
+ int index = getIndexFromFileName(filename);
+ indices.add(index);
+ }
+ }
+ }
+ Collections.sort(indices);
+ int value = (indices.size() > 0) ? (indices.get(indices.size() - 1)) : 0;
+ fileIndexGenerator_.set(value);
+ memtable_ = new AtomicReference<Memtable>( new Memtable(table_, columnFamily_) );
+ binaryMemtable_ = new AtomicReference<BinaryMemtable>( new BinaryMemtable(table_, columnFamily_) );
+ }
+
+ /*
+ * Startup hook: deletes empty/temporary files, registers existing SSTables
+ * (loading their indexes and bloom filters), and schedules compaction and,
+ * for the hints CF, hinted-handoff delivery.
+ */
+ void onStart() throws IOException
+ {
+ /* Do major compaction */
+ List<File> ssTables = new ArrayList<File>();
+ String[] dataFileDirectories = DatabaseDescriptor.getAllDataFileLocations();
+ for ( String directory : dataFileDirectories )
+ {
+ File fileDir = new File(directory);
+ File[] files = fileDir.listFiles();
+ for (File file : files)
+ {
+ String filename = file.getName();
+ // Remove zero-length files and leftover temporary compaction files.
+ if(((file.length() == 0) || (filename.indexOf("-" + SSTable.temporaryFile_) != -1) ) && (filename.indexOf(columnFamily_) != -1))
+ {
+ file.delete();
+ continue;
+ }
+ String[] tblCfName = getTableAndColumnFamilyName(filename);
+ if (tblCfName[0].equals(table_)
+ && tblCfName[1].equals(columnFamily_)
+ && filename.indexOf("-Data.db") != -1)
+ {
+ ssTables.add(file.getAbsoluteFile());
+ }
+ }
+ }
+ Collections.sort(ssTables, new FileUtils.FileComparator());
+ List<String> filenames = new ArrayList<String>();
+ for (File ssTable : ssTables)
+ {
+ filenames.add(ssTable.getAbsolutePath());
+ }
+
+ /* There are no files to compact just add to the list of SSTables */
+ ssTables_.addAll(filenames);
+ /* Load the index files and the Bloom Filters associated with them. */
+ SSTable.onStart(filenames);
+ // NOTE(review): log message says "major compaction" but the task is
+ // submitted to MinorCompactionManager; confirm which is intended.
+ logger_.debug("Submitting a major compaction task ...");
+ MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
+ if(columnFamily_.equals(Table.hints_))
+ {
+ HintedHandOffManager.instance().submit(this);
+ }
+ MinorCompactionManager.instance().submitPeriodicCompaction(this);
+ }
+
+ /* Snapshot copy of the SSTable file names currently tracked by this store. */
+ List<String> getAllSSTablesOnDisk()
+ {
+ return new ArrayList<String>(ssTables_);
+ }
+
+ /*
+ * This method is called to obtain statistics about
+ * the Column Family represented by this Column Family
+ * Store. It will report the total number of files on
+ * disk and the total space occupied by the data files
+ * associated with this Column Family.
+ */
+ public String cfStats(String newLineSeparator, java.text.DecimalFormat df)
+ {
+ StringBuilder sb = new StringBuilder();
+ /*
+ * We want to do this so that if there are
+ * no files on disk we do not want to display
+ * something ugly on the admin page.
+ */
+ if ( ssTables_.size() == 0 )
+ {
+ return sb.toString();
+ }
+ sb.append(columnFamily_ + " statistics :");
+ sb.append(newLineSeparator);
+ sb.append("Number of files on disk : " + ssTables_.size());
+ sb.append(newLineSeparator);
+ double totalSpace = 0d;
+ for ( String file : ssTables_ )
+ {
+ File f = new File(file);
+ totalSpace += f.length();
+ }
+ String diskSpace = FileUtils.stringifyFileSize(totalSpace);
+ sb.append("Total disk space : " + diskSpace);
+ sb.append(newLineSeparator);
+ sb.append("--------------------------------------");
+ sb.append(newLineSeparator);
+ return sb.toString();
+ }
+
+ /*
+ * This is called after bootstrap to add the files
+ * to the list of files maintained. Takes the write lock to
+ * exclude concurrent readers/compactions.
+ */
+ void addToList(String file)
+ {
+ lock_.writeLock().lock();
+ try
+ {
+ ssTables_.add(file);
+ }
+ finally
+ {
+ lock_.writeLock().unlock();
+ }
+ }
+
+ /*
+ * Touches the given key in every SSTable whose bloom filter may contain it
+ * (e.g. to warm caches); fData controls whether data is also touched.
+ */
+ void touch(String key, boolean fData) throws IOException
+ {
+ /* Scan the SSTables on disk first */
+ lock_.readLock().lock();
+ try
+ {
+ List<String> files = new ArrayList<String>(ssTables_);
+ for (String file : files)
+ {
+ /*
+ * Get the BloomFilter associated with this file. Check if the key
+ * is present in the BloomFilter. If not continue to the next file.
+ */
+ boolean bVal = SSTable.isKeyInFile(key, file);
+ if ( !bVal )
+ continue;
+ SSTable ssTable = new SSTable(file);
+ ssTable.touch(key, fData);
+ }
+ }
+ finally
+ {
+ lock_.readLock().unlock();
+ }
+ }
+
+ /*
+ * This method forces a compaction of the SSTables on disk. We wait
+ * for the process to complete by waiting on a future pointer.
+ * NOTE(review): when ranges is null the submitMajor() future is not
+ * captured, so this returns without waiting for the major compaction.
+ */
+ boolean forceCompaction(List<Range> ranges, EndPoint target, long skip, List<String> fileList)
+ {
+ Future<Boolean> futurePtr = null;
+ if( ranges != null)
+ futurePtr = MinorCompactionManager.instance().submit(ColumnFamilyStore.this, ranges, target, fileList);
+ else
+ MinorCompactionManager.instance().submitMajor(ColumnFamilyStore.this, ranges, skip);
+
+ boolean result = true;
+ try
+ {
+ /* Waiting for the compaction to complete. */
+ if(futurePtr != null)
+ result = futurePtr.get();
+ logger_.debug("Done forcing compaction ...");
+ }
+ catch (ExecutionException ex)
+ {
+ logger_.debug(LogUtil.throwableToString(ex));
+ }
+ catch ( InterruptedException ex2 )
+ {
+ logger_.debug(LogUtil.throwableToString(ex2));
+ }
+ return result;
+ }
+
+ /* Name of the column family managed by this store. */
+ String getColumnFamilyName()
+ {
+ return columnFamily_;
+ }
+
+ /*
+ * Splits an SSTable file name of the form <table>-<cf>-... on '-' and
+ * returns { table, columnFamily }.
+ */
+ private String[] getTableAndColumnFamilyName(String filename)
+ {
+ StringTokenizer st = new StringTokenizer(filename, "-");
+ String[] values = new String[2];
+ int i = 0;
+ while (st.hasMoreElements())
+ {
+ if (i == 0)
+ values[i] = (String) st.nextElement();
+ else if (i == 1)
+ {
+ values[i] = (String) st.nextElement();
+ break;
+ }
+ ++i;
+ }
+ return values;
+ }
+
+ /*
+ * Extracts the numeric <index> token (second-to-last '-'-separated token)
+ * from a file name of the form <table>-<column family>-<index>-Data.db.
+ */
+ protected static int getIndexFromFileName(String filename)
+ {
+ /*
+ * File name is of the form <table>-<column family>-<index>-Data.db.
+ * This tokenizer will strip the .db portion.
+ */
+ StringTokenizer st = new StringTokenizer(filename, "-");
+ /*
+ * Now I want to get the index portion of the filename. We accumulate
+ * the indices and then sort them to get the max index.
+ */
+ int count = st.countTokens();
+ int i = 0;
+ String index = null;
+ while (st.hasMoreElements())
+ {
+ index = (String) st.nextElement();
+ if (i == (count - 2))
+ break;
+ ++i;
+ }
+ return Integer.parseInt(index);
+ }
+
+ /* Generates the next SSTable base name: <table>-<cf>-<index>. */
+ String getNextFileName()
+ {
+ // Pseudo increment so that we do not generate consecutive numbers
+ fileIndexGenerator_.incrementAndGet();
+ String name = table_ + "-" + columnFamily_ + "-" + fileIndexGenerator_.incrementAndGet();
+ return name;
+ }
+
+ /*
+ * Return a temporary file name: <table>-<cf>-<tmp marker>-<index>.
+ */
+ String getTempFileName()
+ {
+ // Pseudo increment so that we do not generate consecutive numbers
+ fileIndexGenerator_.incrementAndGet();
+ String name = table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + fileIndexGenerator_.incrementAndGet() ;
+ return name;
+ }
+
+ /*
+ * Return a temporary file name. Based on the list of files input
+ * this fn sorts the list and generates a number between the 2 lowest filenames
+ * ensuring uniqueness.
+ * Since we do not generate consecutive numbers hence the lowest file number
+ * can just be incremented to generate the next file.
+ * Returns null when fewer than 2 files are supplied.
+ */
+ String getTempFileName( List<String> files)
+ {
+ int lowestIndex = 0 ;
+ int index = 0;
+ Collections.sort(files, new FileNameComparator(FileNameComparator.Ascending));
+
+ if( files.size() <= 1)
+ return null;
+ lowestIndex = getIndexFromFileName(files.get(0));
+
+ index = lowestIndex + 1 ;
+
+ String name = table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + index ;
+ return name;
+ }
+
+
+ /*
+ * This version is used only on start up when we are recovering from logs.
+ * In the future we may want to parallelize the log processing for a table
+ * by having a thread per log file present for recovery. Re-visit at that
+ * time. Installs a fresh memtable, then applies the pending mutation
+ * unless it is the special flush sentinel key.
+ */
+ void switchMemtable(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx) throws IOException
+ {
+ memtable_.set( new Memtable(table_, columnFamily_) );
+ if(!key.equals(Memtable.flushKey_))
+ memtable_.get().put(key, columnFamily, cLogCtx);
+ }
+
+ /*
+ * This version is used when we forceflush: simply installs a fresh,
+ * empty memtable.
+ */
+ void switchMemtable() throws IOException
+ {
+ memtable_.set( new Memtable(table_, columnFamily_) );
+ }
+
+ /*
+ * This version is used only on start up when we are recovering from logs.
+ * In the future we may want to parallelize the log processing for a table
+ * by having a thread per log file present for recovery. Re-visit at that
+ * time. Installs a fresh binary memtable and applies the pending buffer.
+ */
+ void switchBinaryMemtable(String key, byte[] buffer) throws IOException
+ {
+ binaryMemtable_.set( new BinaryMemtable(table_, columnFamily_) );
+ binaryMemtable_.get().put(key, buffer);
+ }
+
+ /* Forces the current memtable to flush; fRecovery marks a recovery-time flush. */
+ void forceFlush(boolean fRecovery) throws IOException
+ {
+ //MemtableManager.instance().submit(getColumnFamilyName(), memtable_.get() , CommitLog.CommitLogContext.NULL);
+ //memtable_.get().flush(true, CommitLog.CommitLogContext.NULL);
+ memtable_.get().forceflush(this, fRecovery);
+ }
+
+ /* Submits the current binary memtable for asynchronous flushing. */
+ void forceFlushBinary() throws IOException
+ {
+ BinaryMemtableManager.instance().submit(getColumnFamilyName(), binaryMemtable_.get());
+ //binaryMemtable_.get().flush(true);
+ }
+
+ /**
+ * Insert/Update the column family for this key.
+ * param @ lock - lock that needs to be used.
+ * param @ key - key for update/insert
+ * param @ columnFamily - columnFamily changes
+ */
+ void apply(String key, ColumnFamily columnFamily, CommitLog.CommitLogContext cLogCtx)
+ throws IOException
+ {
+ memtable_.get().put(key, columnFamily, cLogCtx);
+ }
+
+ /*
+ * Insert/Update the column family for this key. param @ lock - lock that
+ * needs to be used. param @ key - key for update/insert param @
+ * columnFamily - columnFamily changes
+ */
+ void applyBinary(String key, byte[] buffer)
+ throws IOException
+ {
+ binaryMemtable_.get().put(key, buffer);
+ }
+
+ /**
+ *
+ * Get the column family in the most efficient order.
+ * 1. Memtable
+ * 2. Sorted list of files
+ */
+ public ColumnFamily getColumnFamily(String key, String cf, IFilter filter) throws IOException
+ {
+ List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+ ColumnFamily columnFamily = null;
+ long start = System.currentTimeMillis();
+ /* Get the ColumnFamily from Memtable */
+ getColumnFamilyFromCurrentMemtable(key, cf, filter, columnFamilies);
+ if(columnFamilies.size() != 0)
+ {
+ if(filter.isDone())
+ return columnFamilies.get(0);
+ }
+ /* Check if MemtableManager has any historical information */
+ MemtableManager.instance().getColumnFamily(key, columnFamily_, cf, filter, columnFamilies);
+ if(columnFamilies.size() != 0)
+ {
+ columnFamily = resolve(columnFamilies);
+ if(filter.isDone())
+ return columnFamily;
+ columnFamilies.clear();
+ columnFamilies.add(columnFamily);
+ }
+ getColumnFamilyFromDisk(key, cf, columnFamilies, filter);
+ logger_.debug("DISK TIME: " + (System.currentTimeMillis() - start)
+ + " ms.");
+ columnFamily = resolve(columnFamilies);
+
+ return columnFamily;
+ }
+
+ public ColumnFamily getColumnFamilyFromMemory(String key, String cf, IFilter filter)
+ {
+ List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+ ColumnFamily columnFamily = null;
+ long start = System.currentTimeMillis();
+ /* Get the ColumnFamily from Memtable */
+ getColumnFamilyFromCurrentMemtable(key, cf, filter, columnFamilies);
+ if(columnFamilies.size() != 0)
+ {
+ if(filter.isDone())
+ return columnFamilies.get(0);
+ }
+ /* Check if MemtableManager has any historical information */
+ MemtableManager.instance().getColumnFamily(key, columnFamily_, cf, filter, columnFamilies);
+ if(columnFamilies.size() != 0)
+ {
+ columnFamily = resolve(columnFamilies);
+ if(filter.isDone())
+ return columnFamily;
+ columnFamilies.clear();
+ columnFamilies.add(columnFamily);
+ }
+ columnFamily = resolve(columnFamilies);
+ return columnFamily;
+ }
+
+ /**
+ * Fetch from disk files and go in sorted order to be efficient
+ * This fn exits as soon as the required data is found.
+ * @param key
+ * @param cf
+ * @param columnFamilies
+ * @param filter
+ * @throws IOException
+ */
+ private void getColumnFamilyFromDisk(String key, String cf, List<ColumnFamily> columnFamilies, IFilter filter) throws IOException
+ {
+ /* Scan the SSTables on disk first */
+ List<String> files = new ArrayList<String>();
+ lock_.readLock().lock();
+ try
+ {
+ files.addAll(ssTables_);
+ Collections.sort(files, new FileNameComparator(FileNameComparator.Descending));
+ }
+ finally
+ {
+ lock_.readLock().unlock();
+ }
+
+ for (String file : files)
+ {
+ /*
+ * Get the BloomFilter associated with this file. Check if the key
+ * is present in the BloomFilter. If not continue to the next file.
+ */
+ boolean bVal = SSTable.isKeyInFile(key, file);
+ if ( !bVal )
+ continue;
+ ColumnFamily columnFamily = fetchColumnFamily(key, cf, filter, file);
+ long start = System.currentTimeMillis();
+ if (columnFamily != null)
+ {
+ /*
+ * TODO
+ * By using the filter before removing deleted columns
+ * we have a efficient implementation of timefilter
+ * but for count filter this can return wrong results
+ * we need to take care of that later.
+ */
+ /* suppress columns marked for delete */
+ Map<String, IColumn> columns = columnFamily.getColumns();
+ Set<String> cNames = columns.keySet();
+
+ for (String cName : cNames)
+ {
+ IColumn column = columns.get(cName);
+ if (column.isMarkedForDelete())
+ columns.remove(cName);
+ }
+ columnFamilies.add(columnFamily);
+ if(filter.isDone())
+ {
+ break;
+ }
+ }
+ logger_.debug("DISK Data structure population TIME: " + (System.currentTimeMillis() - start)
+ + " ms.");
+ }
+ files.clear();
+ }
+
+
+ private ColumnFamily fetchColumnFamily(String key, String cf, IFilter filter, String ssTableFile) throws IOException
+ {
+ SSTable ssTable = new SSTable(ssTableFile);
+ long start = System.currentTimeMillis();
+ DataInputBuffer bufIn = null;
+ bufIn = filter.next(key, cf, ssTable);
+ logger_.debug("DISK ssTable.next TIME: " + (System.currentTimeMillis() - start) + " ms.");
+ if (bufIn.getLength() == 0)
+ return null;
+ start = System.currentTimeMillis();
+ ColumnFamily columnFamily = null;
+ columnFamily = ColumnFamily.serializer().deserialize(bufIn, cf, filter);
+ logger_.debug("DISK Deserialize TIME: " + (System.currentTimeMillis() - start) + " ms.");
+ if (columnFamily == null)
+ return columnFamily;
+ return (!columnFamily.isMarkedForDelete()) ? columnFamily : null;
+ }
+
+
+
+ private void getColumnFamilyFromCurrentMemtable(String key, String cf, IFilter filter, List<ColumnFamily> columnFamilies)
+ {
+ /* Get the ColumnFamily from Memtable */
+ ColumnFamily columnFamily = memtable_.get().get(key, cf, filter);
+ if (columnFamily != null)
+ {
+ if (!columnFamily.isMarkedForDelete())
+ columnFamilies.add(columnFamily);
+ }
+ }
+
+ private ColumnFamily resolve(List<ColumnFamily> columnFamilies)
+ {
+ int size = columnFamilies.size();
+ if (size == 0)
+ return null;
+ ColumnFamily cf = columnFamilies.get(0);
+ for ( int i = 1; i < size ; ++i )
+ {
+ cf.addColumns(columnFamilies.get(i));
+ }
+ return cf;
+ }
+
+
+ /*
+ * This version is used only on start up when we are recovering from logs.
+ * Hence no locking is required since we process logs on the main thread. In
+ * the future we may want to parellelize the log processing for a table by
+ * having a thread per log file present for recovery. Re-visit at that time.
+ */
+ void applyNow(String key, ColumnFamily columnFamily) throws IOException
+ {
+ if (!columnFamily.isMarkedForDelete())
+ memtable_.get().putOnRecovery(key, columnFamily);
+ }
+
+ /*
+ * Delete doesn't mean we can blindly delete. We need to write this to disk
+ * as being marked for delete. This is to prevent a previous value from
+ * resuscitating a column family that has been deleted.
+ */
+ void delete(String key, ColumnFamily columnFamily)
+ throws IOException
+ {
+ memtable_.get().remove(key, columnFamily);
+ }
+
+ /*
+ * This method is called when the Memtable is frozen and ready to be flushed
+ * to disk. This method informs the CommitLog that a particular ColumnFamily
+ * is being flushed to disk.
+ */
+ void onMemtableFlush(CommitLog.CommitLogContext cLogCtx) throws IOException
+ {
+ if ( cLogCtx.isValidContext() )
+ CommitLog.open(table_).onMemtableFlush(columnFamily_, cLogCtx);
+ }
+
+ /*
+ * Called after the Memtable flushes its in-memory data. This information is
+ * cached in the ColumnFamilyStore. This is useful for reads because the
+ * ColumnFamilyStore first looks in the in-memory store and the into the
+ * disk to find the key. If invoked during recoveryMode the
+ * onMemtableFlush() need not be invoked.
+ *
+ * param @ filename - filename just flushed to disk
+ * param @ bf - bloom filter which indicates the keys that are in this file.
+ */
+ void storeLocation(String filename, BloomFilter bf) throws IOException
+ {
+ boolean doCompaction = false;
+ int ssTableSize = 0;
+ lock_.writeLock().lock();
+ try
+ {
+ ssTables_.add(filename);
+ SSTable.storeBloomFilter(filename, bf);
+ ssTableSize = ssTables_.size();
+ }
+ finally
+ {
+ lock_.writeLock().unlock();
+ }
+ if (ssTableSize >= threshHold_ && !isCompacting_.get())
+ {
+ doCompaction = true;
+ }
+
+ if (isCompacting_.get())
+ {
+ if ( ssTableSize % threshHold_ == 0 )
+ {
+ doCompaction = true;
+ }
+ }
+ if ( doCompaction )
+ {
+ logger_.debug("Submitting for compaction ...");
+ MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
+ logger_.debug("Submitted for compaction ...");
+ }
+ }
+
+ PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize) throws IOException
+ {
+ PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
+ if (files.size() > 1 || (ranges != null && files.size() > 0))
+ {
+ int bufferSize = Math.min( (ColumnFamilyStore.compactionMemoryThreshold_ / files.size()), minBufferSize ) ;
+ FileStruct fs = null;
+ for (String file : files)
+ {
+ try
+ {
+ fs = new FileStruct();
+ fs.bufIn_ = new DataInputBuffer();
+ fs.bufOut_ = new DataOutputBuffer();
+ fs.reader_ = SequenceFile.bufferedReader(file, bufferSize);
+ fs.key_ = null;
+ fs = getNextKey(fs);
+ if(fs == null)
+ continue;
+ pq.add(fs);
+ }
+ catch ( Exception ex)
+ {
+ ex.printStackTrace();
+ try
+ {
+ if(fs != null)
+ {
+ fs.reader_.close();
+ }
+ }
+ catch(Exception e)
+ {
+ logger_.warn("Unable to close file :" + file);
+ }
+ continue;
+ }
+ }
+ }
+ return pq;
+ }
+
+
+ /*
+ * Stage the compactions , compact similar size files.
+ * This fn figures out the files close enough by size and if they
+ * are greater than the threshold then compacts.
+ */
+ Map<Integer, List<String>> stageOrderedCompaction(List<String> files)
+ {
+ // Sort the files based on the generation ID
+ Collections.sort(files, new FileNameComparator(FileNameComparator.Ascending));
+ Map<Integer, List<String>> buckets = new HashMap<Integer, List<String>>();
+ int maxBuckets = 1000;
+ long averages[] = new long[maxBuckets];
+ long min = 50L*1024L*1024L;
+ Integer i = 0;
+ for(String file : files)
+ {
+ File f = new File(file);
+ long size = f.length();
+ if ( (size > averages[i]/2 && size < 3*averages[i]/2) || ( size < min && averages[i] < min ))
+ {
+ averages[i] = (averages[i] + size) / 2 ;
+ List<String> fileList = buckets.get(i);
+ if(fileList == null)
+ {
+ fileList = new ArrayList<String>();
+ buckets.put(i, fileList);
+ }
+ fileList.add(file);
+ }
+ else
+ {
+ if( i >= maxBuckets )
+ break;
+ i++;
+ List<String> fileList = new ArrayList<String>();
+ buckets.put(i, fileList);
+ fileList.add(file);
+ averages[i] = size;
+ }
+ }
+ return buckets;
+ }
+
+
+
+ /*
+ * Break the files into buckets and then compact.
+ */
+ void doCompaction() throws IOException
+ {
+ isCompacting_.set(true);
+ List<String> files = new ArrayList<String>(ssTables_);
+ try
+ {
+ int count = 0;
+ Map<Integer, List<String>> buckets = stageOrderedCompaction(files);
+ Set<Integer> keySet = buckets.keySet();
+ for(Integer key : keySet)
+ {
+ List<String> fileList = buckets.get(key);
+ Collections.sort( fileList , new FileNameComparator( FileNameComparator.Ascending));
+ if(fileList.size() >= threshHold_ )
+ {
+ files.clear();
+ count = 0;
+ for(String file : fileList)
+ {
+ files.add(file);
+ count++;
+ if( count == threshHold_ )
+ break;
+ }
+ try
+ {
+ // For each bucket if it has crossed the threshhold do the compaction
+ // In case of range compaction merge the counting bloom filters also.
+ if( count == threshHold_)
+ doFileCompaction(files, bufSize_);
+ }
+ catch ( Exception ex)
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ }
+ }
+ }
+ }
+ finally
+ {
+ isCompacting_.set(false);
+ }
+ return;
+ }
+
+ void doMajorCompaction(long skip) throws IOException
+ {
+ doMajorCompactionInternal( skip );
+ }
+
+ void doMajorCompaction() throws IOException
+ {
+ doMajorCompactionInternal( 0 );
+ }
+ /*
+ * Compact all the files irrespective of the size.
+ * skip : is the ammount in Gb of the files to be skipped
+ * all files greater than skip GB are skipped for this compaction.
+ * Except if skip is 0 , in that case this is ignored and all files are taken.
+ */
+ void doMajorCompactionInternal(long skip) throws IOException
+ {
+ isCompacting_.set(true);
+ List<String> filesInternal = new ArrayList<String>(ssTables_);
+ List<String> files = null;
+ try
+ {
+ if( skip > 0L )
+ {
+ files = new ArrayList<String>();
+ for ( String file : filesInternal )
+ {
+ File f = new File(file);
+ if( f.length() < skip*1024L*1024L*1024L )
+ {
+ files.add(file);
+ }
+ }
+ }
+ else
+ {
+ files = filesInternal;
+ }
+ doFileCompaction(files, bufSize_);
+ }
+ catch ( Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ finally
+ {
+ isCompacting_.set(false);
+ }
+ return ;
+ }
+
+ /*
+ * Add up all the files sizes this is the worst case file
+ * size for compaction of all the list of files given.
+ */
+ long getExpectedCompactedFileSize(List<String> files)
+ {
+ long expectedFileSize = 0;
+ for(String file : files)
+ {
+ File f = new File(file);
+ long size = f.length();
+ expectedFileSize = expectedFileSize + size;
+ }
+ return expectedFileSize;
+ }
+
+
+ /*
+ * Find the maximum size file in the list .
+ */
+ String getMaxSizeFile( List<String> files )
+ {
+ long maxSize = 0L;
+ String maxFile = null;
+ for ( String file : files )
+ {
+ File f = new File(file);
+ if(f.length() > maxSize )
+ {
+ maxSize = f.length();
+ maxFile = file;
+ }
+ }
+ return maxFile;
+ }
+
+
+ Range getMaxRange( List<Range> ranges )
+ {
+ Range maxRange = new Range( BigInteger.ZERO, BigInteger.ZERO );
+ for( Range range : ranges)
+ {
+ if( range.left().compareTo(maxRange.left()) > 0 )
+ {
+ maxRange = range;
+ }
+ }
+ return maxRange;
+ }
+
+ boolean isLoopAround ( List<Range> ranges )
+ {
+ boolean isLoop = false;
+ for( Range range : ranges)
+ {
+ if( range.left().compareTo(range.right()) > 0 )
+ {
+ isLoop = true;
+ break;
+ }
+ }
+ return isLoop;
+ }
+
+ boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
+ {
+ isCompacting_.set(true);
+ List<String> files = new ArrayList<String>(ssTables_);
+ boolean result = true;
+ try
+ {
+ result = doFileAntiCompaction(files, ranges, target, bufSize_, fileList, null);
+ }
+ catch ( Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ finally
+ {
+ isCompacting_.set(false);
+ }
+ return result;
+
+ }
+
+ /*
+ * Read the next key from the data file , this fn will skip teh block index
+ * and read teh next available key into the filestruct that is passed.
+ * If it cannot read or a end of file is reached it will return null.
+ */
+ FileStruct getNextKey(FileStruct filestruct) throws IOException
+ {
+ filestruct.bufOut_.reset();
+ if (filestruct.reader_.isEOF())
+ {
+ filestruct.reader_.close();
+ return null;
+ }
+
+ long bytesread = filestruct.reader_.next(filestruct.bufOut_);
+ if (bytesread == -1)
+ {
+ filestruct.reader_.close();
+ return null;
+ }
+
+ filestruct.bufIn_.reset(filestruct.bufOut_.getData(), filestruct.bufOut_.getLength());
+ filestruct.key_ = filestruct.bufIn_.readUTF();
+ /* If the key we read is the Block Index Key then we are done reading the keys so exit */
+ if ( filestruct.key_.equals(SSTable.blockIndexKey_) )
+ {
+ filestruct.reader_.close();
+ return null;
+ }
+ return filestruct;
+ }
+
+ void forceCleanup()
+ {
+ MinorCompactionManager.instance().submitCleanup(ColumnFamilyStore.this);
+ }
+
+ /**
+ * This function goes over each file and removes the keys that the node is not responsible for
+ * and only keeps keys that this node is responsible for.
+ * @throws IOException
+ */
+ void doCleanupCompaction() throws IOException
+ {
+ isCompacting_.set(true);
+ List<String> files = new ArrayList<String>(ssTables_);
+ for(String file: files)
+ {
+ try
+ {
+ doCleanup(file);
+ }
+ catch ( Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ isCompacting_.set(false);
+ }
    /**
     * Cleans up one particular file by removing keys that this node is not
     * responsible for: the file is anti-compacted against this node's own
     * ranges, and then the file set is atomically swapped (new files in, old
     * file and its bloom filter out) under the write lock.
     * @param file data file to clean; no-op when null
     * @throws IOException
     */
    /* TODO: Take care of the comments later. */
    void doCleanup(String file) throws IOException
    {
        if(file == null )
            return;
        List<Range> myRanges = null;
        List<String> files = new ArrayList<String>();
        files.add(file);
        List<String> newFiles = new ArrayList<String>();
        // the ranges this node owns, looked up from the current ring view
        Map<EndPoint, List<Range>> endPointtoRangeMap = StorageService.instance().constructEndPointToRangesMap();
        myRanges = endPointtoRangeMap.get(StorageService.getLocalStorageEndPoint());
        List<BloomFilter> compactedBloomFilters = new ArrayList<BloomFilter>();
        // target == null: output stays local instead of going to a bootstrap dir
        doFileAntiCompaction(files, myRanges, null, bufSize_, newFiles, compactedBloomFilters);
        logger_.debug("Original file : " + file + " of size " + new File(file).length());
        lock_.writeLock().lock();
        try
        {
            // swap old file out and new files in atomically w.r.t. readers
            ssTables_.remove(file);
            SSTable.removeAssociatedBloomFilter(file);
            for (String newfile : newFiles)
            {
                logger_.debug("New file : " + newfile + " of size " + new File(newfile).length());
                if ( newfile != null )
                {
                    ssTables_.add(newfile);
                    logger_.debug("Inserting bloom filter for file " + newfile);
                    // NOTE(review): every new file is registered with
                    // compactedBloomFilters.get(0) -- presumably the
                    // anti-compaction produces a single filter; confirm it can
                    // never produce more than one output filter.
                    SSTable.storeBloomFilter(newfile, compactedBloomFilters.get(0));
                }
            }
            SSTable.delete(file);
        }
        finally
        {
            lock_.writeLock().unlock();
        }
    }
+
    /**
     * Anti-compaction: merges the given files but writes out only the rows
     * whose keys fall inside {@code ranges}. When {@code target} is set the
     * output goes into a "bootstrap" subdirectory for hand-off to that node;
     * otherwise it is a normal compacted file with the unwanted ranges wiped
     * out.
     * @param files input data files to merge
     * @param ranges token ranges whose keys should be kept
     * @param target endpoint the split-out data is destined for; may be null
     * @param minBufferSize cap on the per-file read buffer
     * @param fileList receives the names of the output files written
     * @param compactedBloomFilters when non-null, receives the bloom filter
     *        built over the output file's keys
     * @return NOTE(review): the result flag is initialized to false and never
     *         set to true anywhere, so this method currently always returns
     *         false -- confirm whether callers rely on the return value.
     * @throws IOException
     */
    boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, int minBufferSize, List<String> fileList, List<BloomFilter> compactedBloomFilters) throws IOException
    {
        boolean result = false;
        long startTime = System.currentTimeMillis();
        // NOTE(review): totalBytesRead / totalBytesWritten are never updated,
        // so the summary log lines below always report 0 -- wire them up or
        // drop them.
        long totalBytesRead = 0;
        long totalBytesWritten = 0;
        long totalkeysRead = 0;
        long totalkeysWritten = 0;
        String rangeFileLocation = null;
        String mergedFileName = null;
        try
        {
            // Calculate the expected compacted filesize
            long expectedRangeFileSize = getExpectedCompactedFileSize(files);
            /* in the worst case a node will be giving out half of its data so we take a chance */
            expectedRangeFileSize = expectedRangeFileSize / 2;
            rangeFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedRangeFileSize);
//            boolean isLoop = isLoopAround( ranges );
//            Range maxRange = getMaxRange( ranges );
            // If the compaction file path is null that means we have no space left for this compaction.
            if( rangeFileLocation == null )
            {
                logger_.warn("Total bytes to be written for range compaction  ..."
                        + expectedRangeFileSize + "   is greater than the safe limit of the disk space available.");
                return result;
            }
            PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, minBufferSize);
            if (pq.size() > 0)
            {
                mergedFileName = getTempFileName();
                SSTable ssTableRange = null ;
                String lastkey = null;
                List<FileStruct> lfs = new ArrayList<FileStruct>();
                DataOutputBuffer bufOut = new DataOutputBuffer();
                int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
                expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
                logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
                /* Create the bloom filter for the compacted file. */
                BloomFilter compactedRangeBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
                List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();

                /* k-way merge: pull equal keys from every file, resolve, emit once */
                while (pq.size() > 0 || lfs.size() > 0)
                {
                    FileStruct fs = null;
                    if (pq.size() > 0)
                    {
                        fs = pq.poll();
                    }
                    if (fs != null
                            && (lastkey == null || lastkey.compareTo(fs.key_) == 0))
                    {
                        // The keys are the same so we need to add this to the
                        // ldfs list
                        lastkey = fs.key_;
                        lfs.add(fs);
                    }
                    else
                    {
                        /* all copies of lastkey are gathered in lfs -- merge and emit */
                        Collections.sort(lfs, new FileStructComparator());
                        ColumnFamily columnFamily = null;
                        bufOut.reset();
                        if(lfs.size() > 1)
                        {
                            for (FileStruct filestruct : lfs)
                            {
                                try
                                {
                                    /* read the length although we don't need it */
                                    filestruct.bufIn_.readInt();
                                    // Skip the Index
                                    IndexHelper.skipBloomFilterAndIndex(filestruct.bufIn_);
                                    // We want to add only 2 and resolve them right there in order to save on memory footprint
                                    if(columnFamilies.size() > 1)
                                    {
                                        // Now merge the 2 column families
                                        columnFamily = resolve(columnFamilies);
                                        columnFamilies.clear();
                                        if( columnFamily != null)
                                        {
                                            // add the merged columnfamily back to the list
                                            columnFamilies.add(columnFamily);
                                        }

                                    }
                                    // deserialize into column families
                                    columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
                                }
                                catch ( Exception ex)
                                {
                                    logger_.warn(LogUtil.throwableToString(ex));
                                    continue;
                                }
                            }
                            // Now after merging all crap append to the sstable
                            columnFamily = resolve(columnFamilies);
                            columnFamilies.clear();
                            if( columnFamily != null )
                            {
                                /* serialize the cf with column indexes */
                                ColumnFamily.serializer2().serialize(columnFamily, bufOut);
                            }
                        }
                        else
                        {
                            /* single source for this key: copy the row through unchanged */
                            FileStruct filestruct = lfs.get(0);
                            try
                            {
                                /* read the length although we don't need it */
                                int size = filestruct.bufIn_.readInt();
                                bufOut.write(filestruct.bufIn_, size);
                            }
                            catch ( Exception ex)
                            {
                                logger_.warn(LogUtil.throwableToString(ex));
                                filestruct.reader_.close();
                                continue;
                            }
                        }
                        /* only keys inside the requested ranges are written out */
                        if ( Range.isKeyInRanges(ranges, lastkey) )
                        {
                            if(ssTableRange == null )
                            {
                                if( target != null )
                                    rangeFileLocation = rangeFileLocation + System.getProperty("file.separator") + "bootstrap";
                                FileUtils.createDirectory(rangeFileLocation);
                                ssTableRange = new SSTable(rangeFileLocation, mergedFileName);
                            }
                            try
                            {
                                ssTableRange.append(lastkey, bufOut);
                                compactedRangeBloomFilter.fill(lastkey);
                            }
                            catch(Exception ex)
                            {
                                logger_.warn( LogUtil.throwableToString(ex) );
                            }
                        }
                        totalkeysWritten++;
                        /* advance every consumed source to its next in-range key */
                        for (FileStruct filestruct : lfs)
                        {
                            try
                            {
                                filestruct = getNextKey ( filestruct );
                                if(filestruct == null)
                                {
                                    continue;
                                }
                                /* keep on looping until we find a key in the range */
                                while ( !Range.isKeyInRanges(ranges, filestruct.key_ ) )
                                {
                                    filestruct = getNextKey ( filestruct );
                                    if(filestruct == null)
                                    {
                                        break;
                                    }
                                    /* check if we need to continue , if we are done with ranges empty the queue and close all file handles and exit */
                                    //if( !isLoop && StorageService.hash(filestruct.key).compareTo(maxRange.right()) > 0 && !filestruct.key.equals(""))
                                    //{
                                        //filestruct.reader.close();
                                        //filestruct = null;
                                        //break;
                                    //}
                                }
                                if ( filestruct != null)
                                {
                                    pq.add(filestruct);
                                }
                                totalkeysRead++;
                            }
                            catch ( Exception ex )
                            {
                                // Ignore the exception as it might be a corrupted file
                                // in any case we have read as far as possible from it
                                // and it will be deleted after compaction.
                                logger_.warn(LogUtil.throwableToString(ex));
                                filestruct.reader_.close();
                                continue;
                            }
                        }
                        lfs.clear();
                        lastkey = null;
                        if (fs != null)
                        {
                            // Add back the fs since we processed the rest of
                            // filestructs
                            pq.add(fs);
                        }
                    }
                }
                if( ssTableRange != null )
                {
                    if ( fileList == null )
                        fileList = new ArrayList<String>();
                    ssTableRange.closeRename(compactedRangeBloomFilter, fileList);
                    if(compactedBloomFilters != null)
                        compactedBloomFilters.add(compactedRangeBloomFilter);
                }
            }
        }
        catch ( Exception ex)
        {
            logger_.warn( LogUtil.throwableToString(ex) );
        }
        logger_.debug("Total time taken for range split   ..."
                + (System.currentTimeMillis() - startTime));
        logger_.debug("Total bytes Read for range split  ..." + totalBytesRead);
        logger_.debug("Total bytes written for range split  ..."
                + totalBytesWritten + "   Total keys read ..." + totalkeysRead);
        return result;
    }
+
+ private void doWrite(SSTable ssTable, String key, DataOutputBuffer bufOut) throws IOException
+ {
+ PartitionerType pType = StorageService.getPartitionerType();
+ switch ( pType )
+ {
+ case OPHF:
+ ssTable.append(key, bufOut);
+ break;
+
+ default:
+ String[] peices = key.split(":");
+ key = peices[1];
+ BigInteger hash = new BigInteger(peices[0]);
+ ssTable.append(key, hash, bufOut);
+ break;
+ }
+ }
+
+ private void doFill(BloomFilter bf, String key)
+ {
+ PartitionerType pType = StorageService.getPartitionerType();
+ switch ( pType )
+ {
+ case OPHF:
+ bf.fill(key);
+ break;
+
+ default:
+ String[] peices = key.split(":");
+ bf.fill(peices[1]);
+ break;
+ }
+ }
+
+ /*
+ * This function does the actual compaction for files.
+ * It maintains a priority queue of with the first key from each file
+ * and then removes the top of the queue and adds it to the SStable and
+ * repeats this process while reading the next from each file until its
+ * done with all the files . The SStable to which the keys are written
+ * represents the new compacted file. Before writing if there are keys
+ * that occur in multiple files and are the same then a resolution is done
+ * to get the latest data.
+ *
+ */
+ void doFileCompaction(List<String> files, int minBufferSize) throws IOException
+ {
+ String newfile = null;
+ long startTime = System.currentTimeMillis();
+ long totalBytesRead = 0;
+ long totalBytesWritten = 0;
+ long totalkeysRead = 0;
+ long totalkeysWritten = 0;
+ try
+ {
+ // Calculate the expected compacted filesize
+ long expectedCompactedFileSize = getExpectedCompactedFileSize(files);
+ String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedCompactedFileSize);
+ // If the compaction file path is null that means we have no space left for this compaction.
+ if( compactionFileLocation == null )
+ {
+ String maxFile = getMaxSizeFile( files );
+ files.remove( maxFile );
+ doFileCompaction(files , minBufferSize);
+ return;
+ }
+ PriorityQueue<FileStruct> pq = initializePriorityQueue(files, null, minBufferSize);
+ if (pq.size() > 0)
+ {
+ String mergedFileName = getTempFileName( files );
+ SSTable ssTable = null;
+ String lastkey = null;
+ List<FileStruct> lfs = new ArrayList<FileStruct>();
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
+ expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
+ logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+ /* Create the bloom filter for the compacted file. */
+ BloomFilter compactedBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
+ List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+
+ while (pq.size() > 0 || lfs.size() > 0)
+ {
+ FileStruct fs = null;
+ if (pq.size() > 0)
+ {
+ fs = pq.poll();
+ }
+ if (fs != null
+ && (lastkey == null || lastkey.compareTo(fs.key_) == 0))
+ {
+ // The keys are the same so we need to add this to the
+ // ldfs list
+ lastkey = fs.key_;
+ lfs.add(fs);
+ }
+ else
+ {
+ Collections.sort(lfs, new FileStructComparator());
+ ColumnFamily columnFamily = null;
+ bufOut.reset();
+ if(lfs.size() > 1)
+ {
+ for (FileStruct filestruct : lfs)
+ {
+ try
+ {
+ /* read the length although we don't need it */
+ filestruct.bufIn_.readInt();
+ // Skip the Index
+ IndexHelper.skipBloomFilterAndIndex(filestruct.bufIn_);
+ // We want to add only 2 and resolve them right there in order to save on memory footprint
+ if(columnFamilies.size() > 1)
+ {
+ // Now merge the 2 column families
+ columnFamily = resolve(columnFamilies);
+ columnFamilies.clear();
+ if( columnFamily != null)
+ {
+ // add the merged columnfamily back to the list
+ columnFamilies.add(columnFamily);
+ }
+
+ }
+ // deserialize into column families
+ columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
+ }
+ catch ( Exception ex)
+ {
+ continue;
+ }
+ }
+ // Now after merging all crap append to the sstable
+ columnFamily = resolve(columnFamilies);
+ columnFamilies.clear();
+ if( columnFamily != null )
+ {
+ /* serialize the cf with column indexes */
+ ColumnFamily.serializer2().serialize(columnFamily, bufOut);
+ }
+ }
+ else
+ {
+ FileStruct filestruct = lfs.get(0);
+ try
+ {
+ /* read the length although we don't need it */
+ int size = filestruct.bufIn_.readInt();
+ bufOut.write(filestruct.bufIn_, size);
+ }
+ catch ( Exception ex)
+ {
+ ex.printStackTrace();
+ filestruct.reader_.close();
+ continue;
+ }
+ }
+
+ if ( ssTable == null )
+ {
+ PartitionerType pType = StorageService.getPartitionerType();
+ ssTable = new SSTable(compactionFileLocation, mergedFileName, pType);
+ }
+ doWrite(ssTable, lastkey, bufOut);
+
+ /* Fill the bloom filter with the key */
+ doFill(compactedBloomFilter, lastkey);
+ totalkeysWritten++;
+ for (FileStruct filestruct : lfs)
+ {
+ try
+ {
+ filestruct = getNextKey(filestruct);
+ if(filestruct == null)
+ {
+ continue;
+ }
+ pq.add(filestruct);
+ totalkeysRead++;
+ }
+ catch ( Throwable ex )
+ {
+ // Ignore the exception as it might be a corrupted file
+ // in any case we have read as far as possible from it
+ // and it will be deleted after compaction.
+ filestruct.reader_.close();
+ continue;
+ }
+ }
+ lfs.clear();
+ lastkey = null;
+ if (fs != null)
+ {
+ /* Add back the fs since we processed the rest of filestructs */
+ pq.add(fs);
+ }
+ }
+ }
+ if ( ssTable != null )
+ {
+ ssTable.closeRename(compactedBloomFilter);
+ newfile = ssTable.getDataFileLocation();
+ }
+ lock_.writeLock().lock();
+ try
+ {
+ for (String file : files)
+ {
+ ssTables_.remove(file);
+ SSTable.removeAssociatedBloomFilter(file);
+ }
+ if ( newfile != null )
+ {
+ ssTables_.add(newfile);
+ logger_.debug("Inserting bloom filter for file " + newfile);
+ SSTable.storeBloomFilter(newfile, compactedBloomFilter);
+ totalBytesWritten = (new File(newfile)).length();
+ }
+ }
+ finally
+ {
+ lock_.writeLock().unlock();
+ }
+ for (String file : files)
+ {
+ SSTable.delete(file);
+ }
+ }
+ }
+ catch ( Exception ex)
+ {
+ logger_.warn( LogUtil.throwableToString(ex) );
+ }
+ logger_.debug("Total time taken for compaction ..."
+ + (System.currentTimeMillis() - startTime));
+ logger_.debug("Total bytes Read for compaction ..." + totalBytesRead);
+ logger_.debug("Total bytes written for compaction ..."
+ + totalBytesWritten + " Total keys read ..." + totalkeysRead);
+ return;
+ }
+
+ /*
+ * Take a snap shot of this columnfamily store.
+ */
+ public void snapshot( String snapshotDirectory ) throws IOException
+ {
+ File snapshotDir = new File(snapshotDirectory);
+ if( !snapshotDir.exists() )
+ snapshotDir.mkdir();
+ lock_.writeLock().lock();
+ List<String> files = new ArrayList<String>(ssTables_);
+ try
+ {
+ for (String file : files)
+ {
+ File f = new File(file);
+ Path existingLink = f.toPath();
+ File hardLinkFile = new File(snapshotDirectory + System.getProperty("file.separator") + f.getName());
+ Path hardLink = hardLinkFile.toPath();
+ hardLink.createLink(existingLink);
+ }
+ }
+ finally
+ {
+ lock_.writeLock().unlock();
+ }
+ }
+}