You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:39:42 UTC
svn commit: r759026 [1/3] -
/incubator/cassandra/trunk/src/org/apache/cassandra/db/
Author: alakshman
Date: Fri Mar 27 05:39:40 2009
New Revision: 759026
URL: http://svn.apache.org/viewvc?rev=759026&view=rev
Log:
This is a wierd revert to fix some issues. Some changes will need to be re-applied.
Added:
incubator/cassandra/trunk/src/org/apache/cassandra/db/PrimaryKey.java
- copied unchanged from r758964, incubator/cassandra/trunk/src/org/apache/cassandra/db/PrimaryKey.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/SequentialScanner.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponseMessage.java
- copied unchanged from r758964, incubator/cassandra/trunk/src/org/apache/cassandra/db/WriteResponseMessage.java
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/CountFilter.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStructComparator.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/MemtableManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/NamesFilter.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/RecoveryManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Row.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutationVerbHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/TimeFilter.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/AbstractColumnFactory.java Fri Mar 27 05:39:40 2009
@@ -31,13 +31,13 @@
abstract class AbstractColumnFactory
{
private static Map<String, AbstractColumnFactory> columnFactory_ = new HashMap<String, AbstractColumnFactory>();
-
+
static
{
columnFactory_.put(ColumnFamily.getColumnType("Standard"),new ColumnFactory());
columnFactory_.put(ColumnFamily.getColumnType("Super"),new SuperColumnFactory());
}
-
+
static AbstractColumnFactory getColumnFactory(String columnType)
{
/* Create based on the type required. */
@@ -46,11 +46,10 @@
else
return columnFactory_.get("Super");
}
-
+
public abstract IColumn createColumn(String name);
public abstract IColumn createColumn(String name, byte[] value);
- public abstract IColumn createColumn(String name, byte[] value, long timestamp);
- public abstract IColumn createColumn(String name, byte[] value, long timestamp, boolean deleted);
+ public abstract IColumn createColumn(String name, byte[] value, long timestamp);
public abstract ICompactSerializer2<IColumn> createColumnSerializer();
}
@@ -60,21 +59,17 @@
{
return new Column(name);
}
-
+
public IColumn createColumn(String name, byte[] value)
{
return new Column(name, value);
}
-
+
public IColumn createColumn(String name, byte[] value, long timestamp)
{
return new Column(name, value, timestamp);
}
-
- public IColumn createColumn(String name, byte[] value, long timestamp, boolean deleted) {
- return new Column(name, value, timestamp, deleted);
- }
-
+
public ICompactSerializer2<IColumn> createColumnSerializer()
{
return Column.serializer();
@@ -108,28 +103,29 @@
}
return superColumn;
}
-
+
public IColumn createColumn(String name, byte[] value)
{
- return createColumn(name, value, 0);
+ String[] values = SuperColumnFactory.getSuperColumnAndColumn(name);
+ if ( values.length != 2 )
+ throw new IllegalArgumentException("Super Column " + name + " in invalid format. Must be in <super column name>:<column name> format.");
+ IColumn superColumn = new SuperColumn(values[0]);
+ IColumn subColumn = new Column(values[1], value);
+ superColumn.addColumn(values[1], subColumn);
+ return superColumn;
}
-
- public IColumn createColumn(String name, byte[] value, long timestamp)
- {
- return createColumn(name, value, timestamp, false);
- }
-
- public IColumn createColumn(String name, byte[] value, long timestamp, boolean deleted)
+
+ public IColumn createColumn(String name, byte[] value, long timestamp)
{
String[] values = SuperColumnFactory.getSuperColumnAndColumn(name);
if ( values.length != 2 )
throw new IllegalArgumentException("Super Column " + name + " in invalid format. Must be in <super column name>:<column name> format.");
IColumn superColumn = new SuperColumn(values[0]);
- IColumn subColumn = new Column(values[1], value, timestamp, deleted);
+ IColumn subColumn = new Column(values[1], value, timestamp);
superColumn.addColumn(values[1], subColumn);
return superColumn;
}
-
+
public ICompactSerializer2<IColumn> createColumnSerializer()
{
return SuperColumn.serializer();
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/BinaryMemtable.java Fri Mar 27 05:39:40 2009
@@ -29,9 +29,8 @@
import java.util.concurrent.locks.ReentrantLock;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.io.SSTable;
-
+import org.apache.cassandra.utils.BloomFilter;
import org.apache.log4j.Logger;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java Fri Mar 27 05:39:40 2009
@@ -18,41 +18,60 @@
package org.apache.cassandra.db;
+import java.io.DataInput;
import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Collection;
-
-import org.apache.commons.lang.ArrayUtils;
-
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.IFileReader;
+import org.apache.cassandra.io.IFileWriter;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
/**
- * Column is immutable, which prevents all kinds of confusion in a multithreaded environment.
- * (TODO: look at making SuperColumn immutable too. This is trickier but is probably doable
- * with something like PCollections -- http://code.google.com
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com ) & Prashant Malik ( pmalik@facebook.com )
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/
-public final class Column implements IColumn
+public final class Column implements IColumn, Serializable
{
- private static ColumnSerializer serializer_ = new ColumnSerializer();
+ private static Logger logger_ = Logger.getLogger(SuperColumn.class);
+ private static ICompactSerializer2<IColumn> serializer_;
+ private final static String seperator_ = ":";
+ static
+ {
+ serializer_ = new ColumnSerializer();
+ }
- static ColumnSerializer serializer()
+ static ICompactSerializer2<IColumn> serializer()
{
return serializer_;
}
- private final String name;
- private final byte[] value;
- private final long timestamp;
- private final boolean isMarkedForDelete;
+ private String name_;
+ private byte[] value_ = new byte[0];
+ private long timestamp_ = 0;
+
+ private transient AtomicBoolean isMarkedForDelete_;
+
+ /* CTOR for JAXB */
+ Column()
+ {
+ }
Column(String name)
{
- this(name, ArrayUtils.EMPTY_BYTE_ARRAY);
+ name_ = name;
}
Column(String name, byte[] value)
@@ -62,71 +81,54 @@
Column(String name, byte[] value, long timestamp)
{
- this(name, value, timestamp, false);
- }
-
- Column(String name, byte[] value, long timestamp, boolean isDeleted)
- {
- assert name != null;
- assert value != null;
- this.name = name;
- this.value = value;
- this.timestamp = timestamp;
- isMarkedForDelete = isDeleted;
+ this(name);
+ value_ = value;
+ timestamp_ = timestamp;
}
public String name()
{
- return name;
- }
-
- public IColumn getSubColumn(String columnName)
- {
- throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+ return name_;
}
public byte[] value()
{
- return value;
+ return value_;
}
public byte[] value(String key)
{
- throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+ throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
}
public Collection<IColumn> getSubColumns()
{
- throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+ throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+ }
+
+ public IColumn getSubColumn( String columnName )
+ {
+ throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
}
public int getObjectCount()
{
- return 1;
+ return 1;
}
public long timestamp()
{
- return timestamp;
+ return timestamp_;
}
public long timestamp(String key)
{
- throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
+ throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
}
public boolean isMarkedForDelete()
{
- return isMarkedForDelete;
- }
-
- public long getMarkedForDeleteAt()
- {
- if (!isMarkedForDelete())
- {
- throw new IllegalStateException("column is not marked for delete");
- }
- return timestamp;
+ return (isMarkedForDelete_ != null) ? isMarkedForDelete_.get() : false;
}
public int size()
@@ -140,11 +142,11 @@
* + entire byte array.
*/
- /*
- * We store the string as UTF-8 encoded, so when we calculate the length, it
- * should be converted to UTF-8.
- */
- return IColumn.UtfPrefix_ + FBUtilities.getUTF8Length(name) + DBConstants.boolSize_ + DBConstants.tsSize_ + DBConstants.intSize_ + value.length;
+ /*
+ * We store the string as UTF-8 encoded, so when we calculate the length, it
+ * should be converted to UTF-8.
+ */
+ return IColumn.UtfPrefix_ + FBUtilities.getUTF8Length(name_) + DBConstants.boolSize_ + DBConstants.tsSize_ + DBConstants.intSize_ + value_.length;
}
/*
@@ -153,43 +155,112 @@
*/
public int serializedSize()
{
- return size();
+ return size();
}
public void addColumn(String name, IColumn column)
{
- throw new UnsupportedOperationException("This operation is not supported for simple columns.");
+ throw new UnsupportedOperationException("This operation is not supported for simple columns.");
+ }
+
+ public void delete()
+ {
+ if ( isMarkedForDelete_ == null )
+ isMarkedForDelete_ = new AtomicBoolean(true);
+ else
+ isMarkedForDelete_.set(true);
+ value_ = new byte[0];
}
+ public void repair(IColumn column)
+ {
+ if( timestamp() < column.timestamp() )
+ {
+ value_ = column.value();
+ timestamp_ = column.timestamp();
+ }
+ }
public IColumn diff(IColumn column)
{
- if (timestamp() < column.timestamp())
- {
- return column;
- }
- return null;
+ IColumn columnDiff = null;
+ if( timestamp() < column.timestamp() )
+ {
+ columnDiff = new Column(column.name(),column.value(),column.timestamp());
+ }
+ return columnDiff;
+ }
+
+ /*
+ * Resolve the column by comparing timestamps
+ * if a newer vaue is being input
+ * take the change else ignore .
+ *
+ */
+ public boolean putColumn(IColumn column)
+ {
+ if ( !(column instanceof Column))
+ throw new UnsupportedOperationException("Only Column objects should be put here");
+ if( !name_.equals(column.name()))
+ throw new IllegalArgumentException("The name should match the name of the current column or super column");
+ if(timestamp_ <= column.timestamp())
+ {
+ return true;
+ }
+ return false;
}
public String toString()
{
- StringBuilder sb = new StringBuilder();
- sb.append(name);
- sb.append(":");
- sb.append(isMarkedForDelete());
- sb.append(":");
- sb.append(value().length);
- sb.append("@");
- sb.append(timestamp());
- return sb.toString();
+ StringBuilder sb = new StringBuilder();
+ sb.append(name_);
+ sb.append(":");
+ sb.append(isMarkedForDelete());
+ sb.append(":");
+ sb.append(timestamp());
+ sb.append(":");
+ sb.append(value().length);
+ sb.append(":");
+ sb.append(value());
+ sb.append(":");
+ return sb.toString();
}
public byte[] digest()
{
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append(name);
- stringBuilder.append(":");
- stringBuilder.append(timestamp);
- return stringBuilder.toString().getBytes();
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(name_);
+ stringBuilder.append(seperator_);
+ stringBuilder.append(timestamp_);
+ return stringBuilder.toString().getBytes();
+ }
+
+ /**
+ * This method is basically implemented for Writable interface
+ * for M/R.
+ */
+ public void readFields(DataInput in) throws IOException
+ {
+ name_ = in.readUTF();
+ boolean delete = in.readBoolean();
+ long ts = in.readLong();
+ int size = in.readInt();
+ byte[] value = new byte[size];
+ in.readFully(value);
+ if ( delete )
+ delete();
+ }
+
+ /**
+ * This method is basically implemented for Writable interface
+ * for M/R.
+ */
+ public void write(DataOutput out) throws IOException
+ {
+ out.writeUTF(name_);
+ out.writeBoolean(isMarkedForDelete());
+ out.writeLong(timestamp_);
+ out.writeInt(value().length);
+ out.write(value());
}
}
@@ -213,7 +284,9 @@
int size = dis.readInt();
byte[] value = new byte[size];
dis.readFully(value);
- column = new Column(name, value, ts, delete);
+ column = new Column(name, value, ts);
+ if ( delete )
+ column.delete();
return column;
}
@@ -230,7 +303,7 @@
{
if ( dis.available() == 0 )
return null;
-
+
String name = dis.readUTF();
IColumn column = new Column(name);
column = filter.filter(column, dis);
@@ -266,8 +339,8 @@
/*
* If this is being called with identity filter
* since a column name is passed in we know
- * that this is a final call
- * Hence if the column is found set the filter to done
+ * that this is a final call
+ * Hence if the column is found set the filter to done
* so that we do not look for the column in further files
*/
IdentityFilter f = (IdentityFilter)filter;
@@ -295,6 +368,5 @@
/* size of the column */
int size = dis.readInt();
dis.skip(size);
- }
+ }
}
-
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnComparatorFactory.java Fri Mar 27 05:39:40 2009
@@ -28,123 +28,127 @@
public class ColumnComparatorFactory
{
- public static enum ComparatorType
- {
- NAME,
- TIMESTAMP
- }
-
- private static Comparator<IColumn> nameComparator_ = new ColumnNameComparator();
- private static Comparator<IColumn> timestampComparator_ = new ColumnTimestampComparator();
-
- public static Comparator<IColumn> getComparator(ComparatorType comparatorType)
- {
- Comparator<IColumn> columnComparator = timestampComparator_;
-
- switch (comparatorType)
- {
- case NAME:
- columnComparator = nameComparator_;
- break;
-
- case TIMESTAMP:
-
- default:
- columnComparator = timestampComparator_;
- break;
- }
-
- return columnComparator;
- }
-
- public static Comparator<IColumn> getComparator(int comparatorTypeInt)
- {
- ComparatorType comparatorType = ComparatorType.NAME;
-
- if (comparatorTypeInt == ComparatorType.NAME.ordinal())
- {
- comparatorType = ComparatorType.NAME;
- }
- else if (comparatorTypeInt == ComparatorType.TIMESTAMP.ordinal())
- {
- comparatorType = ComparatorType.TIMESTAMP;
- }
- return getComparator(comparatorType);
- }
-
+ public static enum ComparatorType
+ {
+ NAME,
+ TIMESTAMP
+ }
+
+ private static Comparator<IColumn> nameComparator_ = new ColumnNameComparator();
+ private static Comparator<IColumn> timestampComparator_ = new ColumnTimestampComparator();
+
+ public static Comparator<IColumn> getComparator(ComparatorType comparatorType)
+ {
+ Comparator<IColumn> columnComparator = timestampComparator_;
+
+ switch(comparatorType)
+ {
+ case NAME:
+ columnComparator = nameComparator_;
+ break;
+
+ case TIMESTAMP:
+
+ default:
+ columnComparator = timestampComparator_;
+ break;
+ }
+
+ return columnComparator;
+ }
+
+ public static Comparator<IColumn> getComparator(int comparatorTypeInt)
+ {
+ ComparatorType comparatorType = ComparatorType.NAME;
+
+ if(comparatorTypeInt == ComparatorType.NAME.ordinal())
+ {
+ comparatorType = ComparatorType.NAME;
+ }
+ else if(comparatorTypeInt == ComparatorType.TIMESTAMP.ordinal())
+ {
+ comparatorType = ComparatorType.TIMESTAMP;
+ }
+ return getComparator(comparatorType);
+ }
+
+ public static void main(String[] args)
+ {
+ IColumn col1 = new Column("Column-9");
+ IColumn col2 = new Column("Column-10");
+ System.out.println("Result of compare: " + getComparator(ComparatorType.NAME).compare(col1, col2));
+ }
}
abstract class AbstractColumnComparator implements Comparator<IColumn>, Serializable
{
- protected ColumnComparatorFactory.ComparatorType comparatorType_;
-
- public AbstractColumnComparator(ColumnComparatorFactory.ComparatorType comparatorType)
- {
- comparatorType_ = comparatorType;
- }
+ protected ColumnComparatorFactory.ComparatorType comparatorType_;
- ColumnComparatorFactory.ComparatorType getComparatorType()
- {
- return comparatorType_;
- }
+ public AbstractColumnComparator(ColumnComparatorFactory.ComparatorType comparatorType)
+ {
+ comparatorType_ = comparatorType;
+ }
+
+ ColumnComparatorFactory.ComparatorType getComparatorType()
+ {
+ return comparatorType_;
+ }
}
class ColumnTimestampComparator extends AbstractColumnComparator
{
- ColumnTimestampComparator()
- {
- super(ColumnComparatorFactory.ComparatorType.TIMESTAMP);
- }
+ ColumnTimestampComparator()
+ {
+ super(ColumnComparatorFactory.ComparatorType.TIMESTAMP);
+ }
- /* if the time-stamps are the same then sort by names */
+ /* if the time-stamps are the same then sort by names */
public int compare(IColumn column1, IColumn column2)
{
- assert column1.getClass() == column2.getClass();
- /* inverse sort by time to get hte latest first */
- long result = column2.timestamp() - column1.timestamp();
- int finalResult = 0;
- if (result == 0)
- {
- result = column1.name().compareTo(column2.name());
- }
- if (result > 0)
- {
- finalResult = 1;
- }
- if (result < 0)
- {
- finalResult = -1;
- }
+ /* inverse sort by time to get hte latest first */
+ long result = column2.timestamp() - column1.timestamp();
+ int finalResult = 0;
+ if(result == 0)
+ {
+ result = column1.name().compareTo(column2.name());
+ }
+ if(result > 0)
+ {
+ finalResult = 1;
+ }
+ if( result < 0 )
+ {
+ finalResult = -1;
+ }
return finalResult;
}
}
class ColumnNameComparator extends AbstractColumnComparator
{
- ColumnNameComparator()
- {
- super(ColumnComparatorFactory.ComparatorType.NAME);
- }
+ ColumnNameComparator()
+ {
+ super(ColumnComparatorFactory.ComparatorType.NAME);
+ }
/* if the names are the same then sort by time-stamps */
public int compare(IColumn column1, IColumn column2)
{
- assert column1.getClass() == column2.getClass();
- long result = column1.name().compareTo(column2.name());
- int finalResult = 0;
- if (result == 0 && (column1 instanceof Column))
- {
- /* inverse sort by time to get the latest first */
- result = column2.timestamp() - column1.timestamp();
- }
- if (result > 0)
- {
- finalResult = 1;
- }
- if (result < 0)
- {
- finalResult = -1;
- }
+ long result = column1.name().compareTo(column2.name());
+ int finalResult = 0;
+ if(result == 0)
+ {
+ /* inverse sort by time to get hte latest first */
+ result = column2.timestamp() - column1.timestamp();
+ }
+ if(result > 0)
+ {
+ finalResult = 1;
+ }
+ if( result < 0 )
+ {
+ finalResult = -1;
+ }
return finalResult;
}
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java Fri Mar 27 05:39:40 2009
@@ -18,29 +18,28 @@
package org.apache.cassandra.db;
+import java.io.DataInput;
import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.Serializable;
import java.lang.reflect.Proxy;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang.StringUtils;
-import org.apache.log4j.Logger;
-
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.HashingSchemes;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
*/
-public final class ColumnFamily
+
+public final class ColumnFamily implements Serializable
{
private static ICompactSerializer2<ColumnFamily> serializer_;
public static final short utfPrefix_ = 2;
@@ -57,7 +56,7 @@
/* TODO: These are the various column types. Hard coded for now. */
columnTypes_.put("Standard", "Standard");
columnTypes_.put("Super", "Super");
-
+
indexTypes_.put("Name", "Name");
indexTypes_.put("Time", "Time");
}
@@ -71,7 +70,7 @@
* This method returns the serializer whose methods are
* preprocessed by a dynamic proxy.
*/
- public static ICompactSerializer2<ColumnFamily> serializerWithIndexes()
+ public static ICompactSerializer2<ColumnFamily> serializer2()
{
return (ICompactSerializer2<ColumnFamily>)Proxy.newProxyInstance( ColumnFamily.class.getClassLoader(), new Class[]{ICompactSerializer2.class}, new CompactSerializerInvocationHandler<ColumnFamily>(serializer_) );
}
@@ -95,9 +94,9 @@
private String name_;
- private transient ICompactSerializer2<IColumn> columnSerializer_;
- private long markedForDeleteAt = Long.MIN_VALUE;
- private AtomicInteger size_ = new AtomicInteger(0);
+ private transient ICompactSerializer2<IColumn> columnSerializer_;
+ private transient AtomicBoolean isMarkedForDelete_;
+ private AtomicInteger size_ = new AtomicInteger(0);
private EfficientBidiMap columns_;
private Comparator<IColumn> columnComparator_;
@@ -123,16 +122,20 @@
return columnComparator_;
}
+
+ ColumnFamily()
+ {
+ }
- public ColumnFamily(String cfName)
+ public ColumnFamily(String cf)
{
- name_ = cfName;
+ name_ = cf;
createColumnFactoryAndColumnSerializer();
}
- public ColumnFamily(String cfName, String columnType)
+ public ColumnFamily(String cf, String columnType)
{
- this(cfName);
+ name_ = cf;
createColumnFactoryAndColumnSerializer(columnType);
}
@@ -165,7 +168,7 @@
ColumnFamily cloneMe()
{
ColumnFamily cf = new ColumnFamily(name_);
- cf.markedForDeleteAt = markedForDeleteAt;
+ cf.isMarkedForDelete_ = isMarkedForDelete_;
cf.columns_ = columns_.cloneMe();
return cf;
}
@@ -175,15 +178,18 @@
return name_;
}
- /*
+ /**
* We need to go through each column
* in the column family and resolve it before adding
*/
void addColumns(ColumnFamily cf)
{
- for (IColumn column : cf.getAllColumns())
+ Map<String, IColumn> columns = cf.getColumns();
+ Set<String> cNames = columns.keySet();
+
+ for ( String cName : cNames )
{
- addColumn(column);
+ addColumn(cName, columns.get(cName));
}
}
@@ -193,9 +199,10 @@
return columnSerializer_;
}
- public void addColumn(String name)
+ public void createColumn(String name)
{
- addColumn(columnFactory_.createColumn(name));
+ IColumn column = columnFactory_.createColumn(name);
+ addColumn(column.name(), column);
}
int getColumnCount()
@@ -204,7 +211,7 @@
Map<String, IColumn> columns = columns_.getColumns();
if( columns != null )
{
- if(!isSuper())
+ if(!DatabaseDescriptor.getColumnType(name_).equals("Super"))
{
count = columns.size();
}
@@ -220,26 +227,17 @@
return count;
}
- public boolean isSuper()
- {
- return DatabaseDescriptor.getColumnType(name_).equals("Super");
- }
-
- public void addColumn(String name, byte[] value)
+ public void createColumn(String name, byte[] value)
{
- addColumn(name, value, 0);
+ IColumn column = columnFactory_.createColumn(name, value);
+ addColumn(column.name(), column);
}
- public void addColumn(String name, byte[] value, long timestamp)
- {
- addColumn(name, value, timestamp, false);
- }
-
- public void addColumn(String name, byte[] value, long timestamp, boolean deleted)
+ public void createColumn(String name, byte[] value, long timestamp)
{
- IColumn column = columnFactory_.createColumn(name, value, timestamp, deleted);
- addColumn(column);
- }
+ IColumn column = columnFactory_.createColumn(name, value, timestamp);
+ addColumn(column.name(), column);
+ }
void clear()
{
@@ -250,30 +248,29 @@
* If we find an old column that has the same name
* the ask it to resolve itself else add the new column .
*/
- void addColumn(IColumn column)
+ void addColumn(String name, IColumn column)
{
- String name = column.name();
+ int newSize = 0;
IColumn oldColumn = columns_.get(name);
- if (oldColumn != null)
+ if ( oldColumn != null )
{
- if (oldColumn instanceof SuperColumn)
+ int oldSize = oldColumn.size();
+ if( oldColumn.putColumn(column))
{
- int oldSize = oldColumn.size();
- ((SuperColumn) oldColumn).putColumn(column);
- size_.addAndGet(oldColumn.size() - oldSize);
+ // This will never be called for super column as put column always returns false.
+ columns_.put(name, column);
+ newSize = column.size();
}
else
{
- if (oldColumn.timestamp() <= column.timestamp())
- {
- columns_.put(name, column);
- size_.addAndGet(column.size());
- }
+ newSize = oldColumn.size();
}
+ size_.addAndGet(newSize - oldSize);
}
else
{
- size_.addAndGet(column.size());
+ newSize = column.size();
+ size_.addAndGet(newSize);
columns_.put(name, column);
}
}
@@ -283,12 +280,12 @@
return columns_.get( name );
}
- public SortedSet<IColumn> getAllColumns()
+ public Collection<IColumn> getAllColumns()
{
return columns_.getSortedColumns();
}
- public Map<String, IColumn> getColumns()
+ Map<String, IColumn> getColumns()
{
return columns_.getColumns();
}
@@ -298,14 +295,17 @@
columns_.remove(columnName);
}
- void delete(long timestamp)
+ void delete()
{
- markedForDeleteAt = timestamp;
+ if ( isMarkedForDelete_ == null )
+ isMarkedForDelete_ = new AtomicBoolean(true);
+ else
+ isMarkedForDelete_.set(true);
}
- public boolean isMarkedForDelete()
+ boolean isMarkedForDelete()
{
- return markedForDeleteAt > Long.MIN_VALUE;
+ return ( ( isMarkedForDelete_ == null ) ? false : isMarkedForDelete_.get() );
}
/*
@@ -334,8 +334,28 @@
*/
void repair(ColumnFamily columnFamily)
{
- for (IColumn column : columnFamily.getAllColumns()) {
- addColumn(column);
+ Map<String, IColumn> columns = columnFamily.getColumns();
+ Set<String> cNames = columns.keySet();
+
+ for ( String cName : cNames )
+ {
+ IColumn columnInternal = columns_.get(cName);
+ IColumn columnExternal = columns.get(cName);
+
+ if( columnInternal == null )
+ {
+ if(DatabaseDescriptor.getColumnFamilyType(name_).equals(ColumnFamily.getColumnType("Super")))
+ {
+ columnInternal = new SuperColumn(columnExternal.name());
+ columns_.put(cName, columnInternal);
+ }
+ if(DatabaseDescriptor.getColumnFamilyType(name_).equals(ColumnFamily.getColumnType("Standard")))
+ {
+ columnInternal = columnExternal;
+ columns_.put(cName, columnInternal);
+ }
+ }
+ columnInternal.repair(columnExternal);
}
}
@@ -357,14 +377,14 @@
IColumn columnExternal = columns.get(cName);
if( columnInternal == null )
{
- cfDiff.addColumn(columnExternal);
+ cfDiff.addColumn(cName, columnExternal);
}
else
{
IColumn columnDiff = columnInternal.diff(columnExternal);
if(columnDiff != null)
{
- cfDiff.addColumn(columnDiff);
+ cfDiff.addColumn(cName, columnDiff);
}
}
}
@@ -403,174 +423,193 @@
public String toString()
{
StringBuilder sb = new StringBuilder();
- sb.append("ColumnFamily(");
sb.append(name_);
+ sb.append(":");
+ sb.append(isMarkedForDelete());
+ sb.append(":");
+ Collection<IColumn> columns = getAllColumns();
+ sb.append(columns.size());
+ sb.append(":");
- if (isMarkedForDelete()) {
- sb.append(" -delete at " + getMarkedForDeleteAt() + "-");
+ for ( IColumn column : columns )
+ {
+ sb.append(column.toString());
}
-
- sb.append(" [");
- sb.append(StringUtils.join(getAllColumns(), ", "));
- sb.append("])");
-
+ sb.append(":");
return sb.toString();
}
public byte[] digest()
{
Set<IColumn> columns = columns_.getSortedColumns();
- byte[] xorHash = null;
+ byte[] xorHash = new byte[0];
+ byte[] tmpHash = new byte[0];
for(IColumn column : columns)
{
- if(xorHash == null)
+ if(xorHash.length == 0)
{
xorHash = column.digest();
}
else
{
- xorHash = FBUtilities.xor(xorHash, column.digest());
+ tmpHash = column.digest();
+ xorHash = FBUtilities.xor(xorHash, tmpHash);
}
}
return xorHash;
}
+}
- public long getMarkedForDeleteAt() {
- return markedForDeleteAt;
+class ColumnFamilySerializer implements ICompactSerializer2<ColumnFamily>
+{
+ /*
+ * We are going to create indexes, and write out that information as well. The format
+ * of the data serialized is as follows.
+ *
+ * 1) Without indexes:
+ * // written by the data
+ * <boolean false (index is not present)>
+ * <column family id>
+ * <is marked for delete>
+ * <total number of columns>
+ * <columns data>
+
+ * <boolean true (index is present)>
+ *
+ * This part is written by the column indexer
+ * <size of index in bytes>
+ * <list of column names and their offsets relative to the first column>
+ *
+ * <size of the cf in bytes>
+ * <column family id>
+ * <is marked for delete>
+ * <total number of columns>
+ * <columns data>
+ */
+ public void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
+ {
+ Collection<IColumn> columns = columnFamily.getAllColumns();
+
+ /* write the column family id */
+ dos.writeUTF(columnFamily.name());
+ /* write if this cf is marked for delete */
+ dos.writeBoolean(columnFamily.isMarkedForDelete());
+ /* write the size is the number of columns */
+ dos.writeInt(columns.size());
+ /* write the column data */
+ for ( IColumn column : columns )
+ {
+ columnFamily.getColumnSerializer().serialize(column, dos);
+ }
}
- public static class ColumnFamilySerializer implements ICompactSerializer2<ColumnFamily>
+ /*
+ * Use this method to create a bare bones Column Family. This column family
+ * does not have any of the Column information.
+ */
+ private ColumnFamily defreezeColumnFamily(DataInputStream dis) throws IOException
{
- /*
- * We are going to create indexes, and write out that information as well. The format
- * of the data serialized is as follows.
- *
- * 1) Without indexes:
- * // written by the data
- * <boolean false (index is not present)>
- * <column family id>
- * <is marked for delete>
- * <total number of columns>
- * <columns data>
-
- * <boolean true (index is present)>
- *
- * This part is written by the column indexer
- * <size of index in bytes>
- * <list of column names and their offsets relative to the first column>
- *
- * <size of the cf in bytes>
- * <column family id>
- * <is marked for delete>
- * <total number of columns>
- * <columns data>
- */
- public void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
- {
- Collection<IColumn> columns = columnFamily.getAllColumns();
-
- /* write the column family id */
- dos.writeUTF(columnFamily.name());
- /* write if this cf is marked for delete */
- dos.writeLong(columnFamily.getMarkedForDeleteAt());
-
- /* write the size is the number of columns */
- dos.writeInt(columns.size());
+ String name = dis.readUTF();
+ boolean delete = dis.readBoolean();
+ ColumnFamily cf = new ColumnFamily(name);
+ if ( delete )
+ cf.delete();
+ return cf;
+ }
- /* write the column data */
- for ( IColumn column : columns )
- {
- columnFamily.getColumnSerializer().serialize(column, dos);
- }
+ /*
+ * This method fills the Column Family object with the column information
+ * from the DataInputStream. The "items" parameter tells us whether we need
+ * all the columns or just a subset of all the Columns that make up the
+ * Column Family. If "items" is -1 then we need all the columns if not we
+ * deserialize only as many columns as indicated by the "items" parameter.
+ */
+ private void fillColumnFamily(ColumnFamily cf, DataInputStream dis) throws IOException
+ {
+ int size = dis.readInt();
+ IColumn column = null;
+ for ( int i = 0; i < size; ++i )
+ {
+ column = cf.getColumnSerializer().deserialize(dis);
+ if(column != null)
+ {
+ cf.addColumn(column.name(), column);
+ }
}
+ }
- /*
- * Use this method to create a bare bones Column Family. This column family
- * does not have any of the Column information.
- */
- private ColumnFamily defreezeColumnFamily(DataInputStream dis) throws IOException
- {
- String name = dis.readUTF();
- ColumnFamily cf = new ColumnFamily(name);
- cf.delete(dis.readLong());
- return cf;
- }
-
- public ColumnFamily deserialize(DataInputStream dis) throws IOException
- {
- ColumnFamily cf = defreezeColumnFamily(dis);
- int size = dis.readInt();
- IColumn column = null;
- for ( int i = 0; i < size; ++i )
- {
- column = cf.getColumnSerializer().deserialize(dis);
- if(column != null)
- {
- cf.addColumn(column);
- }
- }
- return cf;
- }
+ public ColumnFamily deserialize(DataInputStream dis) throws IOException
+ {
+ ColumnFamily cf = defreezeColumnFamily(dis);
+ if ( !cf.isMarkedForDelete() )
+ fillColumnFamily(cf,dis);
+ return cf;
+ }
- /*
- * This version of deserialize is used when we need a specific set if columns for
- * a column family specified in the name cfName parameter.
- */
- public ColumnFamily deserialize(DataInputStream dis, IFilter filter) throws IOException
- {
- ColumnFamily cf = defreezeColumnFamily(dis);
- int size = dis.readInt();
- IColumn column = null;
+ /*
+ * This version of deserialize is used when we need a specific set if columns for
+ * a column family specified in the name cfName parameter.
+ */
+ public ColumnFamily deserialize(DataInputStream dis, IFilter filter) throws IOException
+ {
+ ColumnFamily cf = defreezeColumnFamily(dis);
+ if ( !cf.isMarkedForDelete() )
+ {
+ int size = dis.readInt();
+ IColumn column = null;
for ( int i = 0; i < size; ++i )
{
- column = cf.getColumnSerializer().deserialize(dis, filter);
- if(column != null)
- {
- cf.addColumn(column);
- column = null;
- if(filter.isDone())
- {
- break;
- }
- }
+ column = cf.getColumnSerializer().deserialize(dis, filter);
+ if(column != null)
+ {
+ cf.addColumn(column.name(), column);
+ column = null;
+ if(filter.isDone())
+ {
+ break;
+ }
+ }
}
- return cf;
}
+ return cf;
+ }
- /*
- * Deserialize a particular column or super column or the entire columnfamily given a : seprated name
- * name could be of the form cf:superColumn:column or cf:column or cf
- */
- public ColumnFamily deserialize(DataInputStream dis, String name, IFilter filter) throws IOException
- {
- String[] names = RowMutation.getColumnAndColumnFamily(name);
- String columnName = "";
- if ( names.length == 1 )
- return deserialize(dis, filter);
- if( names.length == 2 )
- columnName = names[1];
- if( names.length == 3 )
- columnName = names[1]+ ":" + names[2];
+ /*
+ * Deserialize a particular column or super column or the entire columnfamily given a : seprated name
+ * name could be of the form cf:superColumn:column or cf:column or cf
+ */
+ public ColumnFamily deserialize(DataInputStream dis, String name, IFilter filter) throws IOException
+ {
+ String[] names = RowMutation.getColumnAndColumnFamily(name);
+ String columnName = "";
+ if ( names.length == 1 )
+ return deserialize(dis, filter);
+ if( names.length == 2 )
+ columnName = names[1];
+ if( names.length == 3 )
+ columnName = names[1]+ ":" + names[2];
- ColumnFamily cf = defreezeColumnFamily(dis);
+ ColumnFamily cf = defreezeColumnFamily(dis);
+ if ( !cf.isMarkedForDelete() )
+ {
/* read the number of columns */
- int size = dis.readInt();
+ int size = dis.readInt();
for ( int i = 0; i < size; ++i )
{
- IColumn column = cf.getColumnSerializer().deserialize(dis, columnName, filter);
- if ( column != null )
- {
- cf.addColumn(column);
- break;
- }
+ IColumn column = cf.getColumnSerializer().deserialize(dis, columnName, filter);
+ if ( column != null )
+ {
+ cf.addColumn(column.name(), column);
+ break;
+ }
}
- return cf;
}
+ return cf;
+ }
- public void skip(DataInputStream dis) throws IOException
- {
- throw new UnsupportedOperationException("This operation is not yet supported.");
- }
+ public void skip(DataInputStream dis) throws IOException
+ {
+ throw new UnsupportedOperationException("This operation is not yet supported.");
}
-}
+}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 27 05:39:40 2009
@@ -18,27 +18,18 @@
package org.apache.cassandra.db;
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.StringTokenizer;
+import java.util.*;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.log4j.Logger;
-
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.DataInputBuffer;
@@ -47,10 +38,16 @@
import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.io.SequenceFile;
import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.PartitionerType;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.BloomFilter;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -80,7 +77,7 @@
private ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
/* Flag indicates if a compaction is in process */
- private AtomicBoolean isCompacting_ = new AtomicBoolean(false);
+ public AtomicBoolean isCompacting_ = new AtomicBoolean(false);
ColumnFamilyStore(String table, String columnFamily) throws IOException
{
@@ -130,7 +127,7 @@
for (File file : files)
{
String filename = file.getName();
- if(((file.length() == 0) || (filename.contains("-" + SSTable.temporaryFile_)) ) && (filename.contains(columnFamily_)))
+ if(((file.length() == 0) || (filename.indexOf("-" + SSTable.temporaryFile_) != -1) ) && (filename.indexOf(columnFamily_) != -1))
{
file.delete();
continue;
@@ -139,7 +136,7 @@
String[] tblCfName = getTableAndColumnFamilyName(filename);
if (tblCfName[0].equals(table_)
&& tblCfName[1].equals(columnFamily_)
- && filename.contains("-Data.db"))
+ && filename.indexOf("-Data.db") != -1)
{
ssTables.add(file.getAbsoluteFile());
}
@@ -177,7 +174,7 @@
* disk and the total space oocupied by the data files
* associated with this Column Family.
*/
- public String cfStats(String newLineSeparator)
+ public String cfStats(String newLineSeparator, java.text.DecimalFormat df)
{
StringBuilder sb = new StringBuilder();
/*
@@ -260,7 +257,7 @@
if( ranges != null)
futurePtr = MinorCompactionManager.instance().submit(ColumnFamilyStore.this, ranges, target, fileList);
else
- MinorCompactionManager.instance().submitMajor(ColumnFamilyStore.this, skip);
+ MinorCompactionManager.instance().submitMajor(ColumnFamilyStore.this, ranges, skip);
boolean result = true;
try
@@ -333,7 +330,8 @@
{
// Psuedo increment so that we do not generate consecutive numbers
fileIndexGenerator_.incrementAndGet();
- return table_ + "-" + columnFamily_ + "-" + fileIndexGenerator_.incrementAndGet();
+ String name = table_ + "-" + columnFamily_ + "-" + fileIndexGenerator_.incrementAndGet();
+ return name;
}
/*
@@ -343,7 +341,8 @@
{
// Psuedo increment so that we do not generate consecutive numbers
fileIndexGenerator_.incrementAndGet();
- return table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + fileIndexGenerator_.incrementAndGet();
+ String name = table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + fileIndexGenerator_.incrementAndGet() ;
+ return name;
}
/*
@@ -364,8 +363,9 @@
lowestIndex = getIndexFromFileName(files.get(0));
index = lowestIndex + 1 ;
-
- return table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + index;
+
+ String name = table_ + "-" + columnFamily_ + "-" + SSTable.temporaryFile_ + "-" + index ;
+ return name;
}
@@ -383,6 +383,14 @@
}
/*
+ * This version is used when we forceflush.
+ */
+ void switchMemtable() throws IOException
+ {
+ memtable_.set( new Memtable(table_, columnFamily_) );
+ }
+
+ /*
* This version is used only on start up when we are recovering from logs.
* In the future we may want to parellelize the log processing for a table
* by having a thread per log file present for recovery. Re-visit at that
@@ -394,14 +402,14 @@
binaryMemtable_.get().put(key, buffer);
}
- void forceFlush() throws IOException
+ void forceFlush(boolean fRecovery) throws IOException
{
//MemtableManager.instance().submit(getColumnFamilyName(), memtable_.get() , CommitLog.CommitLogContext.NULL);
//memtable_.get().flush(true, CommitLog.CommitLogContext.NULL);
- memtable_.get().forceflush(this);
+ memtable_.get().forceflush(this, fRecovery);
}
- void forceFlushBinary()
+ void forceFlushBinary() throws IOException
{
BinaryMemtableManager.instance().submit(getColumnFamilyName(), binaryMemtable_.get());
//binaryMemtable_.get().flush(true);
@@ -430,36 +438,66 @@
binaryMemtable_.get().put(key, buffer);
}
- public ColumnFamily getColumnFamily(String key, String columnFamilyColumn, IFilter filter) throws IOException
- {
- List<ColumnFamily> columnFamilies = getColumnFamilies(key, columnFamilyColumn, filter);
- return resolveAndRemoveDeleted(columnFamilies);
- }
-
/**
*
* Get the column family in the most efficient order.
* 1. Memtable
* 2. Sorted list of files
*/
- List<ColumnFamily> getColumnFamilies(String key, String columnFamilyColumn, IFilter filter) throws IOException
+ public ColumnFamily getColumnFamily(String key, String cf, IFilter filter) throws IOException
{
- List<ColumnFamily> columnFamilies1 = new ArrayList<ColumnFamily>();
+ List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+ ColumnFamily columnFamily = null;
+ long start = System.currentTimeMillis();
/* Get the ColumnFamily from Memtable */
- getColumnFamilyFromCurrentMemtable(key, columnFamilyColumn, filter, columnFamilies1);
- if (columnFamilies1.size() == 0 || !filter.isDone())
+ getColumnFamilyFromCurrentMemtable(key, cf, filter, columnFamilies);
+ if(columnFamilies.size() != 0)
{
- /* Check if MemtableManager has any historical information */
- MemtableManager.instance().getColumnFamily(key, columnFamily_, columnFamilyColumn, filter, columnFamilies1);
+ if(filter.isDone())
+ return columnFamilies.get(0);
}
- List<ColumnFamily> columnFamilies = columnFamilies1;
- if (columnFamilies.size() == 0 || !filter.isDone())
+ /* Check if MemtableManager has any historical information */
+ MemtableManager.instance().getColumnFamily(key, columnFamily_, cf, filter, columnFamilies);
+ if(columnFamilies.size() != 0)
+ {
+ columnFamily = resolve(columnFamilies);
+ if(filter.isDone())
+ return columnFamily;
+ columnFamilies.clear();
+ columnFamilies.add(columnFamily);
+ }
+ getColumnFamilyFromDisk(key, cf, columnFamilies, filter);
+ logger_.debug("DISK TIME: " + (System.currentTimeMillis() - start)
+ + " ms.");
+ columnFamily = resolve(columnFamilies);
+
+ return columnFamily;
+ }
+
+ public ColumnFamily getColumnFamilyFromMemory(String key, String cf, IFilter filter)
+ {
+ List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+ ColumnFamily columnFamily = null;
+ long start = System.currentTimeMillis();
+ /* Get the ColumnFamily from Memtable */
+ getColumnFamilyFromCurrentMemtable(key, cf, filter, columnFamilies);
+ if(columnFamilies.size() != 0)
{
- long start = System.currentTimeMillis();
- getColumnFamilyFromDisk(key, columnFamilyColumn, columnFamilies, filter);
- logger_.debug("DISK TIME: " + (System.currentTimeMillis() - start) + " ms.");
+ if(filter.isDone())
+ return columnFamilies.get(0);
+ }
+ /* Check if MemtableManager has any historical information */
+ MemtableManager.instance().getColumnFamily(key, columnFamily_, cf, filter, columnFamilies);
+ if(columnFamilies.size() != 0)
+ {
+ columnFamily = resolve(columnFamilies);
+ if(filter.isDone())
+ return columnFamily;
+ columnFamilies.clear();
+ columnFamilies.add(columnFamily);
}
- return columnFamilies;
+ columnFamily = resolve(columnFamilies);
+ return columnFamily;
}
/**
@@ -499,14 +537,33 @@
long start = System.currentTimeMillis();
if (columnFamily != null)
{
+ /*
+ * TODO
+ * By using the filter before removing deleted columns
+ * we have a efficient implementation of timefilter
+ * but for count filter this can return wrong results
+ * we need to take care of that later.
+ */
+ /* suppress columns marked for delete */
+ Map<String, IColumn> columns = columnFamily.getColumns();
+ Set<String> cNames = columns.keySet();
+
+ for (String cName : cNames)
+ {
+ IColumn column = columns.get(cName);
+ if (column.isMarkedForDelete())
+ columns.remove(cName);
+ }
columnFamilies.add(columnFamily);
if(filter.isDone())
{
break;
}
}
- logger_.debug("DISK Data structure population TIME: " + (System.currentTimeMillis() - start) + " ms.");
+ logger_.debug("DISK Data structure population TIME: " + (System.currentTimeMillis() - start)
+ + " ms.");
}
+ files.clear();
}
@@ -520,11 +577,12 @@
if (bufIn.getLength() == 0)
return null;
start = System.currentTimeMillis();
- ColumnFamily columnFamily = ColumnFamily.serializer().deserialize(bufIn, cf, filter);
+ ColumnFamily columnFamily = null;
+ columnFamily = ColumnFamily.serializer().deserialize(bufIn, cf, filter);
logger_.debug("DISK Deserialize TIME: " + (System.currentTimeMillis() - start) + " ms.");
if (columnFamily == null)
- return null;
- return columnFamily;
+ return columnFamily;
+ return (!columnFamily.isMarkedForDelete()) ? columnFamily : null;
}
private void getColumnFamilyFromCurrentMemtable(String key, String cf, IFilter filter, List<ColumnFamily> columnFamilies)
@@ -533,66 +591,20 @@
ColumnFamily columnFamily = memtable_.get().get(key, cf, filter);
if (columnFamily != null)
{
- columnFamilies.add(columnFamily);
+ if (!columnFamily.isMarkedForDelete())
+ columnFamilies.add(columnFamily);
}
}
- /** merge all columnFamilies into a single instance, with only the newest versions of columns preserved. */
- static ColumnFamily resolve(List<ColumnFamily> columnFamilies)
+ private ColumnFamily resolve(List<ColumnFamily> columnFamilies)
{
int size = columnFamilies.size();
if (size == 0)
- return null;
-
- // start from nothing so that we don't include potential deleted columns from the first instance
- String cfname = columnFamilies.get(0).name();
- ColumnFamily cf = new ColumnFamily(cfname);
-
- // merge
- for (ColumnFamily cf2 : columnFamilies)
- {
- assert cf.name().equals(cf2.name());
- cf.addColumns(cf2);
- cf.delete(Math.max(cf.getMarkedForDeleteAt(), cf2.getMarkedForDeleteAt()));
- }
- return cf;
- }
-
- /** like resolve, but leaves the resolved CF as the only item in the list */
- private static void merge(List<ColumnFamily> columnFamilies)
- {
- ColumnFamily cf = resolve(columnFamilies);
- columnFamilies.clear();
- columnFamilies.add(cf);
- }
-
- private static ColumnFamily resolveAndRemoveDeleted(List<ColumnFamily> columnFamilies) {
- ColumnFamily cf = resolve(columnFamilies);
- return removeDeleted(cf);
- }
-
- static ColumnFamily removeDeleted(ColumnFamily cf) {
- if (cf == null) {
- return null;
- }
- for (String cname : new ArrayList<String>(cf.getColumns().keySet())) {
- IColumn c = cf.getColumns().get(cname);
- if (c instanceof SuperColumn) {
- long min_timestamp = Math.max(c.getMarkedForDeleteAt(), cf.getMarkedForDeleteAt());
- // don't operate directly on the supercolumn, it could be the one in the memtable
- cf.remove(cname);
- IColumn sc = new SuperColumn(cname);
- for (IColumn subColumn : c.getSubColumns()) {
- if (!subColumn.isMarkedForDelete() && subColumn.timestamp() >= min_timestamp) {
- sc.addColumn(subColumn.name(), subColumn);
- }
- }
- if (sc.getSubColumns().size() > 0) {
- cf.addColumn(sc);
- }
- } else if (c.isMarkedForDelete() || c.timestamp() < cf.getMarkedForDeleteAt()) {
- cf.remove(cname);
- }
+ return null;
+ ColumnFamily cf = columnFamilies.get(0);
+ for ( int i = 1; i < size ; ++i )
+ {
+ cf.addColumns(columnFamilies.get(i));
}
return cf;
}
@@ -605,7 +617,19 @@
*/
void applyNow(String key, ColumnFamily columnFamily) throws IOException
{
- memtable_.get().putOnRecovery(key, columnFamily);
+ if (!columnFamily.isMarkedForDelete())
+ memtable_.get().putOnRecovery(key, columnFamily);
+ }
+
+ /*
+ * Delete doesn't mean we can blindly delete. We need to write this to disk
+ * as being marked for delete. This is to prevent a previous value from
+ * resuscitating a column family that has been deleted.
+ */
+ void delete(String key, ColumnFamily columnFamily)
+ throws IOException
+ {
+ memtable_.get().remove(key, columnFamily);
}
/*
@@ -629,7 +653,7 @@
* param @ filename - filename just flushed to disk
* param @ bf - bloom filter which indicates the keys that are in this file.
*/
- void storeLocation(String filename, BloomFilter bf)
+ void storeLocation(String filename, BloomFilter bf) throws IOException
{
boolean doCompaction = false;
int ssTableSize = 0;
@@ -664,7 +688,7 @@
}
}
- PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize)
+ PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize) throws IOException
{
PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
if (files.size() > 1 || (ranges != null && files.size() > 0))
@@ -675,9 +699,13 @@
{
try
{
- fs = new FileStruct(SequenceFile.bufferedReader(file, bufferSize));
- fs.getNextKey();
- if(fs.isExhausted())
+ fs = new FileStruct();
+ fs.bufIn_ = new DataInputBuffer();
+ fs.bufOut_ = new DataOutputBuffer();
+ fs.reader_ = SequenceFile.bufferedReader(file, bufferSize);
+ fs.key_ = null;
+ fs = getNextKey(fs);
+ if(fs == null)
continue;
pq.add(fs);
}
@@ -686,16 +714,17 @@
ex.printStackTrace();
try
{
- if (fs != null)
+ if(fs != null)
{
- fs.close();
+ fs.reader_.close();
}
}
catch(Exception e)
{
logger_.warn("Unable to close file :" + file);
}
- }
+ continue;
+ }
}
}
return pq;
@@ -747,7 +776,7 @@
/*
* Break the files into buckets and then compact.
*/
- void doCompaction()
+ void doCompaction() throws IOException
{
isCompacting_.set(true);
List<String> files = new ArrayList<String>(ssTables_);
@@ -789,6 +818,7 @@
{
isCompacting_.set(false);
}
+ return;
}
void doMajorCompaction(long skip) throws IOException
@@ -796,13 +826,18 @@
doMajorCompactionInternal( skip );
}
+ void doMajorCompaction() throws IOException
+ {
+ doMajorCompactionInternal( 0 );
+ }
+
/*
* Compact all the files irrespective of the size.
* skip : is the ammount in Gb of the files to be skipped
* all files greater than skip GB are skipped for this compaction.
* Except if skip is 0 , in that case this is ignored and all files are taken.
*/
- void doMajorCompactionInternal(long skip)
+ void doMajorCompactionInternal(long skip) throws IOException
{
isCompacting_.set(true);
List<String> filesInternal = new ArrayList<String>(ssTables_);
@@ -835,6 +870,7 @@
{
isCompacting_.set(false);
}
+ return ;
}
/*
@@ -872,14 +908,41 @@
return maxFile;
}
- boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList)
+ Range getMaxRange( List<Range> ranges )
+ {
+ Range maxRange = new Range( BigInteger.ZERO, BigInteger.ZERO );
+ for( Range range : ranges)
+ {
+ if( range.left().compareTo(maxRange.left()) > 0 )
+ {
+ maxRange = range;
+ }
+ }
+ return maxRange;
+ }
+
+ boolean isLoopAround ( List<Range> ranges )
+ {
+ boolean isLoop = false;
+ for( Range range : ranges)
+ {
+ if( range.left().compareTo(range.right()) > 0 )
+ {
+ isLoop = true;
+ break;
+ }
+ }
+ return isLoop;
+ }
+
+ boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
{
isCompacting_.set(true);
List<String> files = new ArrayList<String>(ssTables_);
boolean result = true;
try
{
- result = doFileAntiCompaction(files, ranges, target, fileList, null);
+ result = doFileAntiCompaction(files, ranges, target, bufSize_, fileList, null);
}
catch ( Exception ex)
{
@@ -893,6 +956,38 @@
}
+ /*
+ * Read the next key from the data file , this fn will skip teh block index
+ * and read teh next available key into the filestruct that is passed.
+ * If it cannot read or a end of file is reached it will return null.
+ */
+ FileStruct getNextKey(FileStruct filestruct) throws IOException
+ {
+ filestruct.bufOut_.reset();
+ if (filestruct.reader_.isEOF())
+ {
+ filestruct.reader_.close();
+ return null;
+ }
+
+ long bytesread = filestruct.reader_.next(filestruct.bufOut_);
+ if (bytesread == -1)
+ {
+ filestruct.reader_.close();
+ return null;
+ }
+
+ filestruct.bufIn_.reset(filestruct.bufOut_.getData(), filestruct.bufOut_.getLength());
+ filestruct.key_ = filestruct.bufIn_.readUTF();
+ /* If the key we read is the Block Index Key then we are done reading the keys so exit */
+ if ( filestruct.key_.equals(SSTable.blockIndexKey_) )
+ {
+ filestruct.reader_.close();
+ return null;
+ }
+ return filestruct;
+ }
+
void forceCleanup()
{
MinorCompactionManager.instance().submitCleanup(ColumnFamilyStore.this);
@@ -903,7 +998,7 @@
* and only keeps keys that this node is responsible for.
* @throws IOException
*/
- void doCleanupCompaction()
+ void doCleanupCompaction() throws IOException
{
isCompacting_.set(true);
List<String> files = new ArrayList<String>(ssTables_);
@@ -937,7 +1032,7 @@
Map<EndPoint, List<Range>> endPointtoRangeMap = StorageService.instance().constructEndPointToRangesMap();
myRanges = endPointtoRangeMap.get(StorageService.getLocalStorageEndPoint());
List<BloomFilter> compactedBloomFilters = new ArrayList<BloomFilter>();
- doFileAntiCompaction(files, myRanges, null, newFiles, compactedBloomFilters);
+ doFileAntiCompaction(files, myRanges, null, bufSize_, newFiles, compactedBloomFilters);
logger_.debug("Original file : " + file + " of size " + new File(file).length());
lock_.writeLock().lock();
try
@@ -968,11 +1063,12 @@
* @param files
* @param ranges
* @param target
+ * @param minBufferSize
* @param fileList
* @return
* @throws IOException
*/
- boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, List<String> fileList, List<BloomFilter> compactedBloomFilters)
+ boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, int minBufferSize, List<String> fileList, List<BloomFilter> compactedBloomFilters) throws IOException
{
boolean result = false;
long startTime = System.currentTimeMillis();
@@ -998,7 +1094,7 @@
+ expectedRangeFileSize + " is greater than the safe limit of the disk space available.");
return result;
}
- PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.bufSize_);
+ PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, minBufferSize);
if (pq.size() > 0)
{
mergedFileName = getTempFileName();
@@ -1021,11 +1117,11 @@
fs = pq.poll();
}
if (fs != null
- && (lastkey == null || lastkey.compareTo(fs.getKey()) == 0))
+ && (lastkey == null || lastkey.compareTo(fs.key_) == 0))
{
// The keys are the same so we need to add this to the
// ldfs list
- lastkey = fs.getKey();
+ lastkey = fs.key_;
lfs.add(fs);
}
else
@@ -1040,30 +1136,38 @@
try
{
/* read the length although we don't need it */
- filestruct.getBufIn().readInt();
+ filestruct.bufIn_.readInt();
// Skip the Index
- IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
+ IndexHelper.skipBloomFilterAndIndex(filestruct.bufIn_);
// We want to add only 2 and resolve them right there in order to save on memory footprint
if(columnFamilies.size() > 1)
{
// Now merge the 2 column families
- merge(columnFamilies);
+ columnFamily = resolve(columnFamilies);
+ columnFamilies.clear();
+ if( columnFamily != null)
+ {
+ // add the merged columnfamily back to the list
+ columnFamilies.add(columnFamily);
+ }
+
}
// deserialize into column families
- columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
+ columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
}
catch ( Exception ex)
{
logger_.warn(LogUtil.throwableToString(ex));
- }
+ continue;
+ }
}
// Now after merging all crap append to the sstable
- columnFamily = resolveAndRemoveDeleted(columnFamilies);
+ columnFamily = resolve(columnFamilies);
columnFamilies.clear();
if( columnFamily != null )
{
/* serialize the cf with column indexes */
- ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
+ ColumnFamily.serializer2().serialize(columnFamily, bufOut);
}
}
else
@@ -1072,17 +1176,17 @@
try
{
/* read the length although we don't need it */
- int size = filestruct.getBufIn().readInt();
- bufOut.write(filestruct.getBufIn(), size);
+ int size = filestruct.bufIn_.readInt();
+ bufOut.write(filestruct.bufIn_, size);
}
catch ( Exception ex)
{
logger_.warn(LogUtil.throwableToString(ex));
- filestruct.close();
+ filestruct.reader_.close();
continue;
}
}
- if ( Range.isKeyInRanges(lastkey, ranges) )
+ if ( Range.isKeyInRanges(ranges, lastkey) )
{
if(ssTableRange == null )
{
@@ -1106,28 +1210,28 @@
{
try
{
- filestruct.getNextKey();
- if (filestruct.isExhausted())
+ filestruct = getNextKey ( filestruct );
+ if(filestruct == null)
{
continue;
}
/* keep on looping until we find a key in the range */
- while ( !Range.isKeyInRanges(filestruct.getKey(), ranges) )
+ while ( !Range.isKeyInRanges(ranges, filestruct.key_ ) )
{
- filestruct.getNextKey();
- if (filestruct.isExhausted())
+ filestruct = getNextKey ( filestruct );
+ if(filestruct == null)
{
break;
}
/* check if we need to continue , if we are done with ranges empty the queue and close all file handles and exit */
- //if( !isLoop && StorageService.token(filestruct.key).compareTo(maxRange.right()) > 0 && !filestruct.key.equals(""))
+ //if( !isLoop && StorageService.hash(filestruct.key).compareTo(maxRange.right()) > 0 && !filestruct.key.equals(""))
//{
//filestruct.reader.close();
//filestruct = null;
//break;
//}
}
- if (!filestruct.isExhausted())
+ if ( filestruct != null)
{
pq.add(filestruct);
}
@@ -1139,8 +1243,9 @@
// in any case we have read as far as possible from it
// and it will be deleted after compaction.
logger_.warn(LogUtil.throwableToString(ex));
- filestruct.close();
- }
+ filestruct.reader_.close();
+ continue;
+ }
}
lfs.clear();
lastkey = null;
@@ -1173,10 +1278,39 @@
+ totalBytesWritten + " Total keys read ..." + totalkeysRead);
return result;
}
-
- private void doFill(BloomFilter bf, String decoratedKey)
+
+ private void doWrite(SSTable ssTable, String key, DataOutputBuffer bufOut) throws IOException
{
- bf.fill(StorageService.getPartitioner().undecorateKey(decoratedKey));
+ PartitionerType pType = StorageService.getPartitionerType();
+ switch ( pType )
+ {
+ case OPHF:
+ ssTable.append(key, bufOut);
+ break;
+
+ default:
+ String[] peices = key.split(":");
+ key = peices[1];
+ BigInteger hash = new BigInteger(peices[0]);
+ ssTable.append(key, hash, bufOut);
+ break;
+ }
+ }
+
+ private void doFill(BloomFilter bf, String key)
+ {
+ PartitionerType pType = StorageService.getPartitionerType();
+ switch ( pType )
+ {
+ case OPHF:
+ bf.fill(key);
+ break;
+
+ default:
+ String[] peices = key.split(":");
+ bf.fill(peices[1]);
+ break;
+ }
}
/*
@@ -1190,7 +1324,7 @@
* to get the latest data.
*
*/
- void doFileCompaction(List<String> files, int minBufferSize)
+ void doFileCompaction(List<String> files, int minBufferSize) throws IOException
{
String newfile = null;
long startTime = System.currentTimeMillis();
@@ -1234,11 +1368,11 @@
fs = pq.poll();
}
if (fs != null
- && (lastkey == null || lastkey.compareTo(fs.getKey()) == 0))
+ && (lastkey == null || lastkey.compareTo(fs.key_) == 0))
{
// The keys are the same so we need to add this to the
// ldfs list
- lastkey = fs.getKey();
+ lastkey = fs.key_;
lfs.add(fs);
}
else
@@ -1253,29 +1387,37 @@
try
{
/* read the length although we don't need it */
- filestruct.getBufIn().readInt();
+ filestruct.bufIn_.readInt();
// Skip the Index
- IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
+ IndexHelper.skipBloomFilterAndIndex(filestruct.bufIn_);
// We want to add only 2 and resolve them right there in order to save on memory footprint
if(columnFamilies.size() > 1)
{
- merge(columnFamilies);
+ // Now merge the 2 column families
+ columnFamily = resolve(columnFamilies);
+ columnFamilies.clear();
+ if( columnFamily != null)
+ {
+ // add the merged columnfamily back to the list
+ columnFamilies.add(columnFamily);
+ }
+
}
// deserialize into column families
- columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
+ columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.bufIn_));
}
catch ( Exception ex)
- {
- logger_.warn("error in filecompaction", ex);
- }
+ {
+ continue;
+ }
}
// Now after merging all crap append to the sstable
- columnFamily = resolveAndRemoveDeleted(columnFamilies);
+ columnFamily = resolve(columnFamilies);
columnFamilies.clear();
if( columnFamily != null )
{
/* serialize the cf with column indexes */
- ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
+ ColumnFamily.serializer2().serialize(columnFamily, bufOut);
}
}
else
@@ -1284,23 +1426,24 @@
try
{
/* read the length although we don't need it */
- int size = filestruct.getBufIn().readInt();
- bufOut.write(filestruct.getBufIn(), size);
+ int size = filestruct.bufIn_.readInt();
+ bufOut.write(filestruct.bufIn_, size);
}
catch ( Exception ex)
{
ex.printStackTrace();
- filestruct.close();
+ filestruct.reader_.close();
continue;
}
}
if ( ssTable == null )
{
- ssTable = new SSTable(compactionFileLocation, mergedFileName);
+ PartitionerType pType = StorageService.getPartitionerType();
+ ssTable = new SSTable(compactionFileLocation, mergedFileName, pType);
}
- ssTable.append(lastkey, bufOut);
-
+ doWrite(ssTable, lastkey, bufOut);
+
/* Fill the bloom filter with the key */
doFill(compactedBloomFilter, lastkey);
totalkeysWritten++;
@@ -1308,8 +1451,8 @@
{
try
{
- filestruct.getNextKey();
- if (filestruct.isExhausted())
+ filestruct = getNextKey(filestruct);
+ if(filestruct == null)
{
continue;
}
@@ -1321,8 +1464,9 @@
// Ignore the exception as it might be a corrupted file
// in any case we have read as far as possible from it
// and it will be deleted after compaction.
- filestruct.close();
- }
+ filestruct.reader_.close();
+ continue;
+ }
}
lfs.clear();
lastkey = null;
@@ -1373,25 +1517,6 @@
logger_.debug("Total bytes Read for compaction ..." + totalBytesRead);
logger_.debug("Total bytes written for compaction ..."
+ totalBytesWritten + " Total keys read ..." + totalkeysRead);
- }
-
- public boolean isSuper()
- {
- return DatabaseDescriptor.getColumnType(getColumnFamilyName()).equals("Super");
- }
-
- public void flushMemtableOnRecovery() throws IOException
- {
- memtable_.get().flushOnRecovery();
- }
-
- public Object getMemtable()
- {
- return memtable_.get();
- }
-
- public Set<String> getSSTableFilenames()
- {
- return Collections.unmodifiableSet(ssTables_);
+ return;
}
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java?rev=759026&r1=759025&r2=759026&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/CommitLog.java Fri Mar 27 05:39:40 2009
@@ -21,18 +21,22 @@
import java.io.*;
import java.util.*;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.IFileReader;
import org.apache.cassandra.io.IFileWriter;
import org.apache.cassandra.io.SequenceFile;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
import org.apache.log4j.Logger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.cassandra.io.*;
+import org.apache.cassandra.utils.*;
/*
* Commit Log tracks every write operation into the system. The aim
@@ -372,7 +376,7 @@
try
{
Row row = Row.serializer().deserialize(bufIn);
- Map<String, ColumnFamily> columnFamilies = new HashMap<String, ColumnFamily>(row.getColumnFamilyMap());
+ Map<String, ColumnFamily> columnFamilies = new HashMap<String, ColumnFamily>(row.getColumnFamilies());
/* remove column families that have already been flushed */
Set<String> cNames = columnFamilies.keySet();
@@ -419,7 +423,7 @@
*/
private void updateHeader(Row row) throws IOException
{
- Map<String, ColumnFamily> columnFamilies = row.getColumnFamilyMap();
+ Map<String, ColumnFamily> columnFamilies = row.getColumnFamilies();
Table table = Table.open(table_);
Set<String> cNames = columnFamilies.keySet();
for ( String cName : cNames )
@@ -626,6 +630,9 @@
public static void main(String[] args) throws Throwable
{
LogUtil.init();
+
+ // the return value is not used in this case
+ DatabaseDescriptor.init();
File logDir = new File(DatabaseDescriptor.getLogFileLocation());
File[] files = logDir.listFiles();