You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/01/27 01:29:40 UTC

svn commit: r1063928 [1/2] - in /cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/context/ src/java/org/apache/cassandra/db/marshal/ src/java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/utils/ test/u...

Author: jbellis
Date: Thu Jan 27 00:29:39 2011
New Revision: 1063928

URL: http://svn.apache.org/viewvc?rev=1063928&view=rev
Log:
Fit partitioned counter directly into CounterColumn.value
patch by slebresne; reviewed by Kelvin Kakugawa for CASSANDRA-1936

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java
    cassandra/trunk/src/java/org/apache/cassandra/db/context/IContext.java
    cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java
    cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterAESCommutativeTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
    cassandra/trunk/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Jan 27 00:29:39 2011
@@ -1,7 +1,7 @@
 0.8-dev
  * avoid double RowMutation serialization on write path (CASSANDRA-1800)
  * adds support for columns that act as incr/decr counters 
-   (CASSANDRA-1072, 1937, 1944)
+   (CASSANDRA-1072, 1937, 1944, 1936)
  * make NetworkTopologyStrategy the default (CASSANDRA-1960)
  * configurable internode encryption (CASSANDRA-1567)
  * human readable column names in sstable2json output (CASSANDRA-1933)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Column.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Column.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Column.java Thu Jan 27 00:29:39 2011
@@ -132,6 +132,11 @@ public class Column implements IColumn
         return size();
     }
 
+    public int serializationFlags()
+    {
+        return 0;
+    }
+
     public void addColumn(IColumn column)
     {
         throw new UnsupportedOperationException("This operation is not supported for simple columns.");
@@ -155,7 +160,7 @@ public class Column implements IColumn
         try
         {
             buffer.writeLong(timestamp);
-            buffer.writeByte((isMarkedForDelete()) ? ColumnSerializer.DELETION_MASK : 0);
+            buffer.writeByte(serializationFlags());
         }
         catch (IOException e)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnSerializer.java Thu Jan 27 00:29:39 2011
@@ -37,9 +37,10 @@ public class ColumnSerializer implements
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnSerializer.class);
     
-    public final static int DELETION_MASK = 0x01;
-    public final static int EXPIRATION_MASK = 0x02;
-    public final static int COUNTER_MASK    = 0x04;
+    public final static int DELETION_MASK       = 0x01;
+    public final static int EXPIRATION_MASK     = 0x02;
+    public final static int COUNTER_MASK        = 0x04;
+    public final static int COUNTER_UPDATE_MASK = 0x08;
 
     public void serialize(IColumn column, DataOutput dos)
     {
@@ -47,22 +48,16 @@ public class ColumnSerializer implements
         ByteBufferUtil.writeWithShortLength(column.name(), dos);
         try
         {
+            dos.writeByte(column.serializationFlags());
             if (column instanceof CounterColumn)
             {
-                dos.writeByte(COUNTER_MASK);
                 dos.writeLong(((CounterColumn)column).timestampOfLastDelete());
-                ByteBufferUtil.writeWithShortLength(ByteBuffer.wrap(((CounterColumn)column).partitionedCounter()), dos);
             }
             else if (column instanceof ExpiringColumn)
             {
-                dos.writeByte(EXPIRATION_MASK);
                 dos.writeInt(((ExpiringColumn) column).getTimeToLive());
                 dos.writeInt(column.getLocalDeletionTime());
             }
-            else
-            {
-                dos.writeByte((column.isMarkedForDelete()) ? DELETION_MASK : 0);
-            }
             dos.writeLong(column.timestamp());
             ByteBufferUtil.writeWithLength(column.value(), dos);
         }
@@ -82,11 +77,9 @@ public class ColumnSerializer implements
         if ((b & COUNTER_MASK) != 0)
         {
             long timestampOfLastDelete = dis.readLong();
-            ByteBuffer pc = ByteBufferUtil.readWithShortLength(dis);
-            byte[] partitionedCounter = ByteBufferUtil.getArray(pc);
-            long timestamp = dis.readLong();
+            long ts = dis.readLong();
             ByteBuffer value = ByteBufferUtil.readWithLength(dis);
-            return new CounterColumn(name, value, timestamp, partitionedCounter, timestampOfLastDelete);
+            return new CounterColumn(name, value, ts, timestampOfLastDelete);
         }
         else if ((b & EXPIRATION_MASK) != 0)
         {
@@ -112,9 +105,11 @@ public class ColumnSerializer implements
         {
             long ts = dis.readLong();
             ByteBuffer value = ByteBufferUtil.readWithLength(dis);
-            return (b & DELETION_MASK) == 0
-                   ? new Column(name, value, ts)
-                   : new DeletedColumn(name, value, ts);
+            return (b & COUNTER_UPDATE_MASK) != 0
+                   ? new CounterUpdateColumn(name, value, ts)
+                   : ((b & DELETION_MASK) == 0
+                      ? new Column(name, value, ts)
+                      : new DeletedColumn(name, value, ts));
         }
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterColumn.java Thu Jan 27 00:29:39 2011
@@ -40,54 +40,47 @@ public class CounterColumn extends Colum
 
     private static CounterContext contextManager = CounterContext.instance();
 
-    protected ByteBuffer value;                 // NOT final: delta OR total of partitioned counter
-    protected byte[] partitionedCounter;        // NOT final: only modify inline, carefully
     protected final long timestampOfLastDelete;
 
-    public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp)
+    public CounterColumn(ByteBuffer name, long value, long timestamp)
     {
-      this(name, value, timestamp, contextManager.create());
+        this(name, contextManager.create(value), timestamp);
     }
 
-    public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp, byte[] partitionedCounter)
+    public CounterColumn(ByteBuffer name, long value, long timestamp, long timestampOfLastDelete)
     {
-      this(name, value, timestamp, partitionedCounter, Long.MIN_VALUE);
+        this(name, contextManager.create(value), timestamp, timestampOfLastDelete);
     }
 
-    public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp, byte[] partitionedCounter, long timestampOfLastDelete)
+    public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp)
     {
-        super(name, ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp);
-        this.value = value;
-        this.partitionedCounter = partitionedCounter;
-        this.timestampOfLastDelete = timestampOfLastDelete;
+        this(name, value, timestamp, Long.MIN_VALUE);
     }
 
-    @Override
-    public ByteBuffer value()
+    public CounterColumn(ByteBuffer name, ByteBuffer value, long timestamp, long timestampOfLastDelete)
     {
-        return value;
+        super(name, value, timestamp);
+        this.timestampOfLastDelete = timestampOfLastDelete;
     }
 
-    public byte[] partitionedCounter()
+    public long timestampOfLastDelete()
     {
-        return partitionedCounter;
+        return timestampOfLastDelete;
     }
 
-    public long timestampOfLastDelete()
+    public long total()
     {
-        return timestampOfLastDelete;
+        return contextManager.total(value);
     }
 
     @Override
     public int size()
     {
         /*
-         * An expired column adds to a Column : 
-         *    4 bytes for length of partitionedCounter
-         *  + length of partitionedCounter
+         * A counter column adds to a Column :
          *  + 8 bytes for timestampOfLastDelete
          */
-        return super.size() + DBConstants.intSize_ + partitionedCounter.length + DBConstants.tsSize_;
+        return super.size() + DBConstants.tsSize_;
     }
 
     @Override
@@ -99,9 +92,7 @@ public class CounterColumn extends Colum
             return column;
         if (timestampOfLastDelete() < ((CounterColumn)column).timestampOfLastDelete())
             return column;
-        ContextRelationship rel = contextManager.diff(
-            ((CounterColumn)column).partitionedCounter(),
-            partitionedCounter());
+        ContextRelationship rel = contextManager.diff(column.value(), value());
         if (ContextRelationship.GREATER_THAN == rel || ContextRelationship.DISJOINT == rel)
             return column;
         return null;
@@ -110,10 +101,7 @@ public class CounterColumn extends Colum
     @Override
     public void updateDigest(MessageDigest digest)
     {
-        digest.update(name.duplicate());
-        digest.update(value.duplicate());
-        digest.update(ByteBufferUtil.bytes(timestamp));
-        digest.update(partitionedCounter);
+        super.updateDigest(digest);
         digest.update(ByteBufferUtil.bytes(timestampOfLastDelete));
     }
 
@@ -140,12 +128,7 @@ public class CounterColumn extends Colum
                     return column;
                 }
                 // tombstone > live last delete
-                return new CounterColumn(
-                    column.name(),
-                    column.value(),
-                    column.timestamp(),
-                    ((CounterColumn)column).partitionedCounter(),
-                    timestamp());
+                return new CounterColumn(column.name(), column.value(), column.timestamp(), timestamp());
             }
         }
         else if (column.isMarkedForDelete()) // live + tombstone: track last tombstone
@@ -160,63 +143,27 @@ public class CounterColumn extends Colum
                 return this;
             }
             // live last delete < tombstone
-            return new CounterColumn(
-                name(),
-                value(),
-                timestamp(),
-                partitionedCounter(),
-                column.timestamp());
+            return new CounterColumn(name(), value(), timestamp(), column.timestamp());
         }
         // live + live: merge clocks; update value
-        byte[] mergedPartitionedCounter = contextManager.merge(
-            partitionedCounter(),
-            ((CounterColumn)column).partitionedCounter());
-		ByteBuffer byteBufferValue;
-		if (0 == mergedPartitionedCounter.length)
-		{
-			long mergedValue = value().getLong(value().arrayOffset()) +
-                               column.value().getLong(column.value().arrayOffset());
-			byteBufferValue = ByteBufferUtil.bytes(mergedValue);
-		} else
-			byteBufferValue = ByteBuffer.wrap(contextManager.total(mergedPartitionedCounter));
         return new CounterColumn(
             name(),
-			byteBufferValue,
+            contextManager.merge(value(), column.value()),
             Math.max(timestamp(), column.timestamp()),
-            mergedPartitionedCounter,
             Math.max(timestampOfLastDelete(), ((CounterColumn)column).timestampOfLastDelete()));
     }
 
     @Override
     public boolean equals(Object o)
     {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        CounterColumn column = (CounterColumn)o;
-
-        if (timestamp != column.timestamp)
-            return false;
-
-        if (timestampOfLastDelete != column.timestampOfLastDelete)
-            return false;
-
-        if (!Arrays.equals(partitionedCounter, column.partitionedCounter))
-            return false;
-
-        if (!name.equals(column.name))
-            return false;
-
-        return value.equals(column.value);
+        // super.equals() returns false if o is not a CounterColumn
+        return super.equals(o) && timestampOfLastDelete == ((CounterColumn)o).timestampOfLastDelete;
     }
 
     @Override
     public int hashCode()
     {
         int result = super.hashCode();
-        result = 31 * result + (partitionedCounter != null ? Arrays.hashCode(partitionedCounter) : 0);
         result = 31 * result + (int)(timestampOfLastDelete ^ (timestampOfLastDelete >>> 32));
         return result;
     }
@@ -228,7 +175,6 @@ public class CounterColumn extends Colum
             ByteBufferUtil.clone(name),
             ByteBufferUtil.clone(value),
             timestamp,
-            partitionedCounter,
             timestampOfLastDelete);
     }
 
@@ -240,46 +186,31 @@ public class CounterColumn extends Colum
         sb.append(":");
         sb.append(isMarkedForDelete());
         sb.append(":");
-        sb.append(value.getLong(value.arrayOffset()));
+        sb.append(contextManager.toString(value));
         sb.append("@");
         sb.append(timestamp());
         sb.append("!");
         sb.append(timestampOfLastDelete);
-        sb.append("@");
-        sb.append(contextManager.toString(partitionedCounter));
         return sb.toString();
     }
 
-    private void updateValue()
-    {
-        value = ByteBuffer.wrap(contextManager.total(partitionedCounter));
-    }
-
-    public void update(InetAddress node)
+    @Override
+    public int serializationFlags()
     {
-        long delta = value.getLong(value.arrayOffset());
-        partitionedCounter = contextManager.update(partitionedCounter, node, delta);
-        updateValue();
+        return ColumnSerializer.COUNTER_MASK;
     }
 
     public CounterColumn cleanNodeCounts(InetAddress node)
     {
-        //XXX: inline modification non-destructive; cases:
+        // use cases:
         //     1) AES post-stream
         //     2) RRR, after CF.cloneMe()
         //     3) RRR, after CF.diff() which creates a new CF
-        byte[] cleanPartitionedCounter  = contextManager.cleanNodeCounts(partitionedCounter, node);
-        if (cleanPartitionedCounter == partitionedCounter)
+        ByteBuffer cleanedValue = contextManager.cleanNodeCounts(value, node);
+        if (cleanedValue == value) // reference equality is enough
             return this;
-        if (0 == cleanPartitionedCounter.length)
+        if (0 == value.remaining())
             return null;
-        return new CounterColumn(
-            name,
-            ByteBuffer.wrap(contextManager.total(cleanPartitionedCounter)),
-            timestamp,
-            cleanPartitionedCounter,
-            timestampOfLastDelete
-            );
+        return new CounterColumn(name, cleanedValue, timestamp, timestampOfLastDelete);
     }
 }
-

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CounterMutation.java Thu Jan 27 00:29:39 2011
@@ -38,6 +38,7 @@ import org.apache.cassandra.db.marshal.A
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 
@@ -163,9 +164,41 @@ public class CounterMutation implements 
 
     public void apply() throws IOException
     {
-        rowMutation.updateCommutativeTypes(FBUtilities.getLocalAddress());
+        // We need to transform all CounterUpdateColumn to CounterColumn and we need to deepCopy. Both are done 
+        // below since CUC.asCounterColumn() does a deep copy.
+        RowMutation rm = new RowMutation(rowMutation.getTable(), ByteBufferUtil.clone(rowMutation.key()));
 
-        rowMutation.deepCopy().apply();
+        for (ColumnFamily cf_ : rowMutation.getColumnFamilies())
+        {
+            ColumnFamily cf = cf_.cloneMeShallow();
+            if (cf_.isSuper())
+            {
+                for (IColumn column : cf_.getSortedColumns())
+                {
+                    IColumn sc = ((SuperColumn)column).shallowCopy();
+                    for (IColumn c : column.getSubColumns())
+                    {
+                        if (c instanceof CounterUpdateColumn)
+                            sc.addColumn(((CounterUpdateColumn) c).asCounterColumn());
+                        else
+                            sc.addColumn(c.deepCopy());
+                    }
+                    cf.addColumn(sc);
+                }
+            }
+            else
+            {
+                for (IColumn column : cf_.getSortedColumns())
+                {
+                    if (column instanceof CounterUpdateColumn)
+                        cf.addColumn(((CounterUpdateColumn) column).asCounterColumn());
+                    else
+                        cf.addColumn(column.deepCopy());
+                }
+            }
+            rm.add(cf);
+        }
+        rm.apply();
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DeletedColumn.java Thu Jan 27 00:29:39 2011
@@ -70,4 +70,10 @@ public class DeletedColumn extends Colum
     {
         return new DeletedColumn(ByteBufferUtil.clone(name), ByteBufferUtil.clone(value), timestamp);
     }
+
+    @Override
+    public int serializationFlags()
+    {
+        return ColumnSerializer.DELETION_MASK;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ExpiringColumn.java Thu Jan 27 00:29:39 2011
@@ -91,7 +91,7 @@ public class ExpiringColumn extends Colu
         try
         {
             buffer.writeLong(timestamp);
-            buffer.writeByte(ColumnSerializer.EXPIRATION_MASK);
+            buffer.writeByte(serializationFlags());
             buffer.writeInt(timeToLive);
         }
         catch (IOException e)
@@ -135,4 +135,10 @@ public class ExpiringColumn extends Colu
             throw new IllegalStateException("column is not marked for delete");
         }
     }
+
+    @Override
+    public int serializationFlags()
+    {
+        return ColumnSerializer.EXPIRATION_MASK;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/IColumn.java Thu Jan 27 00:29:39 2011
@@ -35,6 +35,7 @@ public interface IColumn
     public ByteBuffer name();
     public int size();
     public int serializedSize();
+    public int serializationFlags();
     public long timestamp();
     public ByteBuffer value();
     public Collection<IColumn> getSubColumns();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Thu Jan 27 00:29:39 2011
@@ -308,20 +308,6 @@ public class RowMutation implements IMut
         }
     }
 
-    /**
-     * Update the context of all CounterColumns in this RowMutation
-     */
-    public void updateCommutativeTypes(InetAddress node)
-    {
-        for (ColumnFamily cf : modifications_.values())
-        {
-            AbstractType defaultValidator = cf.metadata().getDefaultValidator();
-            if (!defaultValidator.isCommutative())
-                continue;
-            ((AbstractCommutativeType)defaultValidator).update(cf, node);
-        }
-    }
-
     static RowMutation fromBytes(byte[] raw) throws IOException
     {
         RowMutation rm = serializer_.deserialize(new DataInputStream(new ByteArrayInputStream(raw)));

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Thu Jan 27 00:29:39 2011
@@ -295,6 +295,14 @@ public class SuperColumn implements ICol
         this.localDeletionTime.set(localDeleteTime);
         this.markedForDeleteAt.set(timestamp);
     }
+
+    public IColumn shallowCopy()
+    {
+        SuperColumn sc = new SuperColumn(ByteBufferUtil.clone(name_), this.getComparator());
+        sc.localDeletionTime = localDeletionTime;
+        sc.markedForDeleteAt = markedForDeleteAt;
+        return sc;
+    }
     
     public IColumn deepCopy()
     {
@@ -314,6 +322,11 @@ public class SuperColumn implements ICol
     {
         throw new UnsupportedOperationException("This operation is unsupported on super columns.");
     }
+
+    public int serializationFlags()
+    {
+        throw new UnsupportedOperationException("Super columns don't have a serialization mask");
+    }
 }
 
 class SuperColumnSerializer implements ICompactSerializer2<IColumn>

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/context/CounterContext.java Thu Jan 27 00:29:39 2011
@@ -19,11 +19,13 @@ package org.apache.cassandra.db.context;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.cassandra.db.DBConstants;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -47,6 +49,7 @@ public class CounterContext implements I
 {
     private static final int idLength;
     private static final byte[] localId;
+    private static final ByteBuffer wrappedLocalId;
     private static final int clockLength = DBConstants.longSize_;
     private static final int countLength = DBConstants.longSize_;
     private static final int stepLength; // length: id + logical clock + count
@@ -60,6 +63,7 @@ public class CounterContext implements I
     static
     {
         localId  = FBUtilities.getLocalAddress().getAddress();
+        wrappedLocalId = ByteBuffer.wrap(localId);
         idLength   = localId.length;
         stepLength = idLength + clockLength + countLength;
     }
@@ -70,69 +74,33 @@ public class CounterContext implements I
     }
 
     /**
-     * Creates an initial counter context.
+     * Creates an initial counter context with an initial value for the local node.
+     *
+     * @param value the value for this initial update
      *
      * @return an empty counter context.
      */
-    public byte[] create()
-    {
-        return new byte[0];
-    }
-    
-    // write a tuple (node id, clock, count) at the front
-    protected static void writeElement(byte[] context, byte[] id, long clock, long count)
-    {
-        writeElementAtStepOffset(context, 0, id, clock, count);
-    }
-
-    // write a tuple (node id, clock, count) at step offset
-    protected static void writeElementAtStepOffset(byte[] context, int stepOffset, byte[] id, long clock, long count)
+    public ByteBuffer create(long value)
     {
-        int offset = stepOffset * stepLength;
-        System.arraycopy(id, 0, context, offset, idLength);
-        FBUtilities.copyIntoBytes(context, offset + idLength, clock);
-        FBUtilities.copyIntoBytes(context, offset + idLength + clockLength, count);
+        ByteBuffer context = ByteBuffer.allocate(stepLength);
+        writeElementAtOffset(context, 0, localId, 1L, value);
+        return context;
     }
 
-    public byte[] insertElementAtStepOffset(byte[] context, int stepOffset, byte[] id, long clock, long count)
+    // For testing purposes
+    public ByteBuffer create(byte[] id, long clock, long value)
     {
-        int offset = stepOffset * stepLength;
-        byte[] newContext = new byte[context.length + stepLength];
-        System.arraycopy(context, 0, newContext, 0, offset);
-        writeElementAtStepOffset(newContext, stepOffset, id, clock, count);
-        System.arraycopy(context, offset, newContext, offset + stepLength, context.length - offset);
-        return newContext;
+        ByteBuffer context = ByteBuffer.allocate(stepLength);
+        writeElementAtOffset(context, 0, id, clock, value);
+        return context;
     }
 
-    public byte[] update(byte[] context, InetAddress node, long delta)
+    // write a tuple (node id, clock, count) at offset
+    protected static void writeElementAtOffset(ByteBuffer context, int offset, byte[] id, long clock, long count)
     {
-        // calculate node id
-        byte[] nodeId = node.getAddress();
-        int idCount = context.length / stepLength;
-
-        // look for this node id
-        for (int stepOffset = 0; stepOffset < idCount; ++stepOffset)
-        {
-            int offset = stepOffset * stepLength;
-            int cmp = FBUtilities.compareByteSubArrays(nodeId, 0, context, offset, idLength);
-            if (cmp == 0)
-            {
-                // node id found: increment clock, update count; shift to front
-                long clock = FBUtilities.byteArrayToLong(context, offset + idLength);
-                long count = FBUtilities.byteArrayToLong(context, offset + idLength + clockLength);
-
-                writeElementAtStepOffset(context, stepOffset, nodeId, clock + 1L, count + delta);
-                return context;
-            }
-            if (cmp < 0)
-            {
-                // id at offset is greater that the one we are updating, inserting
-                return insertElementAtStepOffset(context, stepOffset, nodeId, 1L, delta);
-            }
-        }
-
-        // node id not found: adding at the end
-        return insertElementAtStepOffset(context, idCount, nodeId, 1L, delta);
+        System.arraycopy(id, 0, context.array(), offset + context.arrayOffset(), idLength);
+        context.putLong(offset + idLength, clock);
+        context.putLong(offset + idLength + clockLength, count);
     }
 
     /**
@@ -152,20 +120,20 @@ public class CounterContext implements I
      *            counter context.
      * @return the ContextRelationship between the contexts.
      */
-    public ContextRelationship diff(byte[] left, byte[] right)
+    public ContextRelationship diff(ByteBuffer left, ByteBuffer right)
     {
         ContextRelationship relationship = ContextRelationship.EQUAL;
 
-        int leftIndex  = 0;
-        int rightIndex = 0;
-        while (leftIndex < left.length && rightIndex < right.length)
+        int leftIndex  = left.position();
+        int rightIndex = right.position();
+        while (leftIndex < left.remaining() && rightIndex < right.remaining())
         {
             // compare id bytes
-            int compareId = FBUtilities.compareByteSubArrays(left, leftIndex, right, rightIndex, idLength);
+            int compareId = ByteBufferUtil.compareSubArrays(left, leftIndex, right, rightIndex, idLength);
             if (compareId == 0)
             {
-                long leftClock  = FBUtilities.byteArrayToLong(left,  leftIndex + idLength);
-                long rightClock = FBUtilities.byteArrayToLong(right, rightIndex + idLength);
+                long leftClock  = left.getLong(leftIndex + idLength);
+                long rightClock = right.getLong(rightIndex + idLength);
 
                 // advance indexes
                 leftIndex  += stepLength;
@@ -252,7 +220,7 @@ public class CounterContext implements I
         }
 
         // check final lengths
-        if (leftIndex < left.length)
+        if (leftIndex < left.remaining())
         {
             if (relationship == ContextRelationship.EQUAL)
             {
@@ -263,7 +231,7 @@ public class CounterContext implements I
                 return ContextRelationship.DISJOINT;
             }
         }
-        else if (rightIndex < right.length)
+        else if (rightIndex < right.remaining())
         {
             if (relationship == ContextRelationship.EQUAL)
             {
@@ -278,38 +246,6 @@ public class CounterContext implements I
         return relationship;
     }
 
-    private class CounterNode
-    {
-        public final long clock;
-        public final long count;
-
-        public CounterNode(long clock, long count)
-        {
-            this.clock = clock;
-            this.count = count;
-        }
-
-        public int compareClockTo(CounterNode o)
-        {
-            if (clock == o.clock)
-            {
-                return 0;
-            }
-            else if (clock > o.clock)
-            {
-                return 1;
-            }
-            // clock < o.clock
-            return -1;
-        }
-
-        @Override
-        public String toString()
-        {
-            return "(" + clock + "," + count + ")";
-        }
-    }
-
     /**
      * Return a context w/ an aggregated count for each node id.
      *
@@ -318,22 +254,15 @@ public class CounterContext implements I
      * @param right
      *            counter context.
      */
-    public byte[] merge(byte[] left, byte[] right)
+    public ByteBuffer merge(ByteBuffer left, ByteBuffer right)
     {
-        if (left.length > right.length)
-        {
-            byte[] tmp = right;
-            right = left;
-            left = tmp;
-        }
-
         // Compute size of result
         int size = 0;
-        int leftOffset  = 0;
-        int rightOffset = 0;
-        while ((leftOffset < left.length) && (rightOffset < right.length))
+        int leftOffset  = left.position();
+        int rightOffset = right.position();
+        while ((leftOffset < left.limit()) && (rightOffset < right.limit()))
         {
-            int cmp = FBUtilities.compareByteSubArrays(left, leftOffset, right, rightOffset, idLength);
+            int cmp = ByteBufferUtil.compareSubArrays(left, leftOffset, right, rightOffset, idLength);
             if (cmp == 0)
             {
                 ++size;
@@ -351,35 +280,37 @@ public class CounterContext implements I
                 leftOffset += stepLength;
             }
         }
-        size += (left.length  - leftOffset)  / stepLength;
-        size += (right.length - rightOffset) / stepLength;
+        size += (left.limit() - leftOffset)  / stepLength;
+        size += (right.limit() - rightOffset) / stepLength;
 
-        byte[] merged = new byte[size * stepLength];
+        ByteBuffer merged = ByteBuffer.allocate(size * stepLength);
 
         // Do the actual merge:
         //   a) local id:  sum clocks, counts
         //   b) remote id: keep highest clock, count (reconcile)
-        int mergedOffset = 0; leftOffset = 0; rightOffset = 0;
-        while ((leftOffset < left.length) && (rightOffset < right.length))
+        int mergedOffset = merged.position();
+        leftOffset = left.position();
+        rightOffset = right.position();
+        while ((leftOffset < left.limit()) && (rightOffset < right.limit()))
         {
-            int cmp = FBUtilities.compareByteSubArrays(left, leftOffset, right, rightOffset, idLength);
+            int cmp = ByteBufferUtil.compareSubArrays(left, leftOffset, right, rightOffset, idLength);
             if (cmp == 0)
             {
                 // sum for local id, keep highest othewise
-                long leftClock = FBUtilities.byteArrayToLong(left, leftOffset + idLength);
-                long rightClock = FBUtilities.byteArrayToLong(right, rightOffset + idLength);
-                if (FBUtilities.compareByteSubArrays(left, leftOffset, localId, 0, idLength) == 0)
-                {
-                    long leftCount = FBUtilities.byteArrayToLong(left, leftOffset + idLength + clockLength);
-                    long rightCount = FBUtilities.byteArrayToLong(right, rightOffset + idLength + clockLength);
-                    writeElementAtStepOffset(merged, mergedOffset / stepLength, localId, leftClock + rightClock, leftCount + rightCount);
+                long leftClock = left.getLong(leftOffset + idLength);
+                long rightClock = right.getLong(rightOffset + idLength);
+                if (ByteBufferUtil.compareSubArrays(left, leftOffset, wrappedLocalId, 0, idLength) == 0)
+                {
+                    long leftCount = left.getLong(leftOffset + idLength + clockLength);
+                    long rightCount = right.getLong(rightOffset + idLength + clockLength);
+                    writeElementAtOffset(merged, mergedOffset, localId, leftClock + rightClock, leftCount + rightCount);
                 }
                 else
                 {
                     if (leftClock >= rightClock)
-                        System.arraycopy(left, leftOffset, merged, mergedOffset, stepLength);
+                        ByteBufferUtil.arrayCopy(left, leftOffset, merged, mergedOffset, stepLength);
                     else
-                        System.arraycopy(right, rightOffset, merged, mergedOffset, stepLength);
+                        ByteBufferUtil.arrayCopy(right, rightOffset, merged, mergedOffset, stepLength);
                 }
                 mergedOffset += stepLength;
                 rightOffset += stepLength;
@@ -387,31 +318,31 @@ public class CounterContext implements I
             }
             else if (cmp > 0)
             {
-                System.arraycopy(right, rightOffset, merged, mergedOffset, stepLength);
+                ByteBufferUtil.arrayCopy(right, rightOffset, merged, mergedOffset, stepLength);
                 mergedOffset += stepLength;
                 rightOffset += stepLength;
             }
             else // cmp < 0
             {
-                System.arraycopy(left, leftOffset, merged, mergedOffset, stepLength);
+                ByteBufferUtil.arrayCopy(left, leftOffset, merged, mergedOffset, stepLength);
                 mergedOffset += stepLength;
                 leftOffset += stepLength;
             }
         }
-        if (leftOffset < left.length)
-            System.arraycopy(
+        if (leftOffset < left.limit())
+            ByteBufferUtil.arrayCopy(
                 left,
                 leftOffset,
                 merged,
                 mergedOffset,
-                left.length - leftOffset);
-        if (rightOffset < right.length)
-            System.arraycopy(
+                left.limit() - leftOffset);
+        if (rightOffset < right.limit())
+            ByteBufferUtil.arrayCopy(
                 right,
                 rightOffset,
                 merged,
                 mergedOffset,
-                right.length - rightOffset);
+                right.limit() - rightOffset);
 
         return merged;
     }
@@ -423,21 +354,22 @@ public class CounterContext implements I
      *            version context.
      * @return a human-readable String of the context.
      */
-    public String toString(byte[] context)
+    public String toString(ByteBuffer context)
     {
         StringBuilder sb = new StringBuilder();
         sb.append("[");
-        for (int offset = 0; offset < context.length; offset += stepLength)
+        for (int offset = context.position(); offset < context.limit(); offset += stepLength)
         {
-            if (offset > 0)
+            if (offset > context.position())
             {
                 sb.append(",");
             }
             sb.append("{");
             try
             {
+                int absOffset = context.arrayOffset() + offset;
                 InetAddress address = InetAddress.getByAddress(
-                            ArrayUtils.subarray(context, offset, offset + idLength));
+                            ArrayUtils.subarray(context.array(), absOffset, absOffset + idLength));
                 sb.append(address.getHostAddress());
             }
             catch (UnknownHostException uhe)
@@ -445,9 +377,9 @@ public class CounterContext implements I
                 sb.append("?.?.?.?");
             }
             sb.append(", ");
-            sb.append(FBUtilities.byteArrayToLong(context, offset + idLength));
+            sb.append(context.getLong(offset + idLength));
             sb.append(", ");
-            sb.append(FBUtilities.byteArrayToLong(context, offset + idLength + clockLength));
+            sb.append(context.getLong(offset + idLength + clockLength));
 
             sb.append("}");
         }
@@ -456,42 +388,42 @@ public class CounterContext implements I
     }
 
     // return an aggregated count across all node ids
-    public byte[] total(byte[] context)
+    public long total(ByteBuffer context)
     {
         long total = 0L;
 
-        for (int offset = 0; offset < context.length; offset += stepLength)
+        for (int offset = context.position(); offset < context.limit(); offset += stepLength)
         {
-            long count = FBUtilities.byteArrayToLong(context, offset + idLength + clockLength);
+            long count = context.getLong(offset + idLength + clockLength);
             total += count;
         }
 
-        return FBUtilities.toByteArray(total);
+        return total;
     }
 
     // remove the count for a given node id
-    public byte[] cleanNodeCounts(byte[] context, InetAddress node)
+    public ByteBuffer cleanNodeCounts(ByteBuffer context, InetAddress node)
     {
         // calculate node id
-        byte[] nodeId = node.getAddress();
+        ByteBuffer nodeId = ByteBuffer.wrap(node.getAddress());
 
         // look for this node id
-        for (int offset = 0; offset < context.length; offset += stepLength)
+        for (int offset = 0; offset < context.remaining(); offset += stepLength)
         {
-            int cmp = FBUtilities.compareByteSubArrays(context, offset, nodeId, 0, idLength);
+            int cmp = ByteBufferUtil.compareSubArrays(context, context.position() + offset, nodeId, 0, idLength);
             if (cmp < 0)
                 continue;
             else if (cmp == 0)
             {
                 // node id found: remove node count
-                byte[] truncatedContext = new byte[context.length - stepLength];
-                System.arraycopy(context, 0, truncatedContext, 0, offset);
-                System.arraycopy(
+                ByteBuffer truncatedContext = ByteBuffer.allocate(context.remaining() - stepLength);
+                ByteBufferUtil.arrayCopy(context, context.position(), truncatedContext, 0, offset);
+                ByteBufferUtil.arrayCopy(
                         context,
-                        offset + stepLength,
+                        context.position() + offset + stepLength,
                         truncatedContext,
                         offset,
-                        context.length - (offset + stepLength));
+                        context.remaining() - (offset + stepLength));
                 return truncatedContext;
             }
             else // cmp > 0

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/context/IContext.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/context/IContext.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/context/IContext.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/context/IContext.java Thu Jan 27 00:29:39 2011
@@ -17,10 +17,12 @@
  */
 package org.apache.cassandra.db.context;
 
+import java.nio.ByteBuffer;
+
 /**
  * An opaque commutative context.
  *
- * Maintains a byte[] context that represents a partitioned commutative value.
+ * Maintains a ByteBuffer context that represents a partitioned commutative value.
  */
 public interface IContext
 {
@@ -33,13 +35,6 @@ public interface IContext
     };
 
     /**
-     * Creates an initial context.
-     *
-     * @return the initial context.
-     */
-    public byte[] create();
-
-    /**
      * Determine the relationship between two contexts.
      *
      * EQUAL:        Equal set of nodes and every count is equal.
@@ -53,7 +48,7 @@ public interface IContext
      *            context.
      * @return the ContextRelationship between the contexts.
      */
-    public ContextRelationship diff(byte[] left, byte[] right);
+    public ContextRelationship diff(ByteBuffer left, ByteBuffer right);
 
     /**
      * Return a context w/ an aggregated count for each node id.
@@ -63,7 +58,7 @@ public interface IContext
      * @param right
      *            context.
      */
-    public byte[] merge(byte[] left, byte[] right);
+    public ByteBuffer merge(ByteBuffer left, ByteBuffer right);
 
     /**
      * Human-readable String from context.
@@ -72,5 +67,5 @@ public interface IContext
      *            context.
      * @return a human-readable String of the context.
      */
-    public String toString(byte[] context);
+    public String toString(ByteBuffer context);
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/AbstractCommutativeType.java Thu Jan 27 00:29:39 2011
@@ -39,11 +39,6 @@ public abstract class AbstractCommutativ
     public abstract Column createColumn(ByteBuffer name, ByteBuffer value, long timestamp);
 
     /**
-     * update commutative columns for target node
-     */
-    public abstract void update(IColumnContainer cc, InetAddress node);
-
-    /**
      * remove target node from commutative columns
      */
     public abstract void cleanContext(IColumnContainer cc, InetAddress node);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java Thu Jan 27 00:29:39 2011
@@ -56,7 +56,7 @@ public class CounterColumnType extends A
         {
             throw new MarshalException("A long is exactly 8 bytes");
         }
-        return String.valueOf(bytes.getLong(bytes.position()+bytes.arrayOffset()));
+        return String.valueOf(bytes.getLong(bytes.position()));
     }
 
     /**
@@ -64,27 +64,7 @@ public class CounterColumnType extends A
      */
     public Column createColumn(ByteBuffer name, ByteBuffer value, long timestamp)
     {
-        return new CounterColumn(name, value, timestamp);
-    }
-
-    /**
-     * update commutative columns for target node
-     */
-    public void update(IColumnContainer cc, InetAddress node)
-    {
-        for (IColumn column : cc.getSortedColumns())
-        {
-            if (column instanceof SuperColumn)
-            {
-                update((IColumnContainer)column, node);
-                continue;
-            }
-            
-            if (column instanceof DeletedColumn)
-                continue;
-            
-            ((CounterColumn)column).update(node);
-        }
+        return new CounterUpdateColumn(name, value, timestamp);
     }
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Thu Jan 27 00:29:39 2011
@@ -44,6 +44,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.db.migration.*;
+import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.DynamicEndpointSnitch;
@@ -1059,12 +1060,12 @@ public class CassandraServer implements 
     private Counter getCounter(ColumnOrSuperColumn cosc)
     {
         if (cosc.isSetColumn()) {
-            return new Counter().setColumn(new CounterColumn(cosc.column.name, cosc.column.value.getLong(cosc.column.value.arrayOffset())));
+            return new Counter().setColumn(new CounterColumn(cosc.column.name, CounterContext.instance().total(cosc.column.value)));
         } else if(cosc.isSetSuper_column()) {
             List<CounterColumn> cc = new ArrayList<CounterColumn>(cosc.super_column.columns.size());
             for (Column col : cosc.super_column.columns)
             {
-                cc.add(new CounterColumn(col.name, col.value.getLong(col.value.arrayOffset())));
+                cc.add(new CounterColumn(col.name, CounterContext.instance().total(col.value)));
             }
             return new Counter().setSuper_column(new CounterSuperColumn(cosc.super_column.name, cc));
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Thu Jan 27 00:29:39 2011
@@ -235,6 +235,38 @@ public class ByteBufferUtil
         }
     }
 
+    /**
+     * Transfer bytes from one ByteBuffer to another.
+     * This function acts as System.arraycopy() but for ByteBuffers.
+     *
+     * @param src the source ByteBuffer
+     * @param srcPos absolute index in the source ByteBuffer to start reading from (not relative to its position())
+     * @param dst the destination ByteBuffer
+     * @param dstPos absolute index in the destination ByteBuffer to start writing at (not relative to its position())
+     * @param length the number of bytes to copy
+     */
+    public static void arrayCopy(ByteBuffer src, int srcPos, ByteBuffer dst, int dstPos, int length)
+    {
+        if (src.hasArray() && dst.hasArray())
+        {
+            System.arraycopy(src.array(),
+                             src.arrayOffset() + srcPos,
+                             dst.array(),
+                             dst.arrayOffset() + dstPos,
+                             length);
+        }
+        else
+        {
+            if (src.limit() - srcPos < length || dst.limit() - dstPos < length)
+                throw new IndexOutOfBoundsException();
+
+            for (int i = 0; i < length; i++)
+            {
+                dst.put(dstPos++, src.get(srcPos++));
+            }
+        }
+    }
+
     public static void writeWithLength(ByteBuffer bytes, DataOutput out) throws IOException
     {
         out.writeInt(bytes.remaining());
@@ -399,4 +431,37 @@ public class ByteBufferUtil
     {
         return ByteBuffer.wrap(FBUtilities.hexToBytes(str));
     }
+
+    /**
+     * Compare two ByteBuffers over length bytes, starting at the specified offsets.
+     * Compares the non equal bytes as unsigned.
+     * @param bytes1 First byte buffer to compare.
+     * @param offset1 Absolute index to start the comparison at in the first buffer.
+     * @param bytes2 Second byte buffer to compare.
+     * @param offset2 Absolute index to start the comparison at in the second buffer.
+     * @param length How many bytes to compare?
+     * @return -1 if bytes1 is less than bytes2, 1 if bytes2 is less than bytes1, or 0 if equal; a null buffer sorts before a non-null one.
+     */
+    public static int compareSubArrays(ByteBuffer bytes1, int offset1, ByteBuffer bytes2, int offset2, int length)
+    {
+        if ( null == bytes1 )
+        {
+            if ( null == bytes2) return 0;
+            else return -1;
+        }
+        if (null == bytes2 ) return 1;
+
+        assert bytes1.limit() >= offset1 + length : "The first byte array isn't long enough for the specified offset and length.";
+        assert bytes2.limit() >= offset2 + length : "The second byte array isn't long enough for the specified offset and length.";
+        for ( int i = 0; i < length; i++ )
+        {
+            byte byte1 = bytes1.get(offset1 + i);
+            byte byte2 = bytes2.get(offset2 + i);
+            if ( byte1 == byte2 )
+                continue;
+            // compare non-equal bytes as unsigned
+            return (byte1 & 0xFF) < (byte2 & 0xFF) ? -1 : 1;
+        }
+        return 0;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Thu Jan 27 00:29:39 2011
@@ -173,26 +173,6 @@ public class FBUtilities
     }
 
     /**
-     * @param bytes A byte array containing a serialized integer.
-     * @param offset Start position of the integer in the array.
-     * @return The integer value contained in the byte array.
-     */
-    public static int byteArrayToInt(byte[] bytes, int offset)
-    {
-        if (bytes.length - offset < 4)
-        {
-            throw new IllegalArgumentException("An integer must be 4 bytes in size.");
-        }
-        int n = 0;
-        for ( int i = 0; i < 4; ++i )
-        {
-            n <<= 8;
-            n |= bytes[offset + i] & 0xFF;
-        }
-        return n;
-    }
-
-    /**
      * Copy bytes from long into bytes starting from offset.
      * @param bytes Target array
      * @param offset Offset into the array
@@ -221,36 +201,6 @@ public class FBUtilities
         return bytes;
     }
 
-    /**
-     * @param bytes A byte array containing a serialized long.
-     * @return The long value contained in the byte array.
-     */
-    public static long byteArrayToLong(byte[] bytes)
-    {
-        return byteArrayToLong(bytes, 0);
-    }
-
-    /**
-     * @param bytes A byte array containing a serialized long.
-     * @param offset Start position of the long in the array.
-     * @return The long value contained in the byte array.
-     */
-    public static long byteArrayToLong(byte[] bytes, int offset)
-    {
-        if (bytes.length - offset < 8)
-        {
-            throw new IllegalArgumentException("A long must be 8 bytes in size.");
-        }
-        long n = 0;
-        for ( int i = 0; i < 8; ++i )
-        {
-            n <<= 8;
-
-            n |= bytes[offset + i] & 0xFF;
-        }
-        return n;
-    }
-
     public static int compareUnsigned(byte[] bytes1, byte[] bytes2, int offset1, int offset2, int len1, int len2)
     {
         if (bytes1 == null)
@@ -272,38 +222,6 @@ public class FBUtilities
     }
   
     /**
-     * Compare two byte[] at specified offsets for length. Compares the non equal bytes as unsigned.
-     * @param bytes1 First array to compare.
-     * @param offset1 Position to start the comparison at in the first array.
-     * @param bytes2 Second array to compare.
-     * @param offset2 Position to start the comparison at in the second array.
-     * @param length How many bytes to compare?
-     * @return -1 if byte1 is less than byte2, 1 if byte2 is less than byte1 or 0 if equal.
-     */
-    public static int compareByteSubArrays(byte[] bytes1, int offset1, byte[] bytes2, int offset2, int length)
-    {
-        if ( null == bytes1 )
-        {
-            if ( null == bytes2) return 0;
-            else return -1;
-        }
-        if (null == bytes2 ) return 1;
-
-        assert bytes1.length >= (offset1 + length) : "The first byte array isn't long enough for the specified offset and length.";
-        assert bytes2.length >= (offset2 + length) : "The second byte array isn't long enough for the specified offset and length.";
-        for ( int i = 0; i < length; i++ )
-        {
-            byte byte1 = bytes1[offset1+i];
-            byte byte2 = bytes2[offset2+i];
-            if ( byte1 == byte2 )
-                continue;
-            // compare non-equal bytes as unsigned
-            return (byte1 & 0xFF) < (byte2 & 0xFF) ? -1 : 1;
-        }
-        return 0;
-    }
-
-    /**
      * @return The bitwise XOR of the inputs. The output will be the same length as the
      * longer input, but if either input is null, the output will be null.
      */

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/CounterColumnTest.java Thu Jan 27 00:29:39 2011
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 import org.apache.commons.lang.ArrayUtils;
 
@@ -63,56 +64,12 @@ public class CounterColumnTest
     {
         AbstractCommutativeType type = CounterColumnType.instance;
         long delta = 3L;
-        CounterColumn column = (CounterColumn)type.createColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(delta), 1L);
-        assert delta == column.value().getLong(column.value().arrayOffset());
-        assert 0 == column.partitionedCounter().length;
-
-        InetAddress node = InetAddress.getByAddress(FBUtilities.toByteArray(1));
-        column.update(node);
-        assert delta == column.value().getLong(column.value().arrayOffset());
-        assert  1 == FBUtilities.byteArrayToInt( column.partitionedCounter(), 0*stepLength);
-        assert 1L == FBUtilities.byteArrayToLong(column.partitionedCounter(), 0*stepLength + idLength);
-        assert 3L == FBUtilities.byteArrayToLong(column.partitionedCounter(), 0*stepLength + idLength + clockLength);
-    }
-
-    @Test
-    public void testUpdate() throws UnknownHostException
-    {
-        CounterColumn c = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 0L);
-        assert 0L == c.value().getLong(c.value().arrayOffset());
-
-        assert c.partitionedCounter().length == 0 : "badly formatted initial context";
-
-        c.value = ByteBufferUtil.bytes(1L);
-        c.update(InetAddress.getByAddress(FBUtilities.toByteArray(1)));
-        assert 1L == c.value().getLong(c.value().arrayOffset());
-
-        assert c.partitionedCounter().length == stepLength;
-
-        assert  1 == FBUtilities.byteArrayToInt( c.partitionedCounter(), 0*stepLength);
-        assert 1L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 0*stepLength + idLength);
-        assert 1L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 0*stepLength + idLength + clockLength);
-
-        c.value = ByteBufferUtil.bytes(3L);
-        c.update(InetAddress.getByAddress(FBUtilities.toByteArray(2)));
-
-        c.value = ByteBufferUtil.bytes(2L);
-        c.update(InetAddress.getByAddress(FBUtilities.toByteArray(2)));
+        CounterColumn column = new CounterColumn(ByteBufferUtil.bytes("x"), delta, 1L);
 
-        c.value = ByteBufferUtil.bytes(9L);
-        c.update(InetAddress.getByAddress(FBUtilities.toByteArray(2)));
-
-        assert 15L == c.value().getLong(c.value().arrayOffset());
-
-        assert c.partitionedCounter().length == (2 * stepLength);
-
-        assert  1 == FBUtilities.byteArrayToInt(c.partitionedCounter(),  0*stepLength);
-        assert 1L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 0*stepLength + idLength);
-        assert 1L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 0*stepLength + idLength + clockLength);
-
-        assert   2 == FBUtilities.byteArrayToInt(c.partitionedCounter(),  1*stepLength);
-        assert  3L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 1*stepLength + idLength);
-        assert 14L == FBUtilities.byteArrayToLong(c.partitionedCounter(), 1*stepLength + idLength + clockLength);
+        assert delta == column.total();
+        assert Arrays.equals(FBUtilities.getLocalAddress().getAddress(), ArrayUtils.subarray(column.value().array(), 0, idLength));
+        assert 1L == column.value().getLong(0*stepLength + idLength);
+        assert delta == column.value().getLong(0*stepLength + idLength + clockLength);
     }
 
     @Test
@@ -131,178 +88,171 @@ public class CounterColumnTest
 
         // tombstone > live
         left  = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 2L);
-        right = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 1L);
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 1L);
 
         assert left.reconcile(right) == left;
 
         // tombstone < live last delete
         left  = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 1L);
-        right = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 4L, new byte[0], 2L);
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 4L, 2L);
 
         assert left.reconcile(right) == right;
 
         // tombstone == live last delete
         left  = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 2L);
-        right = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 4L, new byte[0], 2L);
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 4L, 2L);
 
         assert left.reconcile(right) == right;
 
         // tombstone > live last delete
         left  = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 4L);
-        right = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 9L, new byte[0], 1L);
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 9L, 1L);
 
         reconciled = left.reconcile(right);
         assert reconciled.name() == right.name();
         assert reconciled.value() == right.value();
         assert reconciled.timestamp() == right.timestamp();
-        assert ((CounterColumn)reconciled).partitionedCounter() == ((CounterColumn)right).partitionedCounter();
         assert ((CounterColumn)reconciled).timestampOfLastDelete() == left.timestamp();
 
         // live < tombstone
-        left  = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 1L);
+        left  = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 1L);
         right = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 2L);
 
         assert left.reconcile(right) == right;
 
         // live last delete > tombstone
-        left  = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 4L, new byte[0], 2L);
+        left  = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 4L, 2L);
         right = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 1L);
 
         assert left.reconcile(right) == left;
 
         // live last delete == tombstone
-        left  = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 4L, new byte[0], 2L);
+        left  = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 4L, 2L);
         right = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 2L);
 
         assert left.reconcile(right) == left;
 
         // live last delete < tombstone
-        left  = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBufferUtil.bytes(0L), 9L, new byte[0], 1L);
+        left  = new CounterColumn(ByteBufferUtil.bytes("x"), 0L, 9L, 1L);
         right = new DeletedColumn(ByteBufferUtil.bytes("x"), 1, 4L);
 
         reconciled = left.reconcile(right);
         assert reconciled.name() == left.name();
         assert reconciled.value() == left.value();
         assert reconciled.timestamp() == left.timestamp();
-        assert ((CounterColumn)reconciled).partitionedCounter() == ((CounterColumn)left).partitionedCounter();
         assert ((CounterColumn)reconciled).timestampOfLastDelete() == right.timestamp();
 
         // live + live
-        byte[] context;
 
-        context = new byte[0];
-        context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(1)), 1L);
-        context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(1)), 0L);
-        context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(1)), 1L);
-        context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(2)), 0L);
-        context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(2)), 5L);
-        left  = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(context)), 9L, context, 1L);
-
-        context = new byte[0];
-        context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(2)), 4L);
-        context = cc.update(context, InetAddress.getByAddress(FBUtilities.toByteArray(3)), 2L);
-        right = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(context)), 3L, context, 4L);
+        left = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(FBUtilities.toByteArray(1), 1L, 1L), 4L);
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(FBUtilities.toByteArray(1), 2L, 3L), 1L);
 
         reconciled = left.reconcile(right);
+        assert reconciled.name().equals(left.name());
+        assert ((CounterColumn)reconciled).total() == 3L;
+        assert reconciled.timestamp() == 4L;
 
-        assert reconciled.name() == left.name();
-        assert 9L == reconciled.value().getLong(reconciled.value().position());
-        assert reconciled.timestamp() == 9L;
+        left = reconciled;
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(FBUtilities.toByteArray(2), 1L, 5L), 2L);
 
-        context = ((CounterColumn)reconciled).partitionedCounter();
-        assert 3 * stepLength == context.length;
+        reconciled = left.reconcile(right);
 
-        assert  1 == FBUtilities.byteArrayToInt(context,  0*stepLength);
-        assert 3L == FBUtilities.byteArrayToLong(context, 0*stepLength + idLength);
-        assert 2L == FBUtilities.byteArrayToLong(context, 0*stepLength + idLength + clockLength);
+        assert reconciled.name().equals(left.name());
+        assert ((CounterColumn)reconciled).total() == 8L;
+        assert reconciled.timestamp() == 4L;
 
-        assert  2 == FBUtilities.byteArrayToInt(context,  1*stepLength);
-        assert 2L == FBUtilities.byteArrayToLong(context, 1*stepLength + idLength);
-        assert 5L == FBUtilities.byteArrayToLong(context, 1*stepLength + idLength + clockLength);
+        left = reconciled;
+        right = new CounterColumn(ByteBufferUtil.bytes("x"), cc.create(FBUtilities.toByteArray(2), 2L, 2L), 6L);
 
-        assert  3 == FBUtilities.byteArrayToInt(context,  2*stepLength);
-        assert 1L == FBUtilities.byteArrayToLong(context, 2*stepLength + idLength);
-        assert 2L == FBUtilities.byteArrayToLong(context, 2*stepLength + idLength + clockLength);
+        reconciled = left.reconcile(right);
+        assert reconciled.name().equals(left.name());
+        assert ((CounterColumn)reconciled).total() == 5L;
+        assert reconciled.timestamp() == 6L;
+
+        ByteBuffer context = reconciled.value();
+        assert 2 * stepLength == context.remaining();
+
+        assert  1 == context.getInt(0*stepLength);
+        assert 2L == context.getLong(0*stepLength + idLength);
+        assert 3L == context.getLong(0*stepLength + idLength + clockLength);
+
+        assert  2 == context.getInt(1*stepLength);
+        assert 2L == context.getLong(1*stepLength + idLength);
+        assert 2L == context.getLong(1*stepLength + idLength + clockLength);
 
-        assert ((CounterColumn)reconciled).timestampOfLastDelete() == 4L;
+        assert ((CounterColumn)reconciled).timestampOfLastDelete() == Long.MIN_VALUE;
     }
 
     @Test
     public void testDiff() throws UnknownHostException
     {
-        byte[] left;
-        byte[] right;
+        ByteBuffer left;
+        ByteBuffer right;
 
         CounterColumn leftCol;
         CounterColumn rightCol;
 
         // timestamp
-        left    = new byte[0];
-        leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(left)), 1L, left);
-
-        right    = new byte[0];
-        rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(right)), 2L, right);
+        leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), 0, 1L);
+        rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), 0, 2L);
 
         assert rightCol == leftCol.diff(rightCol);
         assert null     == rightCol.diff(leftCol);
 
         // timestampOfLastDelete
-        left    = new byte[0];
-        leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(left)), 1L, left, 1L);
-
-        right    = new byte[0];
-        rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(right)), 1L, right, 2L);
+        leftCol = new CounterColumn(ByteBufferUtil.bytes("x"), 0, 1L, 1L);
+        rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), 0, 1L, 2L);
 
         assert rightCol == leftCol.diff(rightCol);
         assert null     == rightCol.diff(leftCol);
 
         // equality: equal nodes, all counts same
-        left = Util.concatByteArrays(
+        left = ByteBuffer.wrap(Util.concatByteArrays(
             FBUtilities.toByteArray(3), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(0L),
             FBUtilities.toByteArray(6), FBUtilities.toByteArray(2L), FBUtilities.toByteArray(0L),
             FBUtilities.toByteArray(9), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L)
-            );
-        left = cc.update(left, InetAddress.getByAddress(FBUtilities.toByteArray(3)), 0L);
-        right = ArrayUtils.clone(left);
+            ));
+        //left = cc.update(left, InetAddress.getByAddress(FBUtilities.toByteArray(3)), 0L);
+        right = ByteBufferUtil.clone(left);
 
-        leftCol  = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(left)),  1L, left);
-        rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(right)), 1L, right);
+        leftCol  = new CounterColumn(ByteBufferUtil.bytes("x"), left,  1L);
+        rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), right, 1L);
         assert null == leftCol.diff(rightCol);
 
         // greater than: left has superset of nodes (counts equal)
-        left = Util.concatByteArrays(
+        left = ByteBuffer.wrap(Util.concatByteArrays(
             FBUtilities.toByteArray(3),  FBUtilities.toByteArray(3L), FBUtilities.toByteArray(0L),
             FBUtilities.toByteArray(6),  FBUtilities.toByteArray(2L), FBUtilities.toByteArray(0L),
             FBUtilities.toByteArray(9),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L),
             FBUtilities.toByteArray(12), FBUtilities.toByteArray(0L), FBUtilities.toByteArray(0L)
-            );
-        right = Util.concatByteArrays(
+            ));
+        right = ByteBuffer.wrap(Util.concatByteArrays(
             FBUtilities.toByteArray(3),  FBUtilities.toByteArray(3L), FBUtilities.toByteArray(0L),
             FBUtilities.toByteArray(6),  FBUtilities.toByteArray(2L), FBUtilities.toByteArray(0L),
             FBUtilities.toByteArray(9),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L)
-            );
+            ));
 
-        leftCol  = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(left)),  1L, left);
-        rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(right)), 1L, right);
+        leftCol  = new CounterColumn(ByteBufferUtil.bytes("x"), left,  1L);
+        rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), right, 1L);
         assert null == leftCol.diff(rightCol);
 
         // less than: right has subset of nodes (counts equal)
         assert leftCol == rightCol.diff(leftCol);
 
         // disjoint: right and left have disjoint node sets
-        left = Util.concatByteArrays(
+        left = ByteBuffer.wrap(Util.concatByteArrays(
             FBUtilities.toByteArray(3),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L),
             FBUtilities.toByteArray(4),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L),
             FBUtilities.toByteArray(9),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L)
-            );
-        right = Util.concatByteArrays(
+            ));
+        right = ByteBuffer.wrap(Util.concatByteArrays(
             FBUtilities.toByteArray(3),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L),
             FBUtilities.toByteArray(6),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L),
             FBUtilities.toByteArray(9),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L)
-            );
+            ));
 
-        leftCol  = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(left)),  1L, left);
-        rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(right)), 1L, right);
+        leftCol  = new CounterColumn(ByteBufferUtil.bytes("x"), left,  1L);
+        rightCol = new CounterColumn(ByteBufferUtil.bytes("x"), right, 1L);
         assert rightCol == leftCol.diff(rightCol);
         assert leftCol  == rightCol.diff(leftCol);
     }
@@ -310,17 +260,17 @@ public class CounterColumnTest
     @Test
     public void testCleanNodeCounts() throws UnknownHostException
     {
-        byte[] context = Util.concatByteArrays(
+        ByteBuffer context = ByteBuffer.wrap(Util.concatByteArrays(
             FBUtilities.toByteArray(1),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(1L),
             FBUtilities.toByteArray(2),  FBUtilities.toByteArray(2L), FBUtilities.toByteArray(2L),
             FBUtilities.toByteArray(4),  FBUtilities.toByteArray(3L), FBUtilities.toByteArray(3L),
             FBUtilities.toByteArray(8),  FBUtilities.toByteArray(4L), FBUtilities.toByteArray(4L)
-            );
-        CounterColumn c = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(context)), 1L, context);
+            ));
+        CounterColumn c = new CounterColumn(ByteBufferUtil.bytes("x"), context, 1L);
 
         CounterColumn d = c.cleanNodeCounts(InetAddress.getByAddress(FBUtilities.toByteArray(4)));
 
-        assertEquals(7L, d.value().getLong(d.value().arrayOffset()));
+        assertEquals(7L, d.total());
     }
 
     @Test
@@ -328,13 +278,13 @@ public class CounterColumnTest
     {
         ColumnFamily cf;
 
-        byte[] context = Util.concatByteArrays(
+        ByteBuffer context = ByteBuffer.wrap(Util.concatByteArrays(
             FBUtilities.toByteArray(1),  FBUtilities.toByteArray(1L), FBUtilities.toByteArray(1L),
             FBUtilities.toByteArray(2),  FBUtilities.toByteArray(2L), FBUtilities.toByteArray(2L),
             FBUtilities.toByteArray(4),  FBUtilities.toByteArray(3L), FBUtilities.toByteArray(3L),
             FBUtilities.toByteArray(8),  FBUtilities.toByteArray(4L), FBUtilities.toByteArray(4L)
-            );
-        CounterColumn original = new CounterColumn(ByteBufferUtil.bytes("x"), ByteBuffer.wrap(cc.total(context)), 1L, context);
+            ));
+        CounterColumn original = new CounterColumn(ByteBufferUtil.bytes("x"), context, 1L);
 
         DataOutputBuffer bufOut = new DataOutputBuffer();
         Column.serializer().serialize(original, bufOut);

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java?rev=1063928&r1=1063927&r2=1063928&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java Thu Jan 27 00:29:39 2011
@@ -55,21 +55,21 @@ public class SuperColumnTest
             FBUtilities.toByteArray(4), FBUtilities.toByteArray(2L), FBUtilities.toByteArray(9L),
             FBUtilities.getLocalAddress().getAddress(), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(3L)
             );
-    	sc.addColumn(new CounterColumn(getBytes(1), ByteBuffer.wrap(cc.total(context)), 3L, context, 0L));
+        sc.addColumn(new CounterColumn(getBytes(1), ByteBuffer.wrap(context), 3L, 0L));
         context = concatByteArrays(
             FBUtilities.toByteArray(2), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(4L),
             FBUtilities.toByteArray(4), FBUtilities.toByteArray(4L), FBUtilities.toByteArray(1L),
             FBUtilities.toByteArray(8), FBUtilities.toByteArray(9L), FBUtilities.toByteArray(0L),
             FBUtilities.getLocalAddress().getAddress(), FBUtilities.toByteArray(9L), FBUtilities.toByteArray(5L)
             );
-    	sc.addColumn(new CounterColumn(getBytes(1), ByteBuffer.wrap(cc.total(context)), 10L, context, 0L));
+        sc.addColumn(new CounterColumn(getBytes(1), ByteBuffer.wrap(context), 10L, 0L));
 
         context = concatByteArrays(
             FBUtilities.toByteArray(2), FBUtilities.toByteArray(1L), FBUtilities.toByteArray(0L),
             FBUtilities.toByteArray(3), FBUtilities.toByteArray(6L), FBUtilities.toByteArray(0L),
             FBUtilities.toByteArray(7), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(0L)
             );
-    	sc.addColumn(new CounterColumn(getBytes(2), ByteBuffer.wrap(cc.total(context)), 9L, context, 0L));
+        sc.addColumn(new CounterColumn(getBytes(2), ByteBuffer.wrap(context), 9L, 0L));
                     
     	assertNotNull(sc.getSubColumn(getBytes(1)));
     	assertNull(sc.getSubColumn(getBytes(3)));
@@ -82,10 +82,10 @@ public class SuperColumnTest
                 FBUtilities.toByteArray(8), FBUtilities.toByteArray(9L), FBUtilities.toByteArray(0L),
                 FBUtilities.getLocalAddress().getAddress(), FBUtilities.toByteArray(12L), FBUtilities.toByteArray(8L)
                 );
-        assert 0 == FBUtilities.compareByteSubArrays(
-            ((CounterColumn)sc.getSubColumn(getBytes(1))).partitionedCounter(),
+        assert 0 == ByteBufferUtil.compareSubArrays(
+            ((CounterColumn)sc.getSubColumn(getBytes(1))).value(),
             0,
-            c1,
+            ByteBuffer.wrap(c1),
             0,
             c1.length);
 
@@ -95,10 +95,10 @@ public class SuperColumnTest
                 FBUtilities.toByteArray(3), FBUtilities.toByteArray(6L), FBUtilities.toByteArray(0L),
                 FBUtilities.toByteArray(7), FBUtilities.toByteArray(3L), FBUtilities.toByteArray(0L)
                 );
-        assert 0 == FBUtilities.compareByteSubArrays(
-            ((CounterColumn)sc.getSubColumn(getBytes(2))).partitionedCounter(),
+        assert 0 == ByteBufferUtil.compareSubArrays(
+            ((CounterColumn)sc.getSubColumn(getBytes(2))).value(),
             0,
-            c2,
+            ByteBuffer.wrap(c2),
             0,
             c2.length);