You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/21 03:36:54 UTC

svn commit: r796108 [2/4] - in /incubator/cassandra/trunk: conf/ interface/ interface/gen-java/org/apache/cassandra/service/ src/java/org/apache/cassandra/cli/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/cql/common/ src/java/org...

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java Tue Jul 21 01:36:52 2009
@@ -19,6 +19,7 @@
 package org.apache.cassandra.cql.common;
 
 import java.util.*;
+import java.io.UnsupportedEncodingException;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
@@ -59,7 +60,7 @@
     }
 
     // specific column lookup
-    public List<Map<String,String>> getRows() throws RuntimeException
+    public List<Map<String,String>> getRows() throws UnsupportedEncodingException
     {
         String columnKey = (String)(columnKey_.get());
         QueryPath path = null;
@@ -68,7 +69,7 @@
         if (superColumnKey_ != null)
         {
             superColumnKey = (String)(superColumnKey_.get());
-            path = new QueryPath(cfMetaData_.cfName, superColumnKey);
+            path = new QueryPath(cfMetaData_.cfName, superColumnKey.getBytes("UTF-8"));
         }
         else
         {
@@ -79,7 +80,7 @@
         try
         {
             String key = (String)(rowKey_.get());
-            ReadCommand readCommand = new SliceByNamesReadCommand(cfMetaData_.tableName, key, path, Arrays.asList(columnKey));
+            ReadCommand readCommand = new SliceByNamesReadCommand(cfMetaData_.tableName, key, path, Arrays.asList(columnKey.getBytes("UTF-8")));
             row = StorageProxy.readProtocol(readCommand, StorageService.ConsistencyLevel.WEAK);
         }
         catch (Exception e)
@@ -97,13 +98,13 @@
                 if (superColumnKey_ != null)
                 {
                     // this is the super column case
-                    IColumn column = cfamily.getColumn(superColumnKey);
+                    IColumn column = cfamily.getColumn(superColumnKey.getBytes("UTF-8"));
                     if (column != null)
                         columns = column.getSubColumns();
                 }
                 else
                 {
-                    columns = cfamily.getAllColumns();
+                    columns = cfamily.getSortedColumns();
                 }
 
                 if (columns != null && columns.size() > 0)
@@ -120,7 +121,7 @@
                         List<Map<String, String>> rows = new LinkedList<Map<String, String>>();
 
                         Map<String, String> result = new HashMap<String, String>();
-                        result.put(cfMetaData_.n_columnKey, column.name());
+                        result.put(cfMetaData_.n_columnKey, new String(column.name(), "UTF-8"));
                         result.put(cfMetaData_.n_columnValue, new String(column.value()));
                         result.put(cfMetaData_.n_columnTimestamp, Long.toString(column.timestamp()));
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Tue Jul 21 01:36:52 2009
@@ -18,16 +18,12 @@
 
 package org.apache.cassandra.db;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.util.Collection;
 import java.nio.ByteBuffer;
 
 import org.apache.commons.lang.ArrayUtils;
 
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.db.marshal.AbstractType;
 
 
 /**
@@ -47,27 +43,27 @@
         return serializer_;
     }
 
-    private final String name;
+    private final byte[] name;
     private final byte[] value;
     private final long timestamp;
     private final boolean isMarkedForDelete;
 
-    Column(String name)
+    Column(byte[] name)
     {
         this(name, ArrayUtils.EMPTY_BYTE_ARRAY);
     }
 
-    Column(String name, byte[] value)
+    Column(byte[] name, byte[] value)
     {
         this(name, value, 0);
     }
 
-    public Column(String name, byte[] value, long timestamp)
+    public Column(byte[] name, byte[] value, long timestamp)
     {
         this(name, value, timestamp, false);
     }
 
-    public Column(String name, byte[] value, long timestamp, boolean isDeleted)
+    public Column(byte[] name, byte[] value, long timestamp, boolean isDeleted)
     {
         assert name != null;
         assert value != null;
@@ -77,12 +73,12 @@
         isMarkedForDelete = isDeleted;
     }
 
-    public String name()
+    public byte[] name()
     {
         return name;
     }
 
-    public IColumn getSubColumn(String columnName)
+    public Column getSubColumn(byte[] columnName)
     {
         throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
     }
@@ -92,7 +88,7 @@
         return value;
     }
 
-    public byte[] value(String key)
+    public byte[] value(byte[] key)
     {
         throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
     }
@@ -112,7 +108,7 @@
         return timestamp;
     }
 
-    public long timestamp(String key)
+    public long timestamp(byte[] key)
     {
         throw new UnsupportedOperationException("This operation is unsupported on simple columns.");
     }
@@ -146,7 +142,7 @@
            * We store the string as UTF-8 encoded, so when we calculate the length, it
            * should be converted to UTF-8.
            */
-        return IColumn.UtfPrefix_ + FBUtilities.getUTF8Length(name) + DBConstants.boolSize_ + DBConstants.tsSize_ + DBConstants.intSize_ + value.length;
+        return IColumn.UtfPrefix_ + name.length + DBConstants.boolSize_ + DBConstants.tsSize_ + DBConstants.intSize_ + value.length;
     }
 
     /*
@@ -172,19 +168,6 @@
         return null;
     }
 
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder();
-        sb.append(name);
-        sb.append(":");
-        sb.append(isMarkedForDelete());
-        sb.append(":");
-        sb.append(value().length);
-        sb.append("@");
-        sb.append(timestamp());
-        return sb.toString();
-    }
-
     public byte[] digest()
     {
         StringBuilder stringBuilder = new StringBuilder();
@@ -210,27 +193,18 @@
         }
         return timestamp - o.timestamp;
     }
-}
 
-class ColumnSerializer implements ICompactSerializer<IColumn>
-{
-    public void serialize(IColumn column, DataOutputStream dos) throws IOException
+    public String getString(AbstractType comparator)
     {
-        dos.writeUTF(column.name());
-        dos.writeBoolean(column.isMarkedForDelete());
-        dos.writeLong(column.timestamp());
-        dos.writeInt(column.value().length);
-        dos.write(column.value());
-    }
-
-    public IColumn deserialize(DataInputStream dis) throws IOException
-    {
-        String name = dis.readUTF();
-        boolean delete = dis.readBoolean();
-        long ts = dis.readLong();
-        int size = dis.readInt();
-        byte[] value = new byte[size];
-        dis.readFully(value);
-        return new Column(name, value, ts, delete);
+        StringBuilder sb = new StringBuilder();
+        sb.append(comparator.getString(name));
+        sb.append(":");
+        sb.append(isMarkedForDelete());
+        sb.append(":");
+        sb.append(value.length);
+        sb.append("@");
+        sb.append(timestamp());
+        return sb.toString();
     }
 }
+

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Tue Jul 21 01:36:52 2009
@@ -22,16 +22,16 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Proxy;
+import java.lang.reflect.InvocationTargetException;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentSkipListMap;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.log4j.Logger;
 
@@ -39,6 +39,8 @@
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.MarshalException;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -51,7 +53,6 @@
 
     private static Logger logger_ = Logger.getLogger( ColumnFamily.class );
     private static Map<String, String> columnTypes_ = new HashMap<String, String>();
-    private static Map<String, String> indexTypes_ = new HashMap<String, String>();
     private String type_;
     private String table_;
 
@@ -61,9 +62,6 @@
         /* TODO: These are the various column types. Hard coded for now. */
         columnTypes_.put("Standard", "Standard");
         columnTypes_.put("Super", "Super");
-
-        indexTypes_.put("Name", "Name");
-        indexTypes_.put("Time", "Time");
     }
 
     public static ICompactSerializer<ColumnFamily> serializer()
@@ -87,17 +85,10 @@
     	return columnTypes_.get(key);
     }
 
-    public static String getColumnSortProperty(String columnIndexProperty)
-    {
-    	if ( columnIndexProperty == null )
-    		return indexTypes_.get("Time");
-        return indexTypes_.get(columnIndexProperty);
-    }
-
     public static ColumnFamily create(String tableName, String cfName)
     {
         String columnType = DatabaseDescriptor.getColumnFamilyType(tableName, cfName);
-        Comparator<IColumn> comparator = ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.NAME);
+        AbstractType comparator = DatabaseDescriptor.getType(tableName, cfName);
         return new ColumnFamily(cfName, columnType, comparator);
     }
 
@@ -107,20 +98,14 @@
     private long markedForDeleteAt = Long.MIN_VALUE;
     private int localDeletionTime = Integer.MIN_VALUE;
     private AtomicInteger size_ = new AtomicInteger(0);
-    private EfficientBidiMap columns_;
+    private ConcurrentSkipListMap<byte[], IColumn> columns_;
 
-    public ColumnFamily(String cfName, String columnType, Comparator<IColumn> comparator)
+    public ColumnFamily(String cfName, String columnType, AbstractType comparator)
     {
         name_ = cfName;
         type_ = columnType;
         columnSerializer_ = columnType.equals("Standard") ? Column.serializer() : SuperColumn.serializer();
-        if(columns_ == null)
-            columns_ = new EfficientBidiMap(comparator);
-    }
-
-    public ColumnFamily(String cfName, String columnType, ColumnComparatorFactory.ComparatorType indexType)
-    {
-        this(cfName, columnType, ColumnComparatorFactory.getComparator(indexType));
+        columns_ = new ConcurrentSkipListMap<byte[], IColumn>(comparator);
     }
 
     public ColumnFamily cloneMeShallow()
@@ -134,7 +119,7 @@
     ColumnFamily cloneMe()
     {
         ColumnFamily cf = cloneMeShallow();
-        cf.columns_ = columns_.cloneMe();
+        cf.columns_ = columns_.clone();
     	return cf;
     }
 
@@ -149,7 +134,7 @@
     */
     void addColumns(ColumnFamily cf)
     {
-        for (IColumn column : cf.getAllColumns())
+        for (IColumn column : cf.getSortedColumns())
         {
             addColumn(column);
         }
@@ -163,22 +148,17 @@
     int getColumnCount()
     {
     	int count = 0;
-    	Map<String, IColumn> columns = columns_.getColumns();
-    	if( columns != null )
-    	{
-    		if(!isSuper())
-    		{
-    			count = columns.size();
-    		}
-    		else
-    		{
-    			Collection<IColumn> values = columns.values();
-		    	for(IColumn column: values)
-		    	{
-		    		count += column.getObjectCount();
-		    	}
-    		}
-    	}
+        if(!isSuper())
+        {
+            count = columns_.size();
+        }
+        else
+        {
+            for(IColumn column: columns_.values())
+            {
+                count += column.getObjectCount();
+            }
+        }
     	return count;
     }
 
@@ -199,12 +179,28 @@
 		IColumn column;
         if (path.superColumnName == null)
         {
+            try
+            {
+                getComparator().validate(path.columnName);
+            }
+            catch (Exception e)
+            {
+                throw new MarshalException("Invalid column name in " + path.columnFamilyName + " for " + getComparator().getClass().getName());
+            }
             column = new Column(path.columnName, value, timestamp, deleted);
         }
         else
         {
+            try
+            {
+                getComparator().validate(path.superColumnName);
+            }
+            catch (Exception e)
+            {
+                throw new MarshalException("Invalid supercolumn name in " + path.columnFamilyName + " for " + getComparator().getClass().getName());
+            }
             column = new SuperColumn(path.superColumnName);
-            column.addColumn(new Column(path.columnName, value, timestamp, deleted));
+            column.addColumn(new Column(path.columnName, value, timestamp, deleted)); // checks subcolumn name
         }
 		addColumn(column);
     }
@@ -223,7 +219,7 @@
     */
     public void addColumn(IColumn column)
     {
-        String name = column.name();
+        byte[] name = column.name();
         IColumn oldColumn = columns_.get(name);
         if (oldColumn != null)
         {
@@ -249,22 +245,27 @@
         }
     }
 
-    public IColumn getColumn(String name)
+    public IColumn getColumn(byte[] name)
+    {
+        return columns_.get(name);
+    }
+
+    public SortedSet<byte[]> getColumnNames()
     {
-        return columns_.get( name );
+        return columns_.keySet();
     }
 
-    public SortedSet<IColumn> getAllColumns()
+    public Collection<IColumn> getSortedColumns()
     {
-        return columns_.getSortedColumns();
+        return columns_.values();
     }
 
-    public Map<String, IColumn> getColumns()
+    public Map<byte[], IColumn> getColumnsMap()
     {
-        return columns_.getColumns();
+        return columns_;
     }
 
-    public void remove(String columnName)
+    public void remove(byte[] columnName)
     {
     	columns_.remove(columnName);
     }
@@ -301,50 +302,44 @@
         // (don't need to worry about cfNew containing IColumns that are shadowed by
         // the delete tombstone, since cfNew was generated by CF.resolve, which
         // takes care of those for us.)
-        Map<String, IColumn> columns = cfComposite.getColumns();
-        Set<String> cNames = columns.keySet();
-        for ( String cName : cNames )
-        {
-        	IColumn columnInternal = columns_.get(cName);
-        	IColumn columnExternal = columns.get(cName);
-        	if( columnInternal == null )
-        	{
-        		cfDiff.addColumn(columnExternal);
-        	}
-        	else
-        	{
-            	IColumn columnDiff = columnInternal.diff(columnExternal);
-        		if(columnDiff != null)
-        		{
-        			cfDiff.addColumn(columnDiff);
-        		}
-        	}
+        Map<byte[], IColumn> columns = cfComposite.getColumnsMap();
+        Set<byte[]> cNames = columns.keySet();
+        for (byte[] cName : cNames)
+        {
+            IColumn columnInternal = columns_.get(cName);
+            IColumn columnExternal = columns.get(cName);
+            if (columnInternal == null)
+            {
+                cfDiff.addColumn(columnExternal);
+            }
+            else
+            {
+                IColumn columnDiff = columnInternal.diff(columnExternal);
+                if (columnDiff != null)
+                {
+                    cfDiff.addColumn(columnDiff);
+                }
+            }
         }
 
-        if (!cfDiff.getColumns().isEmpty() || cfDiff.isMarkedForDelete())
+        if (!cfDiff.getColumnsMap().isEmpty() || cfDiff.isMarkedForDelete())
         	return cfDiff;
         else
         	return null;
     }
 
-    private Comparator<IColumn> getComparator()
+    public AbstractType getComparator()
     {
-        return columns_.getComparator();
-    }
-
-    public ColumnComparatorFactory.ComparatorType getComparatorType()
-    {
-        return ColumnComparatorFactory.ComparatorType.NAME;
+        return (AbstractType)columns_.comparator();
     }
 
     int size()
     {
-        if ( size_.get() == 0 )
+        if (size_.get() == 0)
         {
-            Set<String> cNames = columns_.getColumns().keySet();
-            for ( String cName : cNames )
+            for (IColumn column : columns_.values())
             {
-                size_.addAndGet(columns_.get(cName).size());
+                size_.addAndGet(column.size());
             }
         }
         return size_.get();
@@ -374,7 +369,7 @@
         }
 
     	sb.append(" [");
-        sb.append(StringUtils.join(getAllColumns(), ", "));
+        sb.append(getComparator().getColumnsString(getSortedColumns()));
         sb.append("])");
 
     	return sb.toString();
@@ -382,20 +377,19 @@
 
     public byte[] digest()
     {
-    	Set<IColumn> columns = columns_.getSortedColumns();
-    	byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
-        for(IColumn column : columns)
-    	{
-    		if(xorHash.length == 0)
-    		{
-    			xorHash = column.digest();
-    		}
-    		else
-    		{
+        byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
+        for (IColumn column : columns_.values())
+        {
+            if (xorHash.length == 0)
+            {
+                xorHash = column.digest();
+            }
+            else
+            {
                 xorHash = FBUtilities.xor(xorHash, column.digest());
-    		}
-    	}
-    	return xorHash;
+            }
+        }
+        return xorHash;
     }
 
     public long getMarkedForDeleteAt()
@@ -462,11 +456,11 @@
         */
         public void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
         {
-            Collection<IColumn> columns = columnFamily.getAllColumns();
+            Collection<IColumn> columns = columnFamily.getSortedColumns();
 
             dos.writeUTF(columnFamily.name());
             dos.writeUTF(columnFamily.type_);
-            dos.writeInt(columnFamily.getComparatorType().ordinal());
+            dos.writeUTF(columnFamily.getComparator().getClass().getCanonicalName());
             dos.writeInt(columnFamily.localDeletionTime);
             dos.writeLong(columnFamily.markedForDeleteAt);
 
@@ -481,7 +475,7 @@
         {
             ColumnFamily cf = new ColumnFamily(dis.readUTF(),
                                                dis.readUTF(),
-                                               ColumnComparatorFactory.ComparatorType.values()[dis.readInt()]);
+                                               readComparator(dis));
             cf.delete(dis.readInt(), dis.readLong());
             int size = dis.readInt();
             IColumn column;
@@ -492,6 +486,23 @@
             }
             return cf;
         }
+
+        private AbstractType readComparator(DataInputStream dis) throws IOException
+        {
+            String className = dis.readUTF(); // comparator class name; NOTE(review): serialize() writes getCanonicalName(), but Class.forName expects the binary name (differs for nested classes) -- confirm
+            try
+            {
+                return (AbstractType)Class.forName(className).getConstructor().newInstance(); // instantiated reflectively via the public no-arg constructor
+            }
+            catch (ClassNotFoundException e)
+            {
+                throw new RuntimeException("Unable to load comparator class '" + className + "'.  probably this means you have obsolete sstables lying around", e);
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e); // any other failure is rethrown unchecked with the cause attached
+            }
+        }
     }
 }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Jul 21 01:36:52 2009
@@ -40,6 +40,7 @@
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.collections.IteratorUtils;
@@ -534,9 +535,9 @@
         // either way (tombstone or non- getting priority) would be fine,
         // but we picked this way because it makes removing delivered hints
         // easier for HintedHandoffManager.
-        for (String cname : new ArrayList<String>(cf.getColumns().keySet()))
+        for (byte[] cname : cf.getColumnNames())
         {
-            IColumn c = cf.getColumns().get(cname);
+            IColumn c = cf.getColumnsMap().get(cname);
             if (c instanceof SuperColumn)
             {
                 long minTimestamp = Math.max(c.getMarkedForDeleteAt(), cf.getMarkedForDeleteAt());
@@ -1411,7 +1412,7 @@
         return writeStats_.mean();
     }
 
-    public ColumnFamily getColumnFamily(String key, QueryPath path, String start, String finish, boolean isAscending, int limit) throws IOException
+    public ColumnFamily getColumnFamily(String key, QueryPath path, byte[] start, byte[] finish, boolean isAscending, int limit) throws IOException
     {
         return getColumnFamily(new SliceQueryFilter(key, path, start, finish, isAscending, limit));
     }
@@ -1428,14 +1429,17 @@
      */
     public ColumnFamily getColumnFamily(QueryFilter filter, int gcBefore) throws IOException
     {
+        assert columnFamily_.equals(filter.getColumnFamilyName());
+
         // if we are querying subcolumns of a supercolumn, fetch the supercolumn with NQF, then filter in-memory.
         if (filter.path.superColumnName != null)
         {
-            QueryFilter nameFilter = new NamesQueryFilter(filter.key, new QueryPath(filter.path.columnFamilyName), filter.path.superColumnName);
+            AbstractType comparator = DatabaseDescriptor.getType(table_, columnFamily_);
+            QueryFilter nameFilter = new NamesQueryFilter(filter.key, new QueryPath(columnFamily_), filter.path.superColumnName);
             ColumnFamily cf = getColumnFamily(nameFilter);
             if (cf != null)
             {
-                for (IColumn column : cf.getAllColumns())
+                for (IColumn column : cf.getSortedColumns())
                 {
                     filter.filterSuperColumn((SuperColumn) column);
                 }
@@ -1455,7 +1459,7 @@
             memtableLock_.readLock().lock();
             try
             {
-                iter = filter.getMemColumnIterator(memtable_);
+                iter = filter.getMemColumnIterator(memtable_, getComparator());
                 returnCF = iter.getColumnFamily();
             }
             finally
@@ -1468,7 +1472,7 @@
             List<Memtable> memtables = getUnflushedMemtables(filter.getColumnFamilyName());
             for (Memtable memtable:memtables)
             {
-                iter = filter.getMemColumnIterator(memtable);
+                iter = filter.getMemColumnIterator(memtable, getComparator());
                 returnCF.delete(iter.getColumnFamily());
                 iterators.add(iter);
             }
@@ -1477,7 +1481,7 @@
             List<SSTableReader> sstables = new ArrayList<SSTableReader>(ssTables_.values());
             for (SSTableReader sstable : sstables)
             {
-                iter = filter.getSSTableColumnIterator(sstable);
+                iter = filter.getSSTableColumnIterator(sstable, getComparator());
                 if (iter.hasNext()) // initializes iter.CF
                 {
                     returnCF.delete(iter.getColumnFamily());
@@ -1485,7 +1489,7 @@
                 iterators.add(iter);
             }
 
-            Comparator<IColumn> comparator = filter.getColumnComparator();
+            Comparator<IColumn> comparator = filter.getColumnComparator(getComparator());
             Iterator collated = IteratorUtils.collatedIterator(comparator, iterators);
             if (!collated.hasNext())
                 return null;
@@ -1513,6 +1517,11 @@
         }
     }
 
+    public AbstractType getComparator()
+    {
+        return DatabaseDescriptor.getType(table_, columnFamily_); // looked up from DatabaseDescriptor on every call using this store's table_ and columnFamily_
+    }
+
     /**
      * for testing.  no effort is made to clear historical memtables.
      */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnIndexer.java Tue Jul 21 01:36:52 2009
@@ -28,11 +28,11 @@
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.IndexHelper;
 import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
 
 
 /**
  * Help to create an index for a column family based on size of columns
- * Author : Karthik Ranganathan ( kranganathan@facebook.com )
  */
 
 public class ColumnIndexer
@@ -46,7 +46,7 @@
 	 */
     public static void serialize(ColumnFamily columnFamily, DataOutputStream dos) throws IOException
 	{
-        Collection<IColumn> columns = columnFamily.getAllColumns();
+        Collection<IColumn> columns = columnFamily.getSortedColumns();
         BloomFilter bf = createColumnBloomFilter(columns);                    
         /* Write out the bloom filter. */
         DataOutputBuffer bufOut = new DataOutputBuffer(); 
@@ -57,7 +57,7 @@
         dos.write(bufOut.getData(), 0, bufOut.getLength());
 
         /* Do the indexing */
-        doIndexing(columnFamily.getComparatorType(), columns, dos);
+        doIndexing(columnFamily.getComparator(), columns, dos);
 	}
     
     /**
@@ -69,20 +69,20 @@
     private static BloomFilter createColumnBloomFilter(Collection<IColumn> columns)
     {
         int columnCount = 0;
-        for ( IColumn column : columns )
+        for (IColumn column : columns)
         {
             columnCount += column.getObjectCount();
         }
-        
+
         BloomFilter bf = new BloomFilter(columnCount, 4);
-        for ( IColumn column : columns )
+        for (IColumn column : columns)
         {
             bf.add(column.name());
             /* If this is SuperColumn type Column Family we need to get the subColumns too. */
-            if ( column instanceof SuperColumn )
+            if (column instanceof SuperColumn)
             {
                 Collection<IColumn> subColumns = column.getSubColumns();
-                for ( IColumn subColumn : subColumns )
+                for (IColumn subColumn : subColumns)
                 {
                     bf.add(subColumn.name());
                 }
@@ -90,17 +90,6 @@
         }
         return bf;
     }
-    
-    private static IndexHelper.ColumnIndexInfo getColumnIndexInfo(ColumnComparatorFactory.ComparatorType typeInfo, IColumn column)
-    {
-        IndexHelper.ColumnIndexInfo cIndexInfo = null;
-        
-        cIndexInfo = IndexHelper.ColumnIndexFactory.instance(typeInfo);
-        cIndexInfo.set(typeInfo == ColumnComparatorFactory.ComparatorType.NAME
-                       ? column.name() : column.timestamp());
-
-        return cIndexInfo;
-    }
 
     /**
      * Given the collection of columns in the Column Family,
@@ -111,7 +100,7 @@
      *            to be written.
      * @throws IOException
      */
-    private static void doIndexing(ColumnComparatorFactory.ComparatorType typeInfo, Collection<IColumn> columns, DataOutputStream dos) throws IOException
+    private static void doIndexing(AbstractType comparator, Collection<IColumn> columns, DataOutputStream dos) throws IOException
     {
         /* we are going to write column indexes */
         int numColumns = 0;
@@ -139,7 +128,7 @@
                  * SuperColumn always use the name indexing scheme for
                  * the SuperColumns. We will fix this later.
                  */
-                IndexHelper.ColumnIndexInfo cIndexInfo = getColumnIndexInfo(typeInfo, column);                
+                IndexHelper.ColumnIndexInfo cIndexInfo = new IndexHelper.ColumnIndexInfo(column.name(), 0, 0, comparator);
                 cIndexInfo.position(position);
                 cIndexInfo.count(numColumns);                
                 columnIndexList.add(cIndexInfo);

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=796108&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java Tue Jul 21 01:36:52 2009
@@ -0,0 +1,68 @@
+package org.apache.cassandra.db;
+
+import java.io.*;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Serializes and deserializes a {@link Column}: name, deletion marker,
+ * timestamp, then value, in that order.
+ */
+public class ColumnSerializer implements ICompactSerializer<IColumn>
+{
+    /**
+     * Writes a column name as a two-byte big-endian length followed by the raw bytes.
+     *
+     * @param name the column name; must be at most 65535 bytes long
+     * @param out  the destination
+     * @throws IOException if the underlying write fails
+     */
+    public static void writeName(byte[] name, DataOutput out) throws IOException
+    {
+        int length = name.length;
+        assert length <= 65535;
+        out.writeByte((length >> 8) & 0xFF);
+        out.writeByte(length & 0xFF);
+        out.write(name);
+    }
+
+    /**
+     * Reads a column name in the format produced by {@link #writeName}.
+     *
+     * @param in the source, positioned at the two-byte length prefix
+     * @return the name bytes
+     * @throws IOException if the underlying read fails or the stream ends early
+     */
+    public static byte[] readName(DataInput in) throws IOException
+    {
+        // Both length bytes must be read unsigned: readByte() sign-extends, so any
+        // length containing a byte >= 0x80 (e.g. 128) would decode as negative.
+        int length = (in.readUnsignedByte() << 8) | in.readUnsignedByte();
+        byte[] bytes = new byte[length];
+        in.readFully(bytes);
+        return bytes;
+    }
+
+    /**
+     * Writes the column's name, deletion marker, timestamp and value to {@code dos}.
+     */
+    public void serialize(IColumn column, DataOutputStream dos) throws IOException
+    {
+        ColumnSerializer.writeName(column.name(), dos);
+        dos.writeBoolean(column.isMarkedForDelete());
+        dos.writeLong(column.timestamp());
+        FBUtilities.writeByteArray(column.value(), dos);
+    }
+
+    /**
+     * Reads back a column in the field order used by {@link #serialize}.
+     */
+    public Column deserialize(DataInputStream dis) throws IOException
+    {
+        byte[] name = ColumnSerializer.readName(dis);
+        boolean delete = dis.readBoolean();
+        long ts = dis.readLong();
+        byte[] value = FBUtilities.readByteArray(dis);
+        return new Column(name, value, ts, delete);
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Tue Jul 21 01:36:52 2009
@@ -101,7 +101,7 @@
         return quorumResponseHandler.get();
     }
 
-    private static void deleteEndPoint(String endpointAddress, String tableName, String key, long timestamp) throws IOException
+    private static void deleteEndPoint(byte[] endpointAddress, String tableName, byte[] key, long timestamp) throws IOException
     {
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, tableName);
         rm.delete(new QueryPath(HINTS_CF, key, endpointAddress), timestamp);
@@ -124,16 +124,15 @@
         Collection<ColumnFamily> cfs = row.getColumnFamilies();
         for (ColumnFamily cf : cfs)
         {
-            Set<IColumn> columns = cf.getAllColumns();
             long maxTS = Long.MIN_VALUE;
             if (!cf.isSuper())
             {
-                for (IColumn col : columns)
+                for (IColumn col : cf.getSortedColumns())
                     maxTS = Math.max(maxTS, col.timestamp());
             }
             else
             {
-                for (IColumn col : columns)
+                for (IColumn col : cf.getSortedColumns())
                 {
                     maxTS = Math.max(maxTS, col.timestamp());
                     Collection<IColumn> subColumns = col.getSubColumns();
@@ -166,15 +165,17 @@
             {
                 continue;
             }
-            Collection<IColumn> keys = hintColumnFamily.getAllColumns();
+            Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
 
             for (IColumn keyColumn : keys)
             {
                 Collection<IColumn> endpoints = keyColumn.getSubColumns();
+                String keyStr = new String(keyColumn.name(), "UTF-8");
                 int deleted = 0;
                 for (IColumn endpoint : endpoints)
                 {
-                    if (sendMessage(endpoint.name(), tableName, keyColumn.name()))
+                    String endpointStr = new String(endpoint.name(), "UTF-8");
+                    if (sendMessage(endpointStr, tableName, keyStr))
                     {
                         deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
                         deleted++;
@@ -182,7 +183,7 @@
                 }
                 if (deleted == endpoints.size())
                 {
-                    deleteHintedData(tableName, keyColumn.name());
+                    deleteHintedData(tableName, keyStr);
                 }
             }
         }
@@ -198,6 +199,7 @@
         if (logger_.isDebugEnabled())
           logger_.debug("Started hinted handoff for endPoint " + endPoint.getHost());
 
+        String targetEPBytes = endPoint.getHost();
         // 1. Scan through all the keys that we need to handoff
         // 2. For each key read the list of recipients if the endpoint matches send
         // 3. Delete that recipient from the key if write was successful
@@ -209,19 +211,21 @@
             {
                 continue;
             }
-            Collection<IColumn> keys = hintedColumnFamily.getAllColumns();
+            Collection<IColumn> keys = hintedColumnFamily.getSortedColumns();
 
             for (IColumn keyColumn : keys)
             {
+                String keyStr = new String(keyColumn.name(), "UTF-8");
                 Collection<IColumn> endpoints = keyColumn.getSubColumns();
-                for (IColumn endpoint : endpoints)
+                for (IColumn hintEndPoint : endpoints)
                 {
-                    if (endpoint.name().equals(endPoint.getHost()) && sendMessage(endpoint.name(), null, keyColumn.name()))
+                    // column names are byte[] now; byte[].equals(String) is never true, so decode the name before comparing
+                    if (new String(hintEndPoint.name(), "UTF-8").equals(targetEPBytes) && sendMessage(endPoint.getHost(), null, keyStr))
                     {
-                        deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
+                        deleteEndPoint(hintEndPoint.name(), tableName, keyColumn.name(), keyColumn.timestamp());
                         if (endpoints.size() == 1)
                         {
-                            deleteHintedData(tableName, keyColumn.name());
+                            deleteHintedData(tableName, keyStr);
                         }
                     }
                 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java Tue Jul 21 01:36:52 2009
@@ -19,7 +19,8 @@
 package org.apache.cassandra.db;
 
 import java.util.Collection;
-import java.util.Map;
+
+import org.apache.cassandra.db.marshal.AbstractType;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -30,18 +31,19 @@
     public static short UtfPrefix_ = 2;
     public boolean isMarkedForDelete();
     public long getMarkedForDeleteAt();
-    public String name();
+    public byte[] name();
     public int size();
     public int serializedSize();
     public long timestamp();
-    public long timestamp(String key);
+    public long timestamp(byte[] columnName);
     public byte[] value();
-    public byte[] value(String key);
+    public byte[] value(byte[] columnName);
     public Collection<IColumn> getSubColumns();
-    public IColumn getSubColumn(String columnName);
+    public IColumn getSubColumn(byte[] columnName);
     public void addColumn(IColumn column);
     public IColumn diff(IColumn column);
     public int getObjectCount();
     public byte[] digest();
     public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity
+    public String getString(AbstractType comparator);
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Tue Jul 21 01:36:52 2009
@@ -33,6 +33,7 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.DestructivePQIterator;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
 
 import org.apache.log4j.Logger;
 
@@ -279,12 +280,12 @@
     /**
      * obtain an iterator of columns in this memtable in the specified order starting from a given column.
      */
-    public ColumnIterator getSliceIterator(SliceQueryFilter filter)
+    public ColumnIterator getSliceIterator(SliceQueryFilter filter, AbstractType typeComparator)
     {
         ColumnFamily cf = columnFamilies_.get(filter.key);
         final ColumnFamily columnFamily = cf == null ? ColumnFamily.create(table_, filter.getColumnFamilyName()) : cf.cloneMeShallow();
 
-        final IColumn columns[] = (cf == null ? columnFamily : cf).getAllColumns().toArray(new IColumn[columnFamily.getAllColumns().size()]);
+        final IColumn columns[] = (cf == null ? columnFamily : cf).getSortedColumns().toArray(new IColumn[columnFamily.getSortedColumns().size()]);
         // TODO if we are dealing with supercolumns, we need to clone them while we have the read lock since they can be modified later
         if (!filter.isAscending)
             ArrayUtils.reverse(columns);
@@ -296,7 +297,7 @@
 
         // can't use a ColumnComparatorFactory comparator since those compare on both name and time (and thus will fail to match
         // our dummy column, since the time there is arbitrary).
-        Comparator<IColumn> comparator = filter.getColumnComparator();
+        Comparator<IColumn> comparator = filter.getColumnComparator(typeComparator);
         int index = Arrays.binarySearch(columns, startIColumn, comparator);
         final int startIndex = index < 0 ? -(index + 1) : index;
 
@@ -328,8 +329,8 @@
 
         return new SimpleAbstractColumnIterator()
         {
-            private Iterator<String> iter = filter.columns.iterator();
-            private String current;
+            private Iterator<byte[]> iter = filter.columns.iterator();
+            private byte[] current;
 
             public ColumnFamily getColumnFamily()
             {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Tue Jul 21 01:36:52 2009
@@ -28,6 +28,8 @@
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.config.DatabaseDescriptor;
 
 
 public abstract class ReadCommand
@@ -80,6 +82,11 @@
     public abstract ReadCommand copy();
 
     public abstract Row getRow(Table table) throws IOException;
+
+    protected AbstractType getComparator()
+    {
+        return DatabaseDescriptor.getType(table, getColumnFamilyName());
+    }
 }
 
 class ReadCommandSerializer implements ICompactSerializer<ReadCommand>

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Tue Jul 21 01:36:52 2009
@@ -40,8 +40,10 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.BatchMutationSuper;
 import org.apache.cassandra.service.BatchMutation;
+import org.apache.cassandra.service.InvalidRequestException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.marshal.MarshalException;
 
 
 /**
@@ -67,11 +69,6 @@
     private String key_;
     protected Map<String, ColumnFamily> modifications_ = new HashMap<String, ColumnFamily>();
 
-    /* Ctor for JAXB */
-    private RowMutation()
-    {
-    }
-
     public RowMutation(String table, String key)
     {
         table_ = table;
@@ -110,9 +107,9 @@
         return modifications_.keySet();
     }
 
-    void addHints(String hint) throws IOException
+    void addHints(String key, String host) throws IOException
     {
-        QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, null, hint);
+        QueryPath path = new QueryPath(HintedHandOffManager.HINTS_CF, key.getBytes("UTF-8"), host.getBytes("UTF-8"));
         add(path, ArrayUtils.EMPTY_BYTE_ARRAY, System.currentTimeMillis());
     }
 
@@ -261,7 +258,7 @@
         return rm;
     }
 
-    public static RowMutation getRowMutation(String table, BatchMutationSuper batchMutationSuper)
+    public static RowMutation getRowMutation(String table, BatchMutationSuper batchMutationSuper) throws InvalidRequestException
     {
         RowMutation rm = new RowMutation(table, batchMutationSuper.key.trim());
         for (String cfName : batchMutationSuper.cfmap.keySet())
@@ -270,7 +267,14 @@
             {
                 for (org.apache.cassandra.service.Column column : super_column.columns)
                 {
-                    rm.add(new QueryPath(cfName, super_column.name, column.name), column.value, column.timestamp);
+                    try
+                    {
+                        rm.add(new QueryPath(cfName, super_column.name, column.name), column.value, column.timestamp);
+                    }
+                    catch (MarshalException e)
+                    {
+                        throw new InvalidRequestException(e.getMessage());
+                    }
                 }
             }
         }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Tue Jul 21 01:36:52 2009
@@ -73,7 +73,7 @@
                   logger_.debug("Adding hint for " + hint);
                 /* add necessary hints to this mutation */
                 RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.table());
-                hintedMutation.addHints(rm.key() + ":" + hint.getHost());
+                hintedMutation.addHints(rm.key(), hint.getHost());
                 hintedMutation.apply();
             }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Scanner.java Tue Jul 21 01:36:52 2009
@@ -21,8 +21,6 @@
 import java.util.*;
 import java.io.IOException;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-
 
 /**
  * This class is used to loop through a retrieved column family
@@ -66,7 +64,7 @@
             ColumnFamily columnFamily = table.get(key, cf);
             if ( columnFamily != null )
             {
-                Collection<IColumn> columns = columnFamily.getAllColumns();            
+                Collection<IColumn> columns = columnFamily.getSortedColumns();
                 columnIt_ = columns.iterator();
             }
         }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java Tue Jul 21 01:36:52 2009
@@ -27,22 +27,25 @@
 import org.apache.cassandra.service.ColumnParent;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.config.DatabaseDescriptor;
 
 public class SliceByNamesReadCommand extends ReadCommand
 {
     public final QueryPath columnParent;
-    public final SortedSet<String> columnNames;
+    public final SortedSet<byte[]> columnNames;
 
-    public SliceByNamesReadCommand(String table, String key, ColumnParent column_parent, Collection<String> columnNames)
+    public SliceByNamesReadCommand(String table, String key, ColumnParent column_parent, Collection<byte[]> columnNames)
     {
         this(table, key, new QueryPath(column_parent), columnNames);
     }
 
-    public SliceByNamesReadCommand(String table, String key, QueryPath path, Collection<String> columnNames)
+    public SliceByNamesReadCommand(String table, String key, QueryPath path, Collection<byte[]> columnNames)
     {
         super(table, key, CMD_TYPE_GET_SLICE_BY_NAMES);
         this.columnParent = path;
-        this.columnNames = new TreeSet<String>(columnNames);
+        this.columnNames = new TreeSet<byte[]>(getComparator());
+        this.columnNames.addAll(columnNames);
     }
 
     @Override
@@ -72,7 +75,7 @@
                "table='" + table + '\'' +
                ", key='" + key + '\'' +
                ", columnParent='" + columnParent + '\'' +
-               ", columns=[" + StringUtils.join(columnNames, ", ") + "]" +
+               ", columns=[" + getComparator().getString(columnNames) + "]" +
                ')';
     }
 
@@ -91,10 +94,9 @@
         dos.writeInt(realRM.columnNames.size());
         if (realRM.columnNames.size() > 0)
         {
-            for (String cName : realRM.columnNames)
+            for (byte[] cName : realRM.columnNames)
             {
-                dos.writeInt(cName.getBytes().length);
-                dos.write(cName.getBytes());
+                ColumnSerializer.writeName(cName, dos);
             }
         }
     }
@@ -108,12 +110,10 @@
         QueryPath columnParent = QueryPath.deserialize(dis);
 
         int size = dis.readInt();
-        List<String> columns = new ArrayList<String>();
+        List<byte[]> columns = new ArrayList<byte[]>();
         for (int i = 0; i < size; ++i)
         {
-            byte[] bytes = new byte[dis.readInt()];
-            dis.readFully(bytes);
-            columns.add(new String(bytes));
+            columns.add(ColumnSerializer.readName(dis));
         }
         SliceByNamesReadCommand rm = new SliceByNamesReadCommand(table, key, columnParent, columns);
         rm.setDigestQuery(isDigest);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceFromReadCommand.java Tue Jul 21 01:36:52 2009
@@ -28,16 +28,16 @@
 public class SliceFromReadCommand extends ReadCommand
 {
     public final QueryPath column_parent;
-    public final String start, finish;
+    public final byte[] start, finish;
     public final boolean isAscending;
     public final int count;
 
-    public SliceFromReadCommand(String table, String key, ColumnParent column_parent, String start, String finish, boolean isAscending, int count)
+    public SliceFromReadCommand(String table, String key, ColumnParent column_parent, byte[] start, byte[] finish, boolean isAscending, int count)
     {
         this(table, key, new QueryPath(column_parent), start, finish, isAscending, count);
     }
 
-    public SliceFromReadCommand(String table, String key, QueryPath columnParent, String start, String finish, boolean isAscending, int count)
+    public SliceFromReadCommand(String table, String key, QueryPath columnParent, byte[] start, byte[] finish, boolean isAscending, int count)
     {
         super(table, key, CMD_TYPE_GET_SLICE);
         this.column_parent = columnParent;
@@ -74,8 +74,8 @@
                "table='" + table + '\'' +
                ", key='" + key + '\'' +
                ", column_parent='" + column_parent + '\'' +
-               ", start='" + start + '\'' +
-               ", finish='" + finish + '\'' +
+               ", start='" + getComparator().getString(start) + '\'' +
+               ", finish='" + getComparator().getString(finish) + '\'' +
                ", isAscending=" + isAscending +
                ", count=" + count +
                ')';
@@ -92,8 +92,8 @@
         dos.writeUTF(realRM.table);
         dos.writeUTF(realRM.key);
         realRM.column_parent.serialize(dos);
-        dos.writeUTF(realRM.start);
-        dos.writeUTF(realRM.finish);
+        ColumnSerializer.writeName(realRM.start, dos);
+        ColumnSerializer.writeName(realRM.finish, dos);
         dos.writeBoolean(realRM.isAscending);
         dos.writeInt(realRM.count);
     }
@@ -102,7 +102,13 @@
     public ReadCommand deserialize(DataInputStream dis) throws IOException
     {
         boolean isDigest = dis.readBoolean();
-        SliceFromReadCommand rm = new SliceFromReadCommand(dis.readUTF(), dis.readUTF(), QueryPath.deserialize(dis), dis.readUTF(), dis.readUTF(), dis.readBoolean(), dis.readInt());
+        SliceFromReadCommand rm = new SliceFromReadCommand(dis.readUTF(),
+                                                           dis.readUTF(),
+                                                           QueryPath.deserialize(dis),
+                                                           ColumnSerializer.readName(dis),
+                                                           ColumnSerializer.readName(dis),
+                                                           dis.readBoolean(), 
+                                                           dis.readInt());
         rm.setDigestQuery(isDigest);
         return rm;
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Tue Jul 21 01:36:52 2009
@@ -23,17 +23,18 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.Set;
-import java.util.Map;
+import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.MarshalException;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -49,19 +50,21 @@
         return serializer_;
     }
 
-	private String name_;
-    private ConcurrentSkipListMap<String, IColumn> columns_ = new ConcurrentSkipListMap<String, IColumn>();
+    private byte[] name_;
+    // TODO make subcolumn comparator configurable
+    private ConcurrentSkipListMap<byte[], IColumn> columns_ = new ConcurrentSkipListMap<byte[], IColumn>(new LongType());
     private int localDeletionTime = Integer.MIN_VALUE;
 	private long markedForDeleteAt = Long.MIN_VALUE;
     private AtomicInteger size_ = new AtomicInteger(0);
 
-    SuperColumn()
+    SuperColumn(byte[] name)
     {
+    	name_ = name;
     }
 
-    SuperColumn(String name)
+    public AbstractType getComparator()
     {
-    	name_ = name;
+        return (AbstractType)columns_.comparator();
     }
 
     public SuperColumn cloneMeShallow()
@@ -76,7 +79,7 @@
 		return markedForDeleteAt > Long.MIN_VALUE;
 	}
 
-    public String name()
+    public byte[] name()
     {
     	return name_;
     }
@@ -86,19 +89,13 @@
     	return columns_.values();
     }
 
-    public IColumn getSubColumn(String columnName)
+    public IColumn getSubColumn(byte[] columnName)
     {
         IColumn column = columns_.get(columnName);
         assert column == null || column instanceof Column;
         return column;
     }
 
-    public int compareTo(IColumn superColumn)
-    {
-        return (name_.compareTo(superColumn.name()));
-    }
-
-
     public int size()
     {
         /*
@@ -133,7 +130,7 @@
     	 * We need to keep the way we are calculating the column size in sync with the
     	 * way we are calculating the size for the column family serializer.
     	 */
-    	return IColumn.UtfPrefix_ + FBUtilities.getUTF8Length(name_) + DBConstants.boolSize_ + DBConstants.intSize_ + DBConstants.intSize_ + getSizeOfAllColumns();
+    	return IColumn.UtfPrefix_ + name_.length + DBConstants.boolSize_ + DBConstants.intSize_ + DBConstants.intSize_ + getSizeOfAllColumns();
     }
 
     /**
@@ -150,7 +147,7 @@
         return size;
     }
 
-    public void remove(String columnName)
+    public void remove(byte[] columnName)
     {
     	columns_.remove(columnName);
     }
@@ -160,9 +157,9 @@
     	throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
     }
 
-    public long timestamp(String key)
+    public long timestamp(byte[] columnName)
     {
-    	IColumn column = columns_.get(key);
+    	IColumn column = columns_.get(columnName);
     	if ( column instanceof SuperColumn )
     		throw new UnsupportedOperationException("A super column cannot hold other super columns.");
     	if ( column != null )
@@ -175,9 +172,9 @@
     	throw new UnsupportedOperationException("This operation is not supported for Super Columns.");
     }
 
-    public byte[] value(String key)
+    public byte[] value(byte[] columnName)
     {
-    	IColumn column = columns_.get(key);
+    	IColumn column = columns_.get(columnName);
     	if ( column != null )
     		return column.value();
     	throw new IllegalArgumentException("Value was requested for a column that does not exist.");
@@ -187,6 +184,14 @@
     {
     	if (!(column instanceof Column))
     		throw new UnsupportedOperationException("A super column can only contain simple columns.");
+        try
+        {
+            getComparator().validate(column.name());
+        }
+        catch (Exception e)
+        {
+            throw new MarshalException("Invalid column name in supercolumn for " + getComparator().getClass().getName());
+        }
     	IColumn oldColumn = columns_.get(column.name());
     	if ( oldColumn == null )
         {
@@ -213,10 +218,14 @@
      */
     public void putColumn(IColumn column)
     {
-    	if ( !(column instanceof SuperColumn))
-    		throw new UnsupportedOperationException("Only Super column objects should be put here");
-    	if( !name_.equals(column.name()))
-    		throw new IllegalArgumentException("The name should match the name of the current column or super column");
+        if (!(column instanceof SuperColumn))
+        {
+            throw new UnsupportedOperationException("Only Super column objects should be put here");
+        }
+        if (!Arrays.equals(name_, column.name()))
+        {
+            throw new IllegalArgumentException("The name should match the name of the current column or super column");
+        }
 
         for (IColumn subColumn : column.getSubColumns())
         {
@@ -281,7 +290,7 @@
     	byte[] xorHash = ArrayUtils.EMPTY_BYTE_ARRAY;
     	if(name_ == null)
     		return xorHash;
-    	xorHash = name_.getBytes();
+    	xorHash = name_.clone();
     	for(IColumn column : columns_.values())
     	{
 			xorHash = FBUtilities.xor(xorHash, column.digest());
@@ -289,19 +298,18 @@
     	return xorHash;
     }
 
-
-    public String toString()
+    public String getString(AbstractType comparator)
     {
     	StringBuilder sb = new StringBuilder();
         sb.append("SuperColumn(");
-    	sb.append(name_);
+    	sb.append(comparator.getString(name_));
 
         if (isMarkedForDelete()) {
-            sb.append(" -delete at " + getMarkedForDeleteAt() + "-");
+            sb.append(" -delete at ").append(getMarkedForDeleteAt()).append("-");
         }
 
         sb.append(" [");
-        sb.append(StringUtils.join(getSubColumns(), ", "));
+        sb.append(getComparator().getColumnsString(columns_.values()));
         sb.append("])");
 
         return sb.toString();
@@ -324,7 +332,7 @@
     public void serialize(IColumn column, DataOutputStream dos) throws IOException
     {
     	SuperColumn superColumn = (SuperColumn)column;
-        dos.writeUTF(superColumn.name());
+        ColumnSerializer.writeName(column.name(), dos);
         dos.writeInt(superColumn.getLocalDeletionTime());
         dos.writeLong(superColumn.getMarkedForDeleteAt());
 
@@ -341,7 +349,7 @@
 
     public IColumn deserialize(DataInputStream dis) throws IOException
     {
-        String name = dis.readUTF();
+        byte[] name = ColumnSerializer.readName(dis);
         SuperColumn superColumn = new SuperColumn(name);
         superColumn.markForDeleteAt(dis.readInt(), dis.readLong());
         assert dis.available() > 0;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Tue Jul 21 01:36:52 2009
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 
 import org.apache.log4j.Logger;
 
@@ -26,9 +27,9 @@
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.utils.BasicUtilities;
-import org.apache.cassandra.db.filter.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.QueryFilter;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -39,8 +40,20 @@
     private static Logger logger_ = Logger.getLogger(SystemTable.class);
     public static final String LOCATION_CF = "LocationInfo";
     private static final String LOCATION_KEY = "L"; // only one row in Location CF
-    private static final String TOKEN = "Token";
-    private static final String GENERATION = "Generation";
+    private static final byte[] TOKEN = utf8("Token");
+    private static final byte[] GENERATION = utf8("Generation");
+
+    private static byte[] utf8(String str)
+    {
+        try
+        {
+            return str.getBytes("UTF-8");
+        }
+        catch (UnsupportedEncodingException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
 
     /*
      * This method is used to update the SystemTable with the new token.
@@ -50,7 +63,8 @@
         IPartitioner p = StorageService.getPartitioner();
         Table table = Table.open(Table.SYSTEM_TABLE);
         /* Retrieve the "LocationInfo" column family */
-        ColumnFamily cf = table.getColumnFamilyStore(LOCATION_CF).getColumnFamily(new NamesQueryFilter(LOCATION_KEY, new QueryPath(LOCATION_KEY), TOKEN));
+        QueryFilter filter = new NamesQueryFilter(LOCATION_KEY, new QueryPath(LOCATION_CF), TOKEN);
+        ColumnFamily cf = table.getColumnFamilyStore(LOCATION_CF).getColumnFamily(filter);
         long oldTokenColumnTimestamp = cf.getColumn(SystemTable.TOKEN).timestamp();
         /* create the "Token" whose value is the new token. */
         IColumn tokenColumn = new Column(SystemTable.TOKEN, p.getTokenFactory().toByteArray(token), oldTokenColumnTimestamp + 1);
@@ -74,7 +88,8 @@
     {
         /* Read the system table to retrieve the storage ID and the generation */
         Table table = Table.open(Table.SYSTEM_TABLE);
-        ColumnFamily cf = table.getColumnFamilyStore(LOCATION_CF).getColumnFamily(new NamesQueryFilter(LOCATION_KEY, new QueryPath(LOCATION_KEY), GENERATION));
+        QueryFilter filter = new NamesQueryFilter(LOCATION_KEY, new QueryPath(LOCATION_CF), GENERATION);
+        ColumnFamily cf = table.getColumnFamilyStore(LOCATION_CF).getColumnFamily(filter);
 
         IPartitioner p = StorageService.getPartitioner();
         if (cf == null)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue Jul 21 01:36:52 2009
@@ -27,6 +27,7 @@
 
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.collections.Predicate;
+import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.BootstrapInitiateMessage;
@@ -591,7 +592,7 @@
                 
         for (ColumnFamily columnFamily : row.getColumnFamilies())
         {
-            Collection<IColumn> columns = columnFamily.getAllColumns();
+            Collection<IColumn> columns = columnFamily.getSortedColumns();
             for(IColumn column : columns)
             {
                 ColumnFamilyStore cfStore = columnFamilyStores_.get(column.name());
@@ -704,7 +705,8 @@
                 }
                 // make sure there is actually non-tombstone content associated w/ this key
                 // TODO record the key source(s) somehow and only check that source (e.g., memtable or sstable)
-                if (cfs.getColumnFamily(new SliceQueryFilter(current, new QueryPath(cfName), "", "", true, 1), Integer.MAX_VALUE) != null)
+                QueryFilter filter = new SliceQueryFilter(current, new QueryPath(cfName), ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, 1);
+                if (cfs.getColumnFamily(filter, Integer.MAX_VALUE) != null)
                 {
                     keys.add(current);
                 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/IdentityQueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/IdentityQueryFilter.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/IdentityQueryFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/IdentityQueryFilter.java Tue Jul 21 01:36:52 2009
@@ -1,5 +1,7 @@
 package org.apache.cassandra.db.filter;
 
+import org.apache.commons.lang.ArrayUtils;
+
 import org.apache.cassandra.db.SuperColumn;
 
 public class IdentityQueryFilter extends SliceQueryFilter
@@ -9,7 +11,7 @@
      */
     public IdentityQueryFilter(String key, QueryPath path)
     {
-        super(key, path, "", "", true, Integer.MAX_VALUE);
+        super(key, path, ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, Integer.MAX_VALUE);
     }
 
     @Override

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java Tue Jul 21 01:36:52 2009
@@ -4,36 +4,51 @@
 import java.util.SortedSet;
 import java.util.Arrays;
 import java.util.TreeSet;
+import java.util.Comparator;
 
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.utils.ReducingIterator;
-import org.apache.cassandra.db.filter.ColumnIterator;
 import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.SuperColumn;
+import org.apache.cassandra.db.marshal.AbstractType;
 
 public class NamesQueryFilter extends QueryFilter
 {
-    public final SortedSet<String> columns;
+    public final SortedSet<byte[]> columns;
 
-    public NamesQueryFilter(String key, QueryPath columnParent, SortedSet<String> columns)
+    public NamesQueryFilter(String key, QueryPath columnParent, SortedSet<byte[]> columns)
     {
         super(key, columnParent);
         this.columns = columns;
     }
 
-    public NamesQueryFilter(String key, QueryPath columnParent, String column)
+    public NamesQueryFilter(String key, QueryPath columnParent, byte[] column)
     {
-        this(key, columnParent, new TreeSet<String>(Arrays.asList(column)));
+        this(key, columnParent, getSingleColumnSet(column));
     }
 
-    public ColumnIterator getMemColumnIterator(Memtable memtable)
+    /** Wraps column in a one-element TreeSet; its comparator only tells equal from unequal names (0 or -1), it defines no order. */
+    private static TreeSet<byte[]> getSingleColumnSet(byte[] column)
+    {
+        TreeSet<byte[]> singleton = new TreeSet<byte[]>(new Comparator<byte[]>()
+        {
+            public int compare(byte[] o1, byte[] o2)
+            {
+                return Arrays.equals(o1, o2) ? 0 : -1;
+            }
+        });
+        singleton.add(column);
+        return singleton;
+    }
+
+    public ColumnIterator getMemColumnIterator(Memtable memtable, AbstractType comparator)
     {
         return memtable.getNamesIterator(this);
     }
 
-    public ColumnIterator getSSTableColumnIterator(SSTableReader sstable) throws IOException
+    public ColumnIterator getSSTableColumnIterator(SSTableReader sstable, AbstractType comparator) throws IOException
     {
         return new SSTableNamesIterator(sstable.getFilename(), key, getColumnFamilyName(), columns);
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Tue Jul 21 01:36:52 2009
@@ -3,10 +3,12 @@
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.Arrays;
 
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.utils.ReducingIterator;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
 
 public abstract class QueryFilter
 {
@@ -23,13 +25,13 @@
      * returns an iterator that returns columns from the given memtable
      * matching the Filter criteria in sorted order.
      */
-    public abstract ColumnIterator getMemColumnIterator(Memtable memtable);
+    public abstract ColumnIterator getMemColumnIterator(Memtable memtable, AbstractType comparator);
 
     /**
      * returns an iterator that returns columns from the given SSTable
      * matching the Filter criteria in sorted order.
      */
-    public abstract ColumnIterator getSSTableColumnIterator(SSTableReader sstable) throws IOException;
+    public abstract ColumnIterator getSSTableColumnIterator(SSTableReader sstable, AbstractType comparator) throws IOException;
 
     /**
      * collects columns from reducedColumns into returnCF.  Termination is determined
@@ -44,13 +46,13 @@
      */
     public abstract void filterSuperColumn(SuperColumn superColumn);
 
-    public Comparator<IColumn> getColumnComparator()
+    public Comparator<IColumn> getColumnComparator(final AbstractType comparator)
     {
         return new Comparator<IColumn>()
         {
             public int compare(IColumn c1, IColumn c2)
             {
-                return c1.name().compareTo(c2.name());
+                return comparator.compare(c1.name(), c2.name());
             }
         };
     }
@@ -63,9 +65,9 @@
         {
             ColumnFamily curCF = returnCF.cloneMeShallow();
 
-            protected Object getKey(IColumn o)
+            protected boolean isEqual(IColumn o1, IColumn o2)
             {
-                return o == null ? null : o.name();
+                return Arrays.equals(o1.name(), o2.name());
             }
 
             public void reduce(IColumn current)
@@ -75,7 +77,7 @@
 
             protected IColumn getReduced()
             {
-                IColumn c = curCF.getAllColumns().first();
+                IColumn c = curCF.getSortedColumns().iterator().next();
                 curCF.clear();
                 return c;
             }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryPath.java Tue Jul 21 01:36:52 2009
@@ -4,23 +4,22 @@
 import java.io.IOException;
 import java.io.DataInputStream;
 
+import org.apache.commons.lang.ArrayUtils;
+
 import org.apache.cassandra.service.ColumnParent;
 import org.apache.cassandra.service.ColumnPath;
 import org.apache.cassandra.service.ColumnPathOrParent;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.db.ColumnSerializer;
 
 public class QueryPath
 {
     public final String columnFamilyName;
-    public final String superColumnName;
-    public final String columnName;
+    public final byte[] superColumnName;
+    public final byte[] columnName;
 
-    public QueryPath(String columnFamilyName, String superColumnName, String columnName)
+    public QueryPath(String columnFamilyName, byte[] superColumnName, byte[] columnName)
     {
-        // TODO remove these when we're sure the last vestiges of the old api are gone
-        assert columnFamilyName == null || !columnFamilyName.contains(":");
-        assert superColumnName == null || !superColumnName.contains(":");
-        assert columnName == null || !columnName.contains(":");
-
         this.columnFamilyName = columnFamilyName;
         this.superColumnName = superColumnName;
         this.columnName = columnName;
@@ -31,7 +30,7 @@
         this(columnParent.column_family, columnParent.super_column, null);
     }
 
-    public QueryPath(String columnFamilyName, String superColumnName)
+    public QueryPath(String columnFamilyName, byte[] superColumnName)
     {
         this(columnFamilyName, superColumnName, null);
     }
@@ -51,7 +50,7 @@
         this(column_path_or_parent.column_family, column_path_or_parent.super_column, column_path_or_parent.column);
     }
 
-    public static QueryPath column(String columnName)
+    public static QueryPath column(byte[] columnName)
     {
         return new QueryPath(null, null, columnName);
     }
@@ -69,18 +68,18 @@
     public void serialize(DataOutputStream dos) throws IOException
     {
         assert !"".equals(columnFamilyName);
-        assert !"".equals(superColumnName);
-        assert !"".equals(columnName);
+        assert superColumnName == null || superColumnName.length > 0;
+        assert columnName == null || columnName.length > 0;
         dos.writeUTF(columnFamilyName == null ? "" : columnFamilyName);
-        dos.writeUTF(superColumnName == null ? "" : superColumnName);
-        dos.writeUTF(columnName == null ? "" : columnName);
+        ColumnSerializer.writeName(superColumnName == null ? ArrayUtils.EMPTY_BYTE_ARRAY : superColumnName, dos);
+        ColumnSerializer.writeName(columnName == null ? ArrayUtils.EMPTY_BYTE_ARRAY : columnName, dos);
     }
 
     public static QueryPath deserialize(DataInputStream din) throws IOException
     {
         String cfName = din.readUTF();
-        String scName = din.readUTF();
-        String cName = din.readUTF();
-        return new QueryPath(cfName.isEmpty() ? null : cfName, scName.isEmpty() ? null : scName, cName.isEmpty() ? null : cName);
+        byte[] scName = ColumnSerializer.readName(din);
+        byte[] cName = ColumnSerializer.readName(din);
+        return new QueryPath(cfName.isEmpty() ? null : cfName, scName.length == 0 ? null : scName, cName.length == 0 ? null : cName);
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableNamesIterator.java Tue Jul 21 01:36:52 2009
@@ -13,10 +13,10 @@
 {
     private ColumnFamily cf;
     private Iterator<IColumn> iter;
-    public final SortedSet<String> columns;
+    public final SortedSet<byte[]> columns;
 
     // TODO make this actually iterate so we don't have to read + deserialize + filter data that we don't need due to merging other sstables
-    public SSTableNamesIterator(String filename, String key, String cfName, SortedSet<String> columns) throws IOException
+    public SSTableNamesIterator(String filename, String key, String cfName, SortedSet<byte[]> columns) throws IOException
     {
         this.columns = columns;
         SSTableReader ssTable = SSTableReader.open(filename);
@@ -24,7 +24,7 @@
         if (buffer.getLength() > 0)
         {
             cf = ColumnFamily.serializer().deserialize(buffer);
-            iter = cf.getAllColumns().iterator();
+            iter = cf.getSortedColumns().iterator();
         }
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SSTableSliceIterator.java Tue Jul 21 01:36:52 2009
@@ -5,6 +5,7 @@
 
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.SequenceFile;
@@ -17,20 +18,22 @@
 class SSTableSliceIterator extends AbstractIterator<IColumn> implements ColumnIterator
 {
     protected boolean isAscending;
-    private String startColumn;
+    private byte[] startColumn;
     private DataOutputBuffer outBuf = new DataOutputBuffer();
     private DataInputBuffer inBuf = new DataInputBuffer();
     private int curColumnIndex;
     private ColumnFamily curCF = null;
     private ArrayList<IColumn> curColumns = new ArrayList<IColumn>();
     private SequenceFile.ColumnGroupReader reader;
+    private AbstractType comparator;
 
-    public SSTableSliceIterator(String filename, String key, String cfName, String startColumn, boolean isAscending)
+    public SSTableSliceIterator(String filename, String key, String cfName, AbstractType comparator, byte[] startColumn, boolean isAscending)
     throws IOException
     {
         this.isAscending = isAscending;
         SSTableReader ssTable = SSTableReader.open(filename);
         reader = ssTable.getColumnGroupReader(key, cfName, startColumn, isAscending);
+        this.comparator = comparator;
         this.startColumn = startColumn;
         curColumnIndex = isAscending ? 0 : -1;
     }
@@ -38,9 +41,9 @@
     private boolean isColumnNeeded(IColumn column)
     {
         if (isAscending)
-            return (column.name().compareTo(startColumn) >= 0);
+            return comparator.compare(column.name(), startColumn) >= 0;
         else
-            return (column.name().compareTo(startColumn) <= 0);
+            return comparator.compare(column.name(), startColumn) <= 0;
     }
 
     private void getColumnsFromBuffer() throws IOException
@@ -51,7 +54,7 @@
         if (curCF == null)
             curCF = columnFamily.cloneMeShallow();
         curColumns.clear();
-        for (IColumn column : columnFamily.getAllColumns())
+        for (IColumn column : columnFamily.getSortedColumns())
             if (isColumnNeeded(column))
                 curColumns.add(column);
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java?rev=796108&r1=796107&r2=796108&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java Tue Jul 21 01:36:52 2009
@@ -8,14 +8,15 @@
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.utils.ReducingIterator;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
 
 public class SliceQueryFilter extends QueryFilter
 {
-    public final String start, finish;
+    public final byte[] start, finish;
     public final boolean isAscending;
     public final int count;
 
-    public SliceQueryFilter(String key, QueryPath columnParent, String start, String finish, boolean ascending, int count)
+    public SliceQueryFilter(String key, QueryPath columnParent, byte[] start, byte[] finish, boolean ascending, int count)
     {
         super(key, columnParent);
         this.start = start;
@@ -24,14 +25,14 @@
         this.count = count;
     }
 
-    public ColumnIterator getMemColumnIterator(Memtable memtable)
+    public ColumnIterator getMemColumnIterator(Memtable memtable, AbstractType comparator)
     {
-        return memtable.getSliceIterator(this);
+        return memtable.getSliceIterator(this, comparator);
     }
 
-    public ColumnIterator getSSTableColumnIterator(SSTableReader sstable) throws IOException
+    public ColumnIterator getSSTableColumnIterator(SSTableReader sstable, AbstractType comparator) throws IOException
     {
-        return new SSTableSliceIterator(sstable.getFilename(), key, getColumnFamilyName(), start, isAscending);
+        return new SSTableSliceIterator(sstable.getFilename(), key, getColumnFamilyName(), comparator, start, isAscending);
     }
 
     public void filterSuperColumn(SuperColumn superColumn)
@@ -41,23 +42,23 @@
     }
 
     @Override
-    public Comparator<IColumn> getColumnComparator()
+    public Comparator<IColumn> getColumnComparator(AbstractType comparator)
     {
-        Comparator<IColumn> comparator = super.getColumnComparator();
-        return isAscending ? comparator : new ReverseComparator(comparator);
+        return isAscending ? super.getColumnComparator(comparator) : new ReverseComparator(super.getColumnComparator(comparator));
     }
 
     public void collectColumns(ColumnFamily returnCF, ReducingIterator<IColumn> reducedColumns, int gcBefore)
     {
         int liveColumns = 0;
+        AbstractType comparator = returnCF.getComparator();
 
         for (IColumn column : reducedColumns)
         {
             if (liveColumns >= count)
                 break;
-            if (!finish.isEmpty()
-                && ((isAscending && column.name().compareTo(finish) > 0))
-                    || (!isAscending && column.name().compareTo(finish) < 0))
+            if (finish.length > 0 // an empty finish skips the end check; the grouping below applies that to descending slices too
+                && ((isAscending && comparator.compare(column.name(), finish) > 0)
+                    || (!isAscending && comparator.compare(column.name(), finish) < 0)))
                 break;
 
             if (!column.isMarkedForDelete())

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java?rev=796108&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractType.java Tue Jul 21 01:36:52 2009
@@ -0,0 +1,41 @@
+package org.apache.cassandra.db.marshal;
+
+import java.util.Comparator;
+import java.util.Collection;
+
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.SuperColumn;
+
+/** Comparator over raw column-name bytes that also knows how to render those bytes as text. */
+public abstract class AbstractType implements Comparator<byte[]>
+{
+    /** get a string representation of the bytes suitable for log messages */
+    public abstract String getString(byte[] bytes);
+
+    /**
+     * validate that the byte array is a valid sequence for the type we are supposed to be comparing.
+     * This default only calls getString(bytes) and discards the result.
+     */
+    public void validate(byte[] bytes)
+    {
+        getString(bytes);
+    }
+
+    /** convenience method: the getString(byte[]) form of each name, every one followed by a comma */
+    public String getString(Collection<byte[]> names)
+    {
+        StringBuilder joined = new StringBuilder();
+        for (byte[] name : names)
+            joined.append(getString(name)).append(',');
+        return joined.toString();
+    }
+
+    /** convenience method: as above, but rendering the name() of each column */
+    public String getColumnsString(Collection<IColumn> columns)
+    {
+        StringBuilder joined = new StringBuilder();
+        for (IColumn column : columns)
+            joined.append(getString(column.name())).append(',');
+        return joined.toString();
+    }
+}

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java?rev=796108&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AsciiType.java Tue Jul 21 01:36:52 2009
@@ -0,0 +1,42 @@
+package org.apache.cassandra.db.marshal;
+
+import java.io.UnsupportedEncodingException;
+
+/** Compares names one signed byte at a time (a strict prefix sorts first) and renders them as US-ASCII. */
+public class AsciiType extends AbstractType
+{
+    /**
+     * @return the difference of the first pair of bytes that differ; if one array is a prefix
+     *         of the other, -1 when o1 is the shorter one and 1 when o2 is; 0 when identical
+     */
+    public int compare(byte[] o1, byte[] o2)
+    {
+        // every index below shared exists in both arrays
+        int shared = Math.min(o1.length, o2.length);
+        for (int i = 0; i < shared; i++)
+        {
+            int delta = o1[i] - o2[i];
+            if (delta != 0)
+                return delta;
+        }
+        if (o1.length == o2.length)
+            return 0;
+        return o1.length < o2.length ? -1 : 1;
+    }
+
+    /** decodes the bytes with the US-ASCII charset */
+    public String getString(byte[] bytes)
+    {
+        String decoded;
+        try
+        {
+            decoded = new String(bytes, "US-ASCII");
+        }
+        catch (UnsupportedEncodingException e)
+        {
+            // rethrown unchecked, keeping the original exception as the cause
+            throw new RuntimeException(e);
+        }
+        return decoded;
+    }
+}