You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/21 03:36:54 UTC
svn commit: r796108 [2/4] - in /incubator/cassandra/trunk: conf/ interface/
interface/gen-java/org/apache/cassandra/service/
src/java/org/apache/cassandra/cli/ src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/cql/common/ src/java/org...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java Tue Jul 21 01:36:52 2009
@@ -19,6 +19,7 @@
package org.apache.cassandra.cql.common;
import java.util.*;
+import java.io.UnsupportedEncodingException;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
@@ -59,7 +60,7 @@
}
// specific column lookup
- public List<Map<String,String>> getRows() throws RuntimeException
+ public List<Map<String,String>> getRows() throws UnsupportedEncodingException
{
String columnKey = (String)(columnKey_.get());
QueryPath path = null;
@@ -68,7 +69,7 @@
if (superColumnKey_ != null)
{
superColumnKey = (String)(superColumnKey_.get());
- path = new QueryPath(cfMetaData_.cfName, superColumnKey);
+ path = new QueryPath(cfMetaData_.cfName, superColumnKey.getBytes("UTF-8"));
}
else
{
@@ -79,7 +80,7 @@
try
{
String key = (String)(rowKey_.get());
- ReadCommand readCommand = new SliceByNamesReadCommand(cfMetaData_.tableName, key, path, Arrays.asList(columnKey));
+ ReadCommand readCommand = new SliceByNamesReadCommand(cfMetaData_.tableName, key, path, Arrays.asList(columnKey.getBytes("UTF-8")));
row = StorageProxy.readProtocol(readCommand, StorageService.ConsistencyLevel.WEAK);
}
catch (Exception e)
@@ -97,13 +98,13 @@
if (superColumnKey_ != null)
{
// this is the super column case
- IColumn column = cfamily.getColumn(superColumnKey);
+ IColumn column = cfamily.getColumn(superColumnKey.getBytes("UTF-8"));
if (column != null)
columns = column.getSubColumns();
}
else
{
- columns = cfamily.getAllColumns();
+ columns = cfamily.getSortedColumns();
}
if (columns != null && columns.size() > 0)
@@ -120,7 +121,7 @@
List<Map<String, String>> rows = new LinkedList<Map<String, String>>();
Map<String, String> result = new HashMap<String, String>();
- result.put(cfMetaData_.n_columnKey, column.name());
+ result.put(cfMetaData_.n_columnKey, new String(column.name(), "UTF-8"));
result.put(cfMetaData_.n_columnValue, new String(column.value()));
result.put(cfMetaData_.n_columnTimestamp, Long.toString(column.timestamp()));
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Tue Jul 21 01:36:52 2009
@@ -18,16 +18,12 @@
package org.apache.cassandra.db;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
import java.util.Collection;
import java.nio.ByteBuffer;
import org.apache.commons.lang.ArrayUtils;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.db.marshal.AbstractType;
/**
@@ -47,27 +43,27 @@
return serializer_;
}
- private final String name;
+ private final byte[] name;
private final byte[] value;
private final long timestamp;
private final boolean isMarkedForDelete;
- Column(String name)
+ Column(byte[] name)
{
this(name, ArrayUtils.EMPTY_BYTE_ARRAY);
}
- Column(String name, byte[] value)
+ Column(byte[] name, byte[] value)
{
this(name, value, 0);
}
- public Column(String name, byte[] value, long timestamp)
+ public Column(byte[] name, byte[] value, long timestamp)
{
this(name, value, timestamp, false);
}
- public Column(String name, byte[] value, long timestamp, boolean isDeleted)
+ public Column(byte[] name, byte[] value, long timestamp, boolean isDeleted)
{
assert name != null;
assert value != null;
@@ -77,12 +73,12 @@
isMarkedForDelete = isDeleted;
}
- public String name()
+ public byte[] name()
{
return name;
}
- public IColumn getSubColumn(String columnName)
+ public Column getSubColumn(byte[] columnName)
{
throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
}
@@ -92,7 +88,7 @@
return value;
}
- public byte[] value(String key)
+ public byte[] value(byte[] key)
{
throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
}
@@ -112,7 +108,7 @@
return timestamp;
}
- public long timestamp(String key)
+ public long timestamp(byte[] key)
{
throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
}
@@ -146,7 +142,7 @@
* We store the string as UTF-8 encoded, so when we calculate the length, it
* should be converted to UTF-8.
*/
- return IColumn.UtfPrefix_ + FBUtilities.getUTF8Length(name) + DBConstants.boolSize_ + DBConstants.tsSize_ + DBConstants.intSize_ + value.length;
+ return IColumn.UtfPrefix_ + name.length + DBConstants.boolSize_ + DBConstants.tsSize_ + DBConstants.intSize_ + value.length;
}
/*
@@ -172,19 +168,6 @@
return null;
}
- public String toString()
- {
- StringBuilder sb = new StringBuilder();
- sb.append(name);
- sb.append(":");
- sb.append(isMarkedForDelete());
- sb.append(":");
- sb.append(value().length);
- sb.append("@");
- sb.append(timestamp());
- return sb.toString();
- }
-
public byte[] digest()
{
StringBuilder stringBuilder = new StringBuilder();
@@ -210,27 +193,18 @@
}
return timestamp - o.timestamp;
}
-}
-class ColumnSerializer implements ICompactSerializer<IColumn>
-{
- public void serialize(IColumn column, DataOutputStream dos) throws IOException
+ public String getString(AbstractType comparator)
{
- dos.writeUTF(column.name());
- dos.writeBoolean(column.isMarkedForDelete());
- dos.writeLong(column.timestamp());
- dos.writeInt(column.value().length);
- dos.write(column.value());
- }
-
- public IColumn deserialize(DataInputStream dis) throws IOException
- {
- String name = dis.readUTF();
- boolean delete = dis.readBoolean();
- long ts = dis.readLong();
- int size = dis.readInt();
- byte[] value = new byte[size];
- dis.readFully(value);
- return new Column(name, value, ts, delete);
+ StringBuilder sb = new StringBuilder();
+ sb.append(comparator.getString(name));
+ sb.append(":");
+ sb.append(isMarkedForDelete());
+ sb.append(":");
+ sb.append(value.length);
+ sb.append("@");
+ sb.append(timestamp());
+ return sb.toString();
}
}
+
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Tue Jul 21 01:36:52 2009
@@ -22,16 +22,16 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Proxy;
+import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentSkipListMap;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.log4j.Logger;
@@ -39,6 +39,8 @@
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.MarshalException;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -51,7 +53,6 @@
private static Logger logger_ = Logger.getLogger( ColumnFamily.class );
private static Map<String, String> columnTypes_ = new HashMap<String, String>();
- private static Map<String, String> indexTypes_ = new HashMap<String, String>();
private String type_;
private String table_;
@@ -61,9 +62,6 @@
/* TODO: These are the various column types. Hard coded for now. */
columnTypes_.put("Standard", "Standard");
columnTypes_.put("Super", "Super");
-
- indexTypes_.put("Name", "Name");
- indexTypes_.put("Time", "Time");
}
public static ICompactSerializer<ColumnFamily> serializer()
@@ -87,17 +85,10 @@
return columnTypes_.get(key);
}
- public static String getColumnSortProperty(String columnIndexProperty)
- {
- if ( columnIndexProperty == null )
- return indexTypes_.get("Time");
- return indexTypes_.get(columnIndexProperty);
- }
-
public static ColumnFamily create(String tableName, String cfName)
{
String columnType = DatabaseDescriptor.getColumnFamilyType(tableName, cfName);
- Comparator<IColumn> comparator = ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.NAME);
+ AbstractType comparator = DatabaseDescriptor.getType(tableName, cfName);
return new ColumnFamily(cfName, columnType, comparator);
}
@@ -107,20 +98,14 @@
private long markedForDeleteAt = Long.MIN_VALUE;
private int localDeletionTime = Integer.MIN_VALUE;
private AtomicInteger size_ = new AtomicInteger(0);
- private EfficientBidiMap columns_;
+ private ConcurrentSkipListMap<byte[], IColumn> columns_;
- public ColumnFamily(String cfName, String columnType, Comparator<IColumn> comparator)
+ public ColumnFamily(String cfName, String columnType, AbstractType comparator)
{
name_ = cfName;
type_ = columnType;
columnSerializer_ = columnType.equals("Standard") ? Column.serializer() : SuperColumn.serializer();
- if(columns_ == null)
- columns_ = new EfficientBidiMap(comparator);
- }
-
- public ColumnFamily(String cfName, String columnType, ColumnComparatorFactory.ComparatorType indexType)
- {
- this(cfName, columnType, ColumnComparatorFactory.getComparator(indexType));
+ columns_ = new ConcurrentSkipListMap<byte[], IColumn>(comparator);
}
public ColumnFamily cloneMeShallow()
@@ -134,7 +119,7 @@
ColumnFamily cloneMe()
{
ColumnFamily cf = cloneMeShallow();
- cf.columns_ = columns_.cloneMe();
+ cf.columns_ = columns_.clone();
return cf;
}
@@ -149,7 +134,7 @@
*/
void addColumns(ColumnFamily cf)
{
- for (IColumn column : cf.getAllColumns())
+ for (IColumn column : cf.getSortedColumns())
{
addColumn(column);
}
@@ -163,22 +148,17 @@
int getColumnCount()
{
int count = 0;
- Map<String, IColumn> columns = columns_.getColumns();
- if( columns != null )
- {
- if(!isSuper())
- {
- count = columns.size();
- }
- else
- {
- Collection<IColumn> values = columns.values();
- for(IColumn column: values)
- {
- count += column.getObjectCount();
- }
- }
- }
+ if(!isSuper())
+ {
+ count = columns_.size();
+ }
+ else
+ {
+ for(IColumn column: columns_.values())
+ {
+ count += column.getObjectCount();
+ }
+ }
return count;
}
@@ -199,12 +179,28 @@
IColumn column;
if (path.superColumnName == null)
{
+ try
+ {
+ getComparator().validate(path.columnName);
+ }
+ catch (Exception e)
+ {
+ throw new MarshalException("Invalid column name in " + path.columnFamilyName + " for " + getComparator().getClass().getName());
+ }
column = new Column(path.columnName, value, timestamp, deleted);
}
else
{
+ try
+ {
+ getComparator().validate(path.superColumnName);
+ }
+ catch (Exception e)
+ {
+ throw new MarshalException("Invalid supercolumn name in " + path.columnFamilyName + " for " + getComparator().getClass().getName());
+ }
column = new SuperColumn(path.superColumnName);
- column.addColumn(new Column(path.columnName, value, timestamp, deleted));
+ column.addColumn(new Column(path.columnName, value, timestamp, deleted)); // checks subcolumn name
}
addColumn(column);
}
@@ -223,7 +219,7 @@
*/
public void addColumn(IColumn column)
{
- String name = column.name();
+ byte[] name = column.name();
IColumn oldColumn = columns_.get(name);
if (oldColumn != null)
{
@@ -249,22 +245,27 @@
}
}
- public IColumn getColumn(String name)
+ public IColumn getColumn(byte[] name)
+ {
+ return columns_.get(name);
+ }
+
+ public SortedSet<byte[]> getColumnNames()
{
- return columns_.get( name );
+ return columns_.keySet();
}
- public SortedSet<IColumn> getAllColumns()
+ public Collection<IColumn> getSortedColumns()
{
- return columns_.getSortedColumns();
+ return columns_.values();
}
- public Map<String, IColumn> getColumns()
+ public Map<byte[], IColumn> getColumnsMap()
{
- return columns_.getColumns();
+ return columns_;
}
- public void remove(String columnName)
+ public void remove(byte[] columnName)
{
columns_.remove(columnName);
}
@@ -301,50 +302,44 @@
// (don't need to worry about cfNew containing IColumns that are shadowed by
// the delete tombstone, since cfNew was generated by CF.resolve, which
// takes care of those for us.)
- Map<String, IColumn> columns = cfComposite.getColumns();
- Set<String> cNames = columns.keySet();
- for ( String cName : cNames )
- {
- IColumn columnInternal = columns_.get(cName);
- IColumn columnExternal = columns.get(cName);
- if( columnInternal == null )
- {
- cfDiff.addColumn(columnExternal);
- }
- else
- {
- IColumn columnDiff = columnInternal.diff(columnExternal);
- if(columnDiff != null)
- {
- cfDiff.addColumn(columnDiff);
- }
- }
+ Map<byte[], IColumn> columns = cfComposite.getColumnsMap();
+ Set<byte[]> cNames = columns.keySet();
+ for (byte[] cName : cNames)
+ {
+ IColumn columnInternal = columns_.get(cName);
+ IColumn columnExternal = columns.get(cName);
+ if (columnInternal == null)
+ {
+ cfDiff.addColumn(columnExternal);
+ }
+ else
+ {
+ IColumn columnDiff = columnInternal.diff(columnExternal);
+ if (columnDiff != null)
+ {
+ cfDiff.addColumn(columnDiff);
+ }
+ }
}
- if (!cfDiff.getColumns().isEmpty() || cfDiff.isMarkedForDelete())
+ if (!cfDiff.getColumnsMap().isEmpty() || cfDiff.isMarkedForDelete())
return cfDiff;
else
return null;
}
- private Comparator<IColumn> getComparator()
+ public AbstractType getComparator()
{
- return columns_.getComparator();
- }
-
- public ColumnComparatorFactory.ComparatorType getComparatorType()
- {
- return ColumnComparatorFactory.ComparatorType.NAME;
+ return (AbstractType)columns_.comparator();
}
int size()
{
- if ( size_.get() == 0 )
+ if (size_.get() == 0)
{
- Set<String> cNames = columns_.getColumns().keySet();
- for ( String cName : cNames )
+ for (IColumn column : columns_.values())
{
- size_.addAndGet(columns_.get(cName).size());
+ size_.addAndGet(column.size());
}
}
return size_.get();
@@ -374,7 +369,7 @@
}
sb.append(" [");
- sb.append(StringUtils.join(getAllColumns(), ", "));
+ sb.append(getComparator().getColumnsString(getSortedColumns()));
sb.append("])");
return sb.toString();
@@ -382,20 +377,19 @@
public byte[] digest()
{
- Set<IColumn> columns = columns_.getSortedColumns();
- byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
- for(IColumn column : columns)
- {
- if(xorHash.length == 0)
- {
- xorHash = column.digest();
- }
- else
- {
+ byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
+ for (IColumn column : columns_.values())
+ {
+ if (xorHash.length == 0)
+ {
+ xorHash = column.digest();
+ }
+ else
+ {
xorHash = FBUtilities.xor(xorHash, column.digest());
- }
- }
- return xorHash;
+ }
+ }
+ return xorHash;
}
public long getMarkedForDeleteAt()
@@ -462,11 +456,11 @@
*/
public void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
{
- Collection<IColumn> columns = columnFamily.getAllColumns();
+ Collection<IColumn> columns = columnFamily.getSortedColumns();
dos.writeUTF(columnFamily.name());
dos.writeUTF(columnFamily.type_);
- dos.writeInt(columnFamily.getComparatorType().ordinal());
+ dos.writeUTF(columnFamily.getComparator().getClass().getCanonicalName());
dos.writeInt(columnFamily.localDeletionTime);
dos.writeLong(columnFamily.markedForDeleteAt);
@@ -481,7 +475,7 @@
{
ColumnFamily cf = new ColumnFamily(dis.readUTF(),
dis.readUTF(),
- ColumnComparatorFactory.ComparatorType.values()[dis.readInt()]);
+ readComparator(dis));
cf.delete(dis.readInt(), dis.readLong());
int size = dis.readInt();
IColumn column;
@@ -492,6 +486,23 @@
}
return cf;
}
+
+ private AbstractType readComparator(DataInputStream dis) throws IOException
+ { // reads the comparator class name from the stream and reflectively instantiates it for the ColumnFamily being deserialized
+ String className = dis.readUTF(); // counterpart of the writeUTF(getComparator().getClass().getCanonicalName()) call in serialize()
+ try
+ {
+ return (AbstractType)Class.forName(className).getConstructor().newInstance(); // needs a public no-arg constructor. NOTE(review): a canonical name of a nested class is not loadable via Class.forName — confirm comparators are always top-level classes
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new RuntimeException("Unable to load comparator class '" + className + "'. probably this means you have obsolete sstables lying around", e);
+ }
+ catch (Exception e) // every other failure (missing constructor, instantiation error, class is not an AbstractType, ...) is rethrown unchecked with its cause
+ {
+ throw new RuntimeException(e);
+ }
+ }
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Jul 21 01:36:52 2009
@@ -40,6 +40,7 @@
import org.apache.cassandra.utils.*;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.collections.IteratorUtils;
@@ -534,9 +535,9 @@
// either way (tombstone or non- getting priority) would be fine,
// but we picked this way because it makes removing delivered hints
// easier for HintedHandoffManager.
- for (String cname : new ArrayList<String>(cf.getColumns().keySet()))
+ for (byte[] cname : cf.getColumnNames())
{
- IColumn c = cf.getColumns().get(cname);
+ IColumn c = cf.getColumnsMap().get(cname);
if (c instanceof SuperColumn)
{
long minTimestamp = Math.max(c.getMarkedForDeleteAt(), cf.getMarkedForDeleteAt());
@@ -1411,7 +1412,7 @@
return writeStats_.mean();
}
- public ColumnFamily getColumnFamily(String key, QueryPath path, String start, String finish, boolean isAscending, int limit) throws IOException
+ public ColumnFamily getColumnFamily(String key, QueryPath path, byte[] start, byte[] finish, boolean isAscending, int limit) throws IOException
{
return getColumnFamily(new SliceQueryFilter(key, path, start, finish, isAscending, limit));
}
@@ -1428,14 +1429,17 @@
*/
public ColumnFamily getColumnFamily(QueryFilter filter, int gcBefore) throws IOException
{
+ assert columnFamily_.equals(filter.getColumnFamilyName());
+
// if we are querying subcolumns of a supercolumn, fetch the supercolumn with NQF, then filter in-memory.
if (filter.path.superColumnName != null)
{
- QueryFilter nameFilter = new NamesQueryFilter(filter.key, new QueryPath(filter.path.columnFamilyName), filter.path.superColumnName);
+ AbstractType comparator = DatabaseDescriptor.getType(table_, columnFamily_);
+ QueryFilter nameFilter = new NamesQueryFilter(filter.key, new QueryPath(columnFamily_), filter.path.superColumnName);
ColumnFamily cf = getColumnFamily(nameFilter);
if (cf != null)
{
- for (IColumn column : cf.getAllColumns())
+ for (IColumn column : cf.getSortedColumns())
{
filter.filterSuperColumn((SuperColumn) column);
}
@@ -1455,7 +1459,7 @@
memtableLock_.readLock().lock();
try
{
- iter = filter.getMemColumnIterator(memtable_);
+ iter = filter.getMemColumnIterator(memtable_, getComparator());
returnCF = iter.getColumnFamily();
}
finally
@@ -1468,7 +1472,7 @@
List<Memtable> memtables = getUnflushedMemtables(filter.getColumnFamilyName());
for (Memtable memtable:memtables)
{
- iter = filter.getMemColumnIterator(memtable);
+ iter = filter.getMemColumnIterator(memtable, getComparator());
returnCF.delete(iter.getColumnFamily());
iterators.add(iter);
}
@@ -1477,7 +1481,7 @@
List<SSTableReader> sstables = new ArrayList<SSTableReader>(ssTables_.values());
for (SSTableReader sstable : sstables)
{
- iter = filter.getSSTableColumnIterator(sstable);
+ iter = filter.getSSTableColumnIterator(sstable, getComparator());
if (iter.hasNext()) // initializes iter.CF
{
returnCF.delete(iter.getColumnFamily());
@@ -1485,7 +1489,7 @@
iterators.add(iter);
}
- Comparator<IColumn> comparator = filter.getColumnComparator();
+ Comparator<IColumn> comparator = filter.getColumnComparator(getComparator());
Iterator collated = IteratorUtils.collatedIterator(comparator, iterators);
if (!collated.hasNext())
return null;
@@ -1513,6 +1517,11 @@
}
}
+ public AbstractType getComparator()
+ { // returns whatever AbstractType DatabaseDescriptor reports for this store's table_/columnFamily_; looked up on every call, not cached here
+ return DatabaseDescriptor.getType(table_, columnFamily_);
+ }
+
/**
* for testing. no effort is made to clear historical memtables.
*/
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java Tue Jul 21 01:36:52 2009
@@ -28,11 +28,11 @@
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.IndexHelper;
import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
/**
* Help to create an index for a column family based on size of columns
- * Author : Karthik Ranganathan ( kranganathan@facebook.com )
*/
public class ColumnIndexer
@@ -46,7 +46,7 @@
*/
public static void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
{
- Collection<IColumn> columns = columnFamily.getAllColumns();
+ Collection<IColumn> columns = columnFamily.getSortedColumns();
BloomFilter bf = createColumnBloomFilter(columns);
/* Write out the bloom filter. */
DataOutputBuffer bufOut = new DataOutputBuffer();
@@ -57,7 +57,7 @@
dos.write(bufOut.getData(), 0, bufOut.getLength());
/* Do the indexing */
- doIndexing(columnFamily.getComparatorType(), columns, dos);
+ doIndexing(columnFamily.getComparator(), columns, dos);
}
/**
@@ -69,20 +69,20 @@
private static BloomFilter createColumnBloomFilter(Collection<IColumn> columns)
{
int columnCount = 0;
- for ( IColumn column : columns )
+ for (IColumn column : columns)
{
columnCount += column.getObjectCount();
}
-
+
BloomFilter bf = new BloomFilter(columnCount, 4);
- for ( IColumn column : columns )
+ for (IColumn column : columns)
{
bf.add(column.name());
/* If this is SuperColumn type Column Family we need to get the subColumns too. */
- if ( column instanceof SuperColumn )
+ if (column instanceof SuperColumn)
{
Collection<IColumn> subColumns = column.getSubColumns();
- for ( IColumn subColumn : subColumns )
+ for (IColumn subColumn : subColumns)
{
bf.add(subColumn.name());
}
@@ -90,17 +90,6 @@
}
return bf;
}
-
- private static IndexHelper.ColumnIndexInfo getColumnIndexInfo(ColumnComparatorFactory.ComparatorType typeInfo, IColumn column)
- {
- IndexHelper.ColumnIndexInfo cIndexInfo = null;
-
- cIndexInfo = IndexHelper.ColumnIndexFactory.instance(typeInfo);
- cIndexInfo.set(typeInfo == ColumnComparatorFactory.ComparatorType.NAME
- ? column.name() : column.timestamp());
-
- return cIndexInfo;
- }
/**
* Given the collection of columns in the Column Family,
@@ -111,7 +100,7 @@
* to be written.
* @throws IOException
*/
- private static void doIndexing(ColumnComparatorFactory.ComparatorType typeInfo, Collection<IColumn> columns, DataOutputStream dos) throws IOException
+ private static void doIndexing(AbstractType comparator, Collection<IColumn> columns, DataOutputStream dos) throws IOException
{
/* we are going to write column indexes */
int numColumns = 0;
@@ -139,7 +128,7 @@
* SuperColumn always use the name indexing scheme for
* the SuperColumns. We will fix this later.
*/
- IndexHelper.ColumnIndexInfo cIndexInfo = getColumnIndexInfo(typeInfo, column);
+ IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(column.name(), 0, 0, comparator);
cIndexInfo.position(position);
cIndexInfo.count(numColumns);
columnIndexList.add(cIndexInfo);
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=796108&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java Tue Jul 21 01:36:52 2009
@@ -0,0 +1,45 @@
+package org.apache.cassandra.db;
+
+import java.io.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ColumnSerializer implements ICompactSerializer<IColumn>
+{
+ public static void writeName(byte[] name, DataOutput out) throws IOException
+ {
+ // serialized form of a name: 16-bit big-endian length (high byte first), then the raw name bytes
+ final int nameLength = name.length;
+ assert nameLength <= 65535;
+ out.writeShort(nameLength);
+ out.write(name);
+ }
+
+ public static byte[] readName(DataInput in) throws IOException
+ {
+ // The name is prefixed with the unsigned 16-bit big-endian length that writeName() emits. It has to be
+ // read unsigned: readByte() sign-extends, so a length with any byte >= 0x80 (e.g. 128) came back negative or garbled.
+ int length = in.readUnsignedShort();
+ byte[] bytes = new byte[length];
+ in.readFully(bytes);
+ return bytes;
+ }
+
+ public void serialize(IColumn column, DataOutputStream dos) throws IOException
+ { // field order is the serialized layout and must mirror deserialize(): name, delete marker, timestamp, value
+ ColumnSerializer.writeName(column.name(), dos); // 2-byte length followed by the raw name bytes
+ dos.writeBoolean(column.isMarkedForDelete());
+ dos.writeLong(column.timestamp());
+ FBUtilities.writeByteArray(column.value(), dos); // presumably length-prefixed; the exact format lives in FBUtilities — confirm there
+ }
+
+ public Column deserialize(DataInputStream dis) throws IOException
+ { // reads the fields in the same order serialize() writes them and rebuilds the Column
+ byte[] name = ColumnSerializer.readName(dis);
+ boolean delete = dis.readBoolean(); // the value serialize() wrote from isMarkedForDelete()
+ long ts = dis.readLong();
+ byte[] value = FBUtilities.readByteArray(dis);
+ return new Column(name, value, ts, delete);
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue Jul 21 01:36:52 2009
@@ -101,7 +101,7 @@
return quorumResponseHandler.get();
}
- private static void deleteEndPoint(String endpointAddress, String tableName, String key, long timestamp) throws IOException
+ private static void deleteEndPoint(byte[] endpointAddress, String tableName, byte[] key, long timestamp) throws IOException
{
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, tableName);
rm.delete(new QueryPath(HINTS_CF, key, endpointAddress), timestamp);
@@ -124,16 +124,15 @@
Collection<ColumnFamily> cfs = row.getColumnFamilies();
for (ColumnFamily cf : cfs)
{
- Set<IColumn> columns = cf.getAllColumns();
long maxTS = Long.MIN_VALUE;
if (!cf.isSuper())
{
- for (IColumn col : columns)
+ for (IColumn col : cf.getSortedColumns())
maxTS = Math.max(maxTS, col.timestamp());
}
else
{
- for (IColumn col : columns)
+ for (IColumn col : cf.getSortedColumns())
{
maxTS = Math.max(maxTS, col.timestamp());
Collection<IColumn> subColumns = col.getSubColumns();
@@ -166,15 +165,17 @@
{
continue;
}
- Collection<IColumn> keys = hintColumnFamily.getAllColumns();
+ Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
for (IColumn keyColumn : keys)
{
Collection<IColumn> endpoints = keyColumn.getSubColumns();
+ String keyStr = new String(keyColumn.name(), "UTF-8");
int deleted = 0;
for (IColumn endpoint : endpoints)
{
- if (sendMessage(endpoint.name(), tableName, keyColumn.name()))
+ String endpointStr = new String(endpoint.name(), "UTF-8");
+ if (sendMessage(endpointStr, tableName, keyStr))
{
deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
deleted++;
@@ -182,7 +183,7 @@
}
if (deleted == endpoints.size())
{
- deleteHintedData(tableName, keyColumn.name());
+ deleteHintedData(tableName, keyStr);
}
}
}
@@ -198,6 +199,7 @@
if (logger_.isDebugEnabled())
logger_.debug("Started hinted handoff for endPoint " + endPoint.getHost());
+ String targetEPBytes = endPoint.getHost();
// 1. Scan through all the keys that we need to handoff
// 2. For each key read the list of recipients if the endpoint matches send
// 3. Delete that recipient from the key if write was successful
@@ -209,19 +211,20 @@
{
continue;
}
- Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
+ Collection<IColumn> keys = hintedColumnFamily.getSortedColumns();
for (IColumn keyColumn : keys)
{
+ String keyStr = new String(keyColumn.name(), "UTF-8");
Collection<IColumn> endpoints = keyColumn.getSubColumns();
- for (IColumn endpoint : endpoints)
+ for (IColumn hintEndPoint : endpoints)
{
- if (endpoint.name().equals(endPoint.getHost()) && sendMessage(endpoint.name(), null, keyColumn.name()))
+ if (hintEndPoint.name().equals(targetEPBytes) && sendMessage(endPoint.getHost(), null, keyStr))
{
- deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
+ deleteEndPoint(hintEndPoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
if (endpoints.size() == 1)
{
- deleteHintedData(tableName, keyColumn.name());
+ deleteHintedData(tableName, keyStr);
}
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java Tue Jul 21 01:36:52 2009
@@ -19,7 +19,8 @@
package org.apache.cassandra.db;
import java.util.Collection;
-import java.util.Map;
+
+import org.apache.cassandra.db.marshal.AbstractType;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -30,18 +31,19 @@
public static short UtfPrefix_ = 2;
public boolean isMarkedForDelete();
public long getMarkedForDeleteAt();
- public String name();
+ public byte[] name();
public int size();
public int serializedSize();
public long timestamp();
- public long timestamp(String key);
+ public long timestamp(byte[] columnName);
public byte[] value();
- public byte[] value(String key);
+ public byte[] value(byte[] columnName);
public Collection<IColumn> getSubColumns();
- public IColumn getSubColumn(String columnName);
+ public IColumn getSubColumn(byte[] columnName);
public void addColumn(IColumn column);
public IColumn diff(IColumn column);
public int getObjectCount();
public byte[] digest();
public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity
+ public String getString(AbstractType comparator);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Tue Jul 21 01:36:52 2009
@@ -33,6 +33,7 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.DestructivePQIterator;
import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.log4j.Logger;
@@ -279,12 +280,12 @@
/**
* obtain an iterator of columns in this memtable in the specified order starting from a given column.
*/
- public ColumnIterator getSliceIterator(SliceQueryFilter filter)
+ public ColumnIterator getSliceIterator(SliceQueryFilter filter, AbstractType typeComparator)
{
ColumnFamily cf = columnFamilies_.get(filter.key);
final ColumnFamily columnFamily = cf == null ? ColumnFamily.create(table_, filter.getColumnFamilyName()) : cf.cloneMeShallow();
- final IColumn columns[] = (cf == null ? columnFamily : cf).getAllColumns().toArray(new IColumn[columnFamily.getAllColumns().size()]);
+ final IColumn columns[] = (cf == null ? columnFamily : cf).getSortedColumns().toArray(new IColumn[columnFamily.getSortedColumns().size()]);
// TODO if we are dealing with supercolumns, we need to clone them while we have the read lock since they can be modified later
if (!filter.isAscending)
ArrayUtils.reverse(columns);
@@ -296,7 +297,7 @@
// can't use a ColumnComparatorFactory comparator since those compare on both name and time (and thus will fail to match
// our dummy column, since the time there is arbitrary).
- Comparator<IColumn> comparator = filter.getColumnComparator();
+ Comparator<IColumn> comparator = filter.getColumnComparator(typeComparator);
int index = Arrays.binarySearch(columns, startIColumn, comparator);
final int startIndex = index < 0 ? -(index + 1) : index;
@@ -328,8 +329,8 @@
return new SimpleAbstractColumnIterator()
{
- private Iterator<String> iter = filter.columns.iterator();
- private String current;
+ private Iterator<byte[]> iter = filter.columns.iterator();
+ private byte[] current;
public ColumnFamily getColumnFamily()
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Tue Jul 21 01:36:52 2009
@@ -28,6 +28,8 @@
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.config.DatabaseDescriptor;
public abstract class ReadCommand
@@ -80,6 +82,11 @@
public abstract ReadCommand copy();
public abstract Row getRow(Table table) throws IOException;
+
+ protected AbstractType getComparator()
+ {
+ return DatabaseDescriptor.getType(table, getColumnFamilyName());
+ }
}
class ReadCommandSerializer implements ICompactSerializer<ReadCommand>
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Tue Jul 21 01:36:52 2009
@@ -40,8 +40,10 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.BatchMutationSuper;
import org.apache.cassandra.service.BatchMutation;
+import org.apache.cassandra.service.InvalidRequestException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.MarshalException;
/**
@@ -67,11 +69,6 @@
private String key_;
protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();
- /* Ctor for JAXB */
- private RowMutation()
- {
- }
-
public RowMutation(String table, String key)
{
table_ = table;
@@ -110,9 +107,9 @@
return modifications_.keySet();
}
- void addHints(String hint) throws IOException
+ void addHints(String key, String host) throws IOException
{
- QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, null, hint);
+ QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, key.getBytes("UTF-8"), host.getBytes("UTF-8"));
add(path, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis());
}
@@ -261,7 +258,7 @@
return rm;
}
- public static RowMutation getRowMutation(String table, BatchMutationSuper batchMutationSuper)
+ public static RowMutation getRowMutation(String table, BatchMutationSuper batchMutationSuper) throws InvalidRequestException
{
RowMutation rm = new RowMutation(table, batchMutationSuper.key.trim());
for (String cfName : batchMutationSuper.cfmap.keySet())
@@ -270,7 +267,14 @@
{
for (org.apache.cassandra.service.Column column : super_column.columns)
{
- rm.add(new QueryPath(cfName, super_column.name, column.name), column.value, column.timestamp);
+ try
+ {
+ rm.add(new QueryPath(cfName, super_column.name, column.name), column.value, column.timestamp);
+ }
+ catch (MarshalException e)
+ {
+ throw new InvalidRequestException(e.getMessage());
+ }
}
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Tue Jul 21 01:36:52 2009
@@ -73,7 +73,7 @@
logger_.debug("Adding hint for " + hint);
/* add necessary hints to this mutation */
RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.table());
- hintedMutation.addHints(rm.key() + ":" + hint.getHost());
+ hintedMutation.addHints(rm.key(), hint.getHost());
hintedMutation.apply();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java Tue Jul 21 01:36:52 2009
@@ -21,8 +21,6 @@
import java.util.*;
import java.io.IOException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-
/**
* This class is used to loop through a retrieved column family
@@ -66,7 +64,7 @@
ColumnFamily columnFamily = table.get(key, cf);
if ( columnFamily != null )
{
- Collection<IColumn> columns = columnFamily.getAllColumns();
+ Collection<IColumn> columns = columnFamily.getSortedColumns();
columnIt_ = columns.iterator();
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java Tue Jul 21 01:36:52 2009
@@ -27,22 +27,25 @@
import org.apache.cassandra.service.ColumnParent;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.config.DatabaseDescriptor;
public class SliceByNamesReadCommand extends ReadCommand
{
public final QueryPath columnParent;
- public final SortedSet<String> columnNames;
+ public final SortedSet<byte[]> columnNames;
- public SliceByNamesReadCommand(String table, String key, ColumnParent column_parent, Collection<String> columnNames)
+ public SliceByNamesReadCommand(String table, String key, ColumnParent column_parent, Collection<byte[]> columnNames)
{
this(table, key, new QueryPath(column_parent), columnNames);
}
- public SliceByNamesReadCommand(String table, String key, QueryPath path, Collection<String> columnNames)
+ public SliceByNamesReadCommand(String table, String key, QueryPath path, Collection<byte[]> columnNames)
{
super(table, key, CMD_TYPE_GET_SLICE_BY_NAMES);
this.columnParent = path;
- this.columnNames = new TreeSet<String>(columnNames);
+ this.columnNames = new TreeSet<byte[]>(getComparator());
+ this.columnNames.addAll(columnNames);
}
@Override
@@ -72,7 +75,7 @@
"table='" + table + '\'' +
", key='" + key + '\'' +
", columnParent='" + columnParent + '\'' +
- ", columns=[" + StringUtils.join(columnNames, ", ") + "]" +
+ ", columns=[" + getComparator().getString(columnNames) + "]" +
')';
}
@@ -91,10 +94,9 @@
dos.writeInt(realRM.columnNames.size());
if (realRM.columnNames.size() > 0)
{
- for (String cName : realRM.columnNames)
+ for (byte[] cName : realRM.columnNames)
{
- dos.writeInt(cName.getBytes().length);
- dos.write(cName.getBytes());
+ ColumnSerializer.writeName(cName, dos);
}
}
}
@@ -108,12 +110,10 @@
QueryPath columnParent = QueryPath.deserialize(dis);
int size = dis.readInt();
- List<String> columns = new ArrayList<String>();
+ List<byte[]> columns = new ArrayList<byte[]>();
for (int i = 0; i < size; ++i)
{
- byte[] bytes = new byte[dis.readInt()];
- dis.readFully(bytes);
- columns.add(new String(bytes));
+ columns.add(ColumnSerializer.readName(dis));
}
SliceByNamesReadCommand rm = new SliceByNamesReadCommand(table, key, columnParent, columns);
rm.setDigestQuery(isDigest);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java Tue Jul 21 01:36:52 2009
@@ -28,16 +28,16 @@
public class SliceFromReadCommand extends ReadCommand
{
public final QueryPath column_parent;
- public final String start, finish;
+ public final byte[] start, finish;
public final boolean isAscending;
public final int count;
- public SliceFromReadCommand(String table, String key, ColumnParent column_parent, String start, String finish, boolean isAscending, int count)
+ public SliceFromReadCommand(String table, String key, ColumnParent column_parent, byte[] start, byte[] finish, boolean isAscending, int count)
{
this(table, key, new QueryPath(column_parent), start, finish, isAscending, count);
}
- public SliceFromReadCommand(String table, String key, QueryPath columnParent, String start, String finish, boolean isAscending, int count)
+ public SliceFromReadCommand(String table, String key, QueryPath columnParent, byte[] start, byte[] finish, boolean isAscending, int count)
{
super(table, key, CMD_TYPE_GET_SLICE);
this.column_parent = columnParent;
@@ -74,8 +74,8 @@
"table='" + table + '\'' +
", key='" + key + '\'' +
", column_parent='" + column_parent + '\'' +
- ", start='" + start + '\'' +
- ", finish='" + finish + '\'' +
+ ", start='" + getComparator().getString(start) + '\'' +
+ ", finish='" + getComparator().getString(finish) + '\'' +
", isAscending=" + isAscending +
", count=" + count +
')';
@@ -92,8 +92,8 @@
dos.writeUTF(realRM.table);
dos.writeUTF(realRM.key);
realRM.column_parent.serialize(dos);
- dos.writeUTF(realRM.start);
- dos.writeUTF(realRM.finish);
+ ColumnSerializer.writeName(realRM.start, dos);
+ ColumnSerializer.writeName(realRM.finish, dos);
dos.writeBoolean(realRM.isAscending);
dos.writeInt(realRM.count);
}
@@ -102,7 +102,13 @@
public ReadCommand deserialize(DataInputStream dis) throws IOException
{
boolean isDigest = dis.readBoolean();
- SliceFromReadCommand rm = new SliceFromReadCommand(dis.readUTF(), dis.readUTF(), QueryPath.deserialize(dis), dis.readUTF(), dis.readUTF(), dis.readBoolean(), dis.readInt());
+ SliceFromReadCommand rm = new SliceFromReadCommand(dis.readUTF(),
+ dis.readUTF(),
+ QueryPath.deserialize(dis),
+ ColumnSerializer.readName(dis),
+ ColumnSerializer.readName(dis),
+ dis.readBoolean(),
+ dis.readInt());
rm.setDigestQuery(isDigest);
return rm;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Tue Jul 21 01:36:52 2009
@@ -23,17 +23,18 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
-import java.util.Set;
-import java.util.Map;
+import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.MarshalException;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -49,19 +50,21 @@
return serializer_;
}
- private String name_;
- private ConcurrentSkipListMap<String, IColumn> columns_ = new ConcurrentSkipListMap<String, IColumn>();
+ private byte[] name_;
+ // TODO make subcolumn comparator configurable
+ private ConcurrentSkipListMap<byte[], IColumn> columns_ = new ConcurrentSkipListMap<byte[], IColumn>(new LongType());
private int localDeletionTime = Integer.MIN_VALUE;
private long markedForDeleteAt = Long.MIN_VALUE;
private AtomicInteger size_ = new AtomicInteger(0);
- SuperColumn()
+ SuperColumn(byte[] name)
{
+ name_ = name;
}
- SuperColumn(String name)
+ public AbstractType getComparator()
{
- name_ = name;
+ return (AbstractType)columns_.comparator();
}
public SuperColumn cloneMeShallow()
@@ -76,7 +79,7 @@
return markedForDeleteAt > Long.MIN_VALUE;
}
- public String name()
+ public byte[] name()
{
return name_;
}
@@ -86,19 +89,13 @@
return columns_.values();
}
- public IColumn getSubColumn(String columnName)
+ public IColumn getSubColumn(byte[] columnName)
{
IColumn column = columns_.get(columnName);
assert column == null || column instanceof Column;
return column;
}
- public int compareTo(IColumn superColumn)
- {
- return (name_.compareTo(superColumn.name()));
- }
-
-
public int size()
{
/*
@@ -133,7 +130,7 @@
* We need to keep the way we are calculating the column size in sync with the
* way we are calculating the size for the column family serializer.
*/
- return IColumn.UtfPrefix_ + FBUtilities.getUTF8Length(name_) + DBConstants.boolSize_ + DBConstants.intSize_ + DBConstants.intSize_ + getSizeOfAllColumns();
+ return IColumn.UtfPrefix_ + name_.length + DBConstants.boolSize_ + DBConstants.intSize_ + DBConstants.intSize_ + getSizeOfAllColumns();
}
/**
@@ -150,7 +147,7 @@
return size;
}
- public void remove(String columnName)
+ public void remove(byte[] columnName)
{
columns_.remove(columnName);
}
@@ -160,9 +157,9 @@
throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
}
- public long timestamp(String key)
+ public long timestamp(byte[] columnName)
{
- IColumn column = columns_.get(key);
+ IColumn column = columns_.get(columnName);
if ( column instanceof SuperColumn )
throw new UnsupportedOperationException("A super column cannot hold other super columns.");
if ( column != null )
@@ -175,9 +172,9 @@
throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
}
- public byte[] value(String key)
+ public byte[] value(byte[] columnName)
{
- IColumn column = columns_.get(key);
+ IColumn column = columns_.get(columnName);
if ( column != null )
return column.value();
throw new IllegalArgumentException("Value was requested for a column that does not exist.");
@@ -187,6 +184,14 @@
{
if (!(column instanceof Column))
throw new UnsupportedOperationException("A super column can only contain simple columns.");
+ try
+ {
+ getComparator().validate(column.name());
+ }
+ catch (Exception e)
+ {
+ throw new MarshalException("Invalid column name in supercolumn for " + getComparator().getClass().getName());
+ }
IColumn oldColumn = columns_.get(column.name());
if ( oldColumn == null )
{
@@ -213,10 +218,14 @@
*/
public void putColumn(IColumn column)
{
- if ( !(column instanceof SuperColumn))
- throw new UnsupportedOperationException("Only Super column objects should be put here");
- if( !name_.equals(column.name()))
- throw new IllegalArgumentException("The name should match the name of the current column or super column");
+ if (!(column instanceof SuperColumn))
+ {
+ throw new UnsupportedOperationException("Only Super column objects should be put here");
+ }
+ if (!Arrays.equals(name_, column.name()))
+ {
+ throw new IllegalArgumentException("The name should match the name of the current column or super column");
+ }
for (IColumn subColumn : column.getSubColumns())
{
@@ -281,7 +290,7 @@
byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
if(name_ == null)
return xorHash;
- xorHash = name_.getBytes();
+ xorHash = name_.clone();
for(IColumn column : columns_.values())
{
xorHash = FBUtilities.xor(xorHash, column.digest());
@@ -289,19 +298,18 @@
return xorHash;
}
-
- public String toString()
+ public String getString(AbstractType comparator)
{
StringBuilder sb = new StringBuilder();
sb.append("SuperColumn(");
- sb.append(name_);
+ sb.append(comparator.getString(name_));
if (isMarkedForDelete()) {
- sb.append(" -delete at " + getMarkedForDeleteAt() + "-");
+ sb.append(" -delete at ").append(getMarkedForDeleteAt()).append("-");
}
sb.append(" [");
- sb.append(StringUtils.join(getSubColumns(), ", "));
+ sb.append(getComparator().getColumnsString(columns_.values()));
sb.append("])");
return sb.toString();
@@ -324,7 +332,7 @@
public void serialize(IColumn column, DataOutputStream dos) throws IOException
{
SuperColumn superColumn = (SuperColumn)column;
- dos.writeUTF(superColumn.name());
+ ColumnSerializer.writeName(column.name(), dos);
dos.writeInt(superColumn.getLocalDeletionTime());
dos.writeLong(superColumn.getMarkedForDeleteAt());
@@ -341,7 +349,7 @@
public IColumn deserialize(DataInputStream dis) throws IOException
{
- String name = dis.readUTF();
+ byte[] name = ColumnSerializer.readName(dis);
SuperColumn superColumn = new SuperColumn(name);
superColumn.markForDeleteAt(dis.readInt(), dis.readLong());
assert dis.available() > 0;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Tue Jul 21 01:36:52 2009
@@ -19,6 +19,7 @@
package org.apache.cassandra.db;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import org.apache.log4j.Logger;
@@ -26,9 +27,9 @@
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.utils.BasicUtilities;
-import org.apache.cassandra.db.filter.IdentityQueryFilter;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.QueryFilter;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -39,8 +40,20 @@
private static Logger logger_ = Logger.getLogger(SystemTable.class);
public static final String LOCATION_CF = "LocationInfo";
private static final String LOCATION_KEY = "L"; // only one row in Location CF
- private static final String TOKEN = "Token";
- private static final String GENERATION = "Generation";
+ private static final byte[] TOKEN = utf8("Token");
+ private static final byte[] GENERATION = utf8("Generation");
+
+ private static byte[] utf8(String str)
+ {
+ try
+ {
+ return str.getBytes("UTF-8");
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
/*
* This method is used to update the SystemTable with the new token.
@@ -50,7 +63,8 @@
IPartitioner p = StorageService.getPartitioner();
Table table = Table.open(Table.SYSTEM_TABLE);
/* Retrieve the "LocationInfo" column family */
- ColumnFamily cf = table.getColumnFamilyStore(LOCATION_CF).getColumnFamily(new NamesQueryFilter(LOCATION_KEY, new QueryPath(LOCATION_KEY), TOKEN));
+ QueryFilter filter = new NamesQueryFilter(LOCATION_KEY, new QueryPath(LOCATION_CF), TOKEN);
+ ColumnFamily cf = table.getColumnFamilyStore(LOCATION_CF).getColumnFamily(filter);
long oldTokenColumnTimestamp = cf.getColumn(SystemTable.TOKEN).timestamp();
/* create the "Token" whose value is the new token. */
IColumn tokenColumn = new Column(SystemTable.TOKEN, p.getTokenFactory().toByteArray(token), oldTokenColumnTimestamp + 1);
@@ -74,7 +88,8 @@
{
/* Read the system table to retrieve the storage ID and the generation */
Table table = Table.open(Table.SYSTEM_TABLE);
- ColumnFamily cf = table.getColumnFamilyStore(LOCATION_CF).getColumnFamily(new NamesQueryFilter(LOCATION_KEY, new QueryPath(LOCATION_KEY), GENERATION));
+ QueryFilter filter = new NamesQueryFilter(LOCATION_KEY, new QueryPath(LOCATION_CF), GENERATION);
+ ColumnFamily cf = table.getColumnFamilyStore(LOCATION_CF).getColumnFamily(filter);
IPartitioner p = StorageService.getPartitioner();
if (cf == null)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue Jul 21 01:36:52 2009
@@ -27,6 +27,7 @@
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.collections.Predicate;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.BootstrapInitiateMessage;
@@ -591,7 +592,7 @@
for (ColumnFamily columnFamily : row.getColumnFamilies())
{
- Collection<IColumn> columns = columnFamily.getAllColumns();
+ Collection<IColumn> columns = columnFamily.getSortedColumns();
for(IColumn column : columns)
{
ColumnFamilyStore cfStore = columnFamilyStores_.get(column.name());
@@ -704,7 +705,8 @@
}
// make sure there is actually non-tombstone content associated w/ this key
// TODO record the key source(s) somehow and only check that source (e.g., memtable or sstable)
- if (cfs.getColumnFamily(new SliceQueryFilter(current, new QueryPath(cfName), "", "", true, 1), Integer.MAX_VALUE) != null)
+ QueryFilter filter = new SliceQueryFilter(current, new QueryPath(cfName), ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, 1);
+ if (cfs.getColumnFamily(filter, Integer.MAX_VALUE) != null)
{
keys.add(current);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/IdentityQueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/IdentityQueryFilter.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/IdentityQueryFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/IdentityQueryFilter.java Tue Jul 21 01:36:52 2009
@@ -1,5 +1,7 @@
package org.apache.cassandra.db.filter;
+import org.apache.commons.lang.ArrayUtils;
+
import org.apache.cassandra.db.SuperColumn;
public class IdentityQueryFilter extends SliceQueryFilter
@@ -9,7 +11,7 @@
*/
public IdentityQueryFilter(String key, QueryPath path)
{
- super(key, path, "", "", true, Integer.MAX_VALUE);
+ super(key, path, ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, Integer.MAX_VALUE);
}
@Override
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java Tue Jul 21 01:36:52 2009
@@ -4,36 +4,51 @@
import java.util.SortedSet;
import java.util.Arrays;
import java.util.TreeSet;
+import java.util.Comparator;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.utils.ReducingIterator;
-import org.apache.cassandra.db.filter.ColumnIterator;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.SuperColumn;
+import org.apache.cassandra.db.marshal.AbstractType;
public class NamesQueryFilter extends QueryFilter
{
- public final SortedSet<String> columns;
+ public final SortedSet<byte[]> columns;
- public NamesQueryFilter(String key, QueryPath columnParent, SortedSet<String> columns)
+ public NamesQueryFilter(String key, QueryPath columnParent, SortedSet<byte[]> columns)
{
super(key, columnParent);
this.columns = columns;
}
- public NamesQueryFilter(String key, QueryPath columnParent, String column)
+ public NamesQueryFilter(String key, QueryPath columnParent, byte[] column)
{
- this(key, columnParent, new TreeSet<String>(Arrays.asList(column)));
+ this(key, columnParent, getSingleColumnSet(column));
}
- public ColumnIterator getMemColumnIterator(Memtable memtable)
+ private static TreeSet<byte[]> getSingleColumnSet(byte[] column)
+ {
+ Comparator<byte[]> singleColumnComparator = new Comparator<byte[]>()
+ {
+ public int compare(byte[] o1, byte[] o2)
+ {
+ return Arrays.equals(o1, o2) ? 0 : -1;
+ }
+ };
+ TreeSet<byte[]> set = new TreeSet<byte[]>(singleColumnComparator);
+ set.add(column);
+ return set;
+ }
+
+ public ColumnIterator getMemColumnIterator(Memtable memtable, AbstractType comparator)
{
return memtable.getNamesIterator(this);
}
- public ColumnIterator getSSTableColumnIterator(SSTableReader sstable) throws IOException
+ public ColumnIterator getSSTableColumnIterator(SSTableReader sstable, AbstractType comparator) throws IOException
{
return new SSTableNamesIterator(sstable.getFilename(), key, getColumnFamilyName(), columns);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Tue Jul 21 01:36:52 2009
@@ -3,10 +3,12 @@
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.Arrays;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.utils.ReducingIterator;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
public abstract class QueryFilter
{
@@ -23,13 +25,13 @@
* returns an iterator that returns columns from the given memtable
* matching the Filter criteria in sorted order.
*/
- public abstract ColumnIterator getMemColumnIterator(Memtable memtable);
+ public abstract ColumnIterator getMemColumnIterator(Memtable memtable, AbstractType comparator);
/**
* returns an iterator that returns columns from the given SSTable
* matching the Filter criteria in sorted order.
*/
- public abstract ColumnIterator getSSTableColumnIterator(SSTableReader sstable) throws IOException;
+ public abstract ColumnIterator getSSTableColumnIterator(SSTableReader sstable, AbstractType comparator) throws IOException;
/**
* collects columns from reducedColumns into returnCF. Termination is determined
@@ -44,13 +46,13 @@
*/
public abstract void filterSuperColumn(SuperColumn superColumn);
- public Comparator<IColumn> getColumnComparator()
+ public Comparator<IColumn> getColumnComparator(final AbstractType comparator)
{
return new Comparator<IColumn>()
{
public int compare(IColumn c1, IColumn c2)
{
- return c1.name().compareTo(c2.name());
+ return comparator.compare(c1.name(), c2.name());
}
};
}
@@ -63,9 +65,9 @@
{
ColumnFamily curCF = returnCF.cloneMeShallow();
- protected Object getKey(IColumn o)
+ protected boolean isEqual(IColumn o1, IColumn o2)
{
- return o == null ? null : o.name();
+ return Arrays.equals(o1.name(), o2.name());
}
public void reduce(IColumn current)
@@ -75,7 +77,7 @@
protected IColumn getReduced()
{
- IColumn c = curCF.getAllColumns().first();
+ IColumn c = curCF.getSortedColumns().iterator().next();
curCF.clear();
return c;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java Tue Jul 21 01:36:52 2009
@@ -4,23 +4,22 @@
import java.io.IOException;
import java.io.DataInputStream;
+import org.apache.commons.lang.ArrayUtils;
+
import org.apache.cassandra.service.ColumnParent;
import org.apache.cassandra.service.ColumnPath;
import org.apache.cassandra.service.ColumnPathOrParent;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.db.ColumnSerializer;
public class QueryPath
{
public final String columnFamilyName;
- public final String superColumnName;
- public final String columnName;
+ public final byte[] superColumnName;
+ public final byte[] columnName;
- public QueryPath(String columnFamilyName, String superColumnName, String columnName)
+ public QueryPath(String columnFamilyName, byte[] superColumnName, byte[] columnName)
{
- // TODO remove these when we're sure the last vestiges of the old api are gone
- assert columnFamilyName == null || !columnFamilyName.contains(":");
- assert superColumnName == null || !superColumnName.contains(":");
- assert columnName == null || !columnName.contains(":");
-
this.columnFamilyName = columnFamilyName;
this.superColumnName = superColumnName;
this.columnName = columnName;
@@ -31,7 +30,7 @@
this(columnParent.column_family, columnParent.super_column, null);
}
- public QueryPath(String columnFamilyName, String superColumnName)
+ public QueryPath(String columnFamilyName, byte[] superColumnName)
{
this(columnFamilyName, superColumnName, null);
}
@@ -51,7 +50,7 @@
this(column_path_or_parent.column_family, column_path_or_parent.super_column, column_path_or_parent.column);
}
- public static QueryPath column(String columnName)
+ public static QueryPath column(byte[] columnName)
{
return new QueryPath(null, null, columnName);
}
@@ -69,18 +68,18 @@
public void serialize(DataOutputStream dos) throws IOException
{
assert !"".equals(columnFamilyName);
- assert !"".equals(superColumnName);
- assert !"".equals(columnName);
+ assert superColumnName == null || superColumnName.length > 0;
+ assert columnName == null || columnName.length > 0;
dos.writeUTF(columnFamilyName == null ? "" : columnFamilyName);
- dos.writeUTF(superColumnName == null ? "" : superColumnName);
- dos.writeUTF(columnName == null ? "" : columnName);
+ ColumnSerializer.writeName(superColumnName == null ? ArrayUtils.EMPTY_BYTE_ARRAY : superColumnName, dos);
+ ColumnSerializer.writeName(columnName == null ? ArrayUtils.EMPTY_BYTE_ARRAY : columnName, dos);
}
public static QueryPath deserialize(DataInputStream din) throws IOException
{
String cfName = din.readUTF();
- String scName = din.readUTF();
- String cName = din.readUTF();
- return new QueryPath(cfName.isEmpty() ? null : cfName, scName.isEmpty() ? null : scName, cName.isEmpty() ? null : cName);
+ byte[] scName = ColumnSerializer.readName(din);
+ byte[] cName = ColumnSerializer.readName(din);
+ return new QueryPath(cfName.isEmpty() ? null : cfName, scName.length == 0 ? null : scName, cName.length == 0 ? null : cName);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java Tue Jul 21 01:36:52 2009
@@ -13,10 +13,10 @@
{
private ColumnFamily cf;
private Iterator<IColumn> iter;
- public final SortedSet<String> columns;
+ public final SortedSet<byte[]> columns;
// TODO make this actually iterate so we don't have to read + deserialize + filter data that we don't need due to merging other sstables
- public SSTableNamesIterator(String filename, String key, String cfName, SortedSet<String> columns) throws IOException
+ public SSTableNamesIterator(String filename, String key, String cfName, SortedSet<byte[]> columns) throws IOException
{
this.columns = columns;
SSTableReader ssTable = SSTableReader.open(filename);
@@ -24,7 +24,7 @@
if (buffer.getLength() > 0)
{
cf = ColumnFamily.serializer().deserialize(buffer);
- iter = cf.getAllColumns().iterator();
+ iter = cf.getSortedColumns().iterator();
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java Tue Jul 21 01:36:52 2009
@@ -5,6 +5,7 @@
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.DataOutputBuffer;
import org.apache.cassandra.io.DataInputBuffer;
import org.apache.cassandra.io.SequenceFile;
@@ -17,20 +18,22 @@
class SSTableSliceIterator extends AbstractIterator<IColumn> implements ColumnIterator
{
protected boolean isAscending;
- private String startColumn;
+ private byte[] startColumn;
private DataOutputBuffer outBuf = new DataOutputBuffer();
private DataInputBuffer inBuf = new DataInputBuffer();
private int curColumnIndex;
private ColumnFamily curCF = null;
private ArrayList<IColumn> curColumns = new ArrayList<IColumn>();
private SequenceFile.ColumnGroupReader reader;
+ private AbstractType comparator;
- public SSTableSliceIterator(String filename, String key, String cfName, String startColumn, boolean isAscending)
+ public SSTableSliceIterator(String filename, String key, String cfName, AbstractType comparator, byte[] startColumn, boolean isAscending)
throws IOException
{
this.isAscending = isAscending;
SSTableReader ssTable = SSTableReader.open(filename);
reader = ssTable.getColumnGroupReader(key, cfName, startColumn, isAscending);
+ this.comparator = comparator;
this.startColumn = startColumn;
curColumnIndex = isAscending ? 0 : -1;
}
@@ -38,9 +41,9 @@
private boolean isColumnNeeded(IColumn column)
{
if (isAscending)
- return (column.name().compareTo(startColumn) >= 0);
+ return comparator.compare(column.name(), startColumn) >= 0;
else
- return (column.name().compareTo(startColumn) <= 0);
+ return comparator.compare(column.name(), startColumn) <= 0;
}
private void getColumnsFromBuffer() throws IOException
@@ -51,7 +54,7 @@
if (curCF == null)
curCF = columnFamily.cloneMeShallow();
curColumns.clear();
- for (IColumn column : columnFamily.getAllColumns())
+ for (IColumn column : columnFamily.getSortedColumns())
if (isColumnNeeded(column))
curColumns.add(column);
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java Tue Jul 21 01:36:52 2009
@@ -8,14 +8,15 @@
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.utils.ReducingIterator;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
public class SliceQueryFilter extends QueryFilter
{
- public final String start, finish;
+ public final byte[] start, finish;
public final boolean isAscending;
public final int count;
- public SliceQueryFilter(String key, QueryPath columnParent, String start, String finish, boolean ascending, int count)
+ public SliceQueryFilter(String key, QueryPath columnParent, byte[] start, byte[] finish, boolean ascending, int count)
{
super(key, columnParent);
this.start = start;
@@ -24,14 +25,14 @@
this.count = count;
}
- public ColumnIterator getMemColumnIterator(Memtable memtable)
+ public ColumnIterator getMemColumnIterator(Memtable memtable, AbstractType comparator)
{
- return memtable.getSliceIterator(this);
+ return memtable.getSliceIterator(this, comparator);
}
- public ColumnIterator getSSTableColumnIterator(SSTableReader sstable) throws IOException
+ public ColumnIterator getSSTableColumnIterator(SSTableReader sstable, AbstractType comparator) throws IOException
{
- return new SSTableSliceIterator(sstable.getFilename(), key, getColumnFamilyName(), start, isAscending);
+ return new SSTableSliceIterator(sstable.getFilename(), key, getColumnFamilyName(), comparator, start, isAscending);
}
public void filterSuperColumn(SuperColumn superColumn)
@@ -41,23 +42,23 @@
}
@Override
- public Comparator<IColumn> getColumnComparator()
+ public Comparator<IColumn> getColumnComparator(AbstractType comparator)
{
- Comparator<IColumn> comparator = super.getColumnComparator();
- return isAscending ? comparator : new ReverseComparator(comparator);
+ return isAscending ? super.getColumnComparator(comparator) : new ReverseComparator(super.getColumnComparator(comparator));
}
public void collectColumns(ColumnFamily returnCF, ReducingIterator<IColumn> reducedColumns, int gcBefore)
{
int liveColumns = 0;
+ AbstractType comparator = returnCF.getComparator();
for (IColumn column : reducedColumns)
{
if (liveColumns >= count)
break;
- if (!finish.isEmpty()
- && ((isAscending && column.name().compareTo(finish) > 0))
- || (!isAscending && column.name().compareTo(finish) < 0))
+ if (finish.length > 0
+ && ((isAscending && comparator.compare(column.name(), finish) > 0))
+ || (!isAscending && comparator.compare(column.name(), finish) < 0))
break;
if (!column.isMarkedForDelete())
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java?rev=796108&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java Tue Jul 21 01:36:52 2009
@@ -0,0 +1,41 @@
+package org.apache.cassandra.db.marshal;
+
+import java.util.Comparator;
+import java.util.Collection;
+
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.SuperColumn;
+
+public abstract class AbstractType implements Comparator<byte[]> // an ordering over raw byte[] values; concrete subclasses supply compare() and getString(byte[])
+{
+ /** get a string representation of the bytes suitable for log messages */
+ public abstract String getString(byte[] bytes);
+
+ /** validate that the byte array is a valid sequence for the type we are supposed to be comparing */
+ public void validate(byte[] bytes)
+ {
+ getString(bytes); // result is discarded: this default rejects input only if the subclass's getString(byte[]) throws
+ }
+
+ /** convenience method: renders each name with getString(byte[]) and appends "," after every entry, including the last; an empty collection yields "" */
+ public String getString(Collection<byte[]> names)
+ {
+ StringBuilder builder = new StringBuilder();
+ for (byte[] name : names)
+ {
+ builder.append(getString(name)).append(",");
+ }
+ return builder.toString();
+ }
+
+ /** convenience method: renders each column's name() with getString(byte[]) and appends "," after every entry, including the last */
+ public String getColumnsString(Collection<IColumn> columns)
+ {
+ StringBuilder builder = new StringBuilder();
+ for (IColumn column : columns)
+ {
+ builder.append(getString(column.name())).append(",");
+ }
+ return builder.toString();
+ }
+}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java?rev=796108&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java Tue Jul 21 01:36:52 2009
@@ -0,0 +1,42 @@
+package org.apache.cassandra.db.marshal;
+
+import java.io.UnsupportedEncodingException;
+
+public class AsciiType extends AbstractType // compares values byte by byte and renders them as US-ASCII text
+{
+ public int compare(byte[] o1, byte[] o2) // lexicographic: the first differing byte decides; if one array is a prefix of the other, the shorter sorts first
+ {
+ int length = Math.max(o1.length, o2.length);
+ for (int i = 0; i < length; i++)
+ {
+ int index = i + 1; // 1-based position, checked against both lengths before indexing
+ if (index > o1.length && index <= o2.length)
+ {
+ return -1; // o1 ended first with all earlier bytes equal, so o1 is a proper prefix of o2
+ }
+ if (index > o2.length && index <= o1.length)
+ {
+ return 1; // o2 ended first with all earlier bytes equal, so o2 is a proper prefix of o1
+ }
+
+ int delta = o1[i] - o2[i]; // Java bytes are signed, so bytes 0x80-0xFF are negative and order before 0x00-0x7F
+ if (delta != 0)
+ {
+ return delta; // only the sign is meaningful to callers; the magnitude is the byte difference
+ }
+ }
+ return 0; // same length and every byte equal
+ }
+
+ public String getString(byte[] bytes) // decodes the whole array using the US-ASCII charset
+ {
+ try
+ {
+ return new String(bytes, "US-ASCII"); // NOTE(review): this constructor is not documented to throw on bytes outside the charset, so non-ASCII input may decode without error - confirm
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException(e); // rethrown unchecked with the original exception as the cause
+ }
+ }
+}