You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/12/17 17:53:13 UTC

[5/6] Rename Column to Cell

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index c2134c2..2ea60f1 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -54,7 +54,7 @@ import org.apache.cassandra.utils.*;
  * Whether the implementation is thread safe or not is left to the
  * implementing classes.
  */
-public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
+public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
 {
     /* The column serializer for this Column Family. Create based on config. */
     public static final ColumnFamilySerializer serializer = new ColumnFamilySerializer();
@@ -102,20 +102,20 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
         return metadata;
     }
 
-    public void addIfRelevant(Column column, DeletionInfo.InOrderTester tester, int gcBefore)
+    public void addIfRelevant(Cell cell, DeletionInfo.InOrderTester tester, int gcBefore)
     {
-        // the column itself must be not gc-able (it is live, or a still relevant tombstone), (1)
-        // and if its container is deleted, the column must be changed more recently than the container tombstone (2)
-        if ((column.getLocalDeletionTime() >= gcBefore) // (1)
-            && (!tester.isDeleted(column.name(), column.timestamp())))                                // (2)
+        // the cell itself must not be gc-able (it is live, or a still-relevant tombstone), (1)
+        // and if its container is deleted, the cell must have been changed more recently than the container tombstone (2)
+        if ((cell.getLocalDeletionTime() >= gcBefore) // (1)
+            && (!tester.isDeleted(cell.name(), cell.timestamp())))                                // (2)
         {
-            addColumn(column);
+            addColumn(cell);
         }
     }
 
-    public void addColumn(Column column)
+    public void addColumn(Cell cell)
     {
-        addColumn(column, HeapAllocator.instance);
+        addColumn(cell, HeapAllocator.instance);
     }
 
     public void addColumn(CellName name, ByteBuffer value, long timestamp)
@@ -126,30 +126,30 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
     public void addColumn(CellName name, ByteBuffer value, long timestamp, int timeToLive)
     {
         assert !metadata().getDefaultValidator().isCommutative();
-        Column column = Column.create(name, value, timestamp, timeToLive, metadata());
-        addColumn(column);
+        Cell cell = Cell.create(name, value, timestamp, timeToLive, metadata());
+        addColumn(cell);
     }
 
     public void addCounter(CellName name, long value)
     {
-        addColumn(new CounterUpdateColumn(name, value, System.currentTimeMillis()));
+        addColumn(new CounterUpdateCell(name, value, System.currentTimeMillis()));
     }
 
     public void addTombstone(CellName name, ByteBuffer localDeletionTime, long timestamp)
     {
-        addColumn(new DeletedColumn(name, localDeletionTime, timestamp));
+        addColumn(new DeletedCell(name, localDeletionTime, timestamp));
     }
 
     public void addTombstone(CellName name, int localDeletionTime, long timestamp)
     {
-        addColumn(new DeletedColumn(name, localDeletionTime, timestamp));
+        addColumn(new DeletedCell(name, localDeletionTime, timestamp));
     }
 
     public void addAtom(OnDiskAtom atom)
     {
-        if (atom instanceof Column)
+        if (atom instanceof Cell)
         {
-            addColumn((Column)atom);
+            addColumn((Cell)atom);
         }
         else
         {
@@ -194,35 +194,35 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
     public abstract void purgeTombstones(int gcBefore);
 
     /**
-     * Adds a column to this column map.
-     * If a column with the same name is already present in the map, it will
-     * be replaced by the newly added column.
+     * Adds a cell to this cell map.
+     * If a cell with the same name is already present in the map, it will
+     * be replaced by the newly added cell.
      */
-    public abstract void addColumn(Column column, Allocator allocator);
+    public abstract void addColumn(Cell cell, Allocator allocator);
 
     /**
      * Adds all the columns of a given column map to this column map.
      * This is equivalent to:
      *   <code>
-     *   for (Column c : cm)
+     *   for (Cell c : cm)
      *      addColumn(c, ...);
      *   </code>
      *  but is potentially faster.
      */
-    public abstract void addAll(ColumnFamily cm, Allocator allocator, Function<Column, Column> transformation);
+    public abstract void addAll(ColumnFamily cm, Allocator allocator, Function<Cell, Cell> transformation);
 
     /**
-     * Replace oldColumn if present by newColumn.
-     * Returns true if oldColumn was present and thus replaced.
-     * oldColumn and newColumn should have the same name.
+     * Replace oldCell if present by newCell.
+     * Returns true if oldCell was present and thus replaced.
+     * oldCell and newCell should have the same name.
      */
-    public abstract boolean replace(Column oldColumn, Column newColumn);
+    public abstract boolean replace(Cell oldCell, Cell newCell);
 
     /**
      * Get a column given its name, returning null if the column is not
      * present.
      */
-    public abstract Column getColumn(CellName name);
+    public abstract Cell getColumn(CellName name);
 
     /**
      * Returns an iterable with the names of columns in this column map in the same order
@@ -235,14 +235,14 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
      * The columns in the returned collection should be sorted as the columns
      * in this map.
      */
-    public abstract Collection<Column> getSortedColumns();
+    public abstract Collection<Cell> getSortedColumns();
 
     /**
      * Returns the columns of this column map as a collection.
      * The columns in the returned collection should be sorted in reverse
      * order of the columns in this map.
      */
-    public abstract Collection<Column> getReverseSortedColumns();
+    public abstract Collection<Cell> getReverseSortedColumns();
 
     /**
      * Returns the number of columns in this map.
@@ -261,13 +261,13 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
      * Returns an iterator over the columns of this map that returns only the matching @param slices.
      * The provided slices must be in order and must be non-overlapping.
      */
-    public abstract Iterator<Column> iterator(ColumnSlice[] slices);
+    public abstract Iterator<Cell> iterator(ColumnSlice[] slices);
 
     /**
      * Returns a reversed iterator over the columns of this map that returns only the matching @param slices.
      * The provided slices must be in reversed order and must be non-overlapping.
      */
-    public abstract Iterator<Column> reverseIterator(ColumnSlice[] slices);
+    public abstract Iterator<Cell> reverseIterator(ColumnSlice[] slices);
 
     /**
      * Returns if this map only support inserts in reverse order.
@@ -284,7 +284,7 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
 
     public void addAll(ColumnFamily cf, Allocator allocator)
     {
-        addAll(cf, allocator, Functions.<Column>identity());
+        addAll(cf, allocator, Functions.<Cell>identity());
     }
 
     /*
@@ -300,20 +300,20 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
         // (don't need to worry about cfNew containing Columns that are shadowed by
         // the delete tombstone, since cfNew was generated by CF.resolve, which
         // takes care of those for us.)
-        for (Column columnExternal : cfComposite)
+        for (Cell cellExternal : cfComposite)
         {
-            CellName cName = columnExternal.name();
-            Column columnInternal = getColumn(cName);
-            if (columnInternal == null)
+            CellName cName = cellExternal.name();
+            Cell cellInternal = getColumn(cName);
+            if (cellInternal == null)
             {
-                cfDiff.addColumn(columnExternal);
+                cfDiff.addColumn(cellExternal);
             }
             else
             {
-                Column columnDiff = columnInternal.diff(columnExternal);
-                if (columnDiff != null)
+                Cell cellDiff = cellInternal.diff(cellExternal);
+                if (cellDiff != null)
                 {
-                    cfDiff.addColumn(columnDiff);
+                    cfDiff.addColumn(cellDiff);
                 }
             }
         }
@@ -326,16 +326,16 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
     public long dataSize()
     {
         long size = 0;
-        for (Column column : this)
-            size += column.dataSize();
+        for (Cell cell : this)
+            size += cell.dataSize();
         return size;
     }
 
     public long maxTimestamp()
     {
         long maxTimestamp = deletionInfo().maxTimestamp();
-        for (Column column : this)
-            maxTimestamp = Math.max(maxTimestamp, column.maxTimestamp());
+        for (Cell cell : this)
+            maxTimestamp = Math.max(maxTimestamp, cell.maxTimestamp());
         return maxTimestamp;
     }
 
@@ -345,8 +345,8 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
         HashCodeBuilder builder = new HashCodeBuilder(373, 75437)
                 .append(metadata)
                 .append(deletionInfo());
-        for (Column column : this)
-            builder.append(column);
+        for (Cell cell : this)
+            builder.append(cell);
         return builder.toHashCode();
     }
 
@@ -388,8 +388,8 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
 
     public void updateDigest(MessageDigest digest)
     {
-        for (Column column : this)
-            column.updateDigest(digest);
+        for (Cell cell : this)
+            cell.updateDigest(digest);
     }
 
     public static ColumnFamily diff(ColumnFamily cf1, ColumnFamily cf2)
@@ -420,16 +420,16 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
         int maxLocalDeletionTime = Integer.MIN_VALUE;
         List<ByteBuffer> minColumnNamesSeen = Collections.emptyList();
         List<ByteBuffer> maxColumnNamesSeen = Collections.emptyList();
-        for (Column column : this)
+        for (Cell cell : this)
         {
-            minTimestampSeen = Math.min(minTimestampSeen, column.minTimestamp());
-            maxTimestampSeen = Math.max(maxTimestampSeen, column.maxTimestamp());
-            maxLocalDeletionTime = Math.max(maxLocalDeletionTime, column.getLocalDeletionTime());
-            int deletionTime = column.getLocalDeletionTime();
+            minTimestampSeen = Math.min(minTimestampSeen, cell.minTimestamp());
+            maxTimestampSeen = Math.max(maxTimestampSeen, cell.maxTimestamp());
+            maxLocalDeletionTime = Math.max(maxLocalDeletionTime, cell.getLocalDeletionTime());
+            int deletionTime = cell.getLocalDeletionTime();
             if (deletionTime < Integer.MAX_VALUE)
                 tombstones.update(deletionTime);
-            minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, column.name, metadata.comparator);
-            maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, column.name, metadata.comparator);
+            minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, cell.name, metadata.comparator);
+            maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, cell.name, metadata.comparator);
         }
         return new ColumnStats(getColumnCount(), minTimestampSeen, maxTimestampSeen, maxLocalDeletionTime, tombstones, minColumnNamesSeen, maxColumnNamesSeen);
     }
@@ -449,18 +449,18 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
 
     public boolean hasOnlyTombstones(long now)
     {
-        for (Column column : this)
-            if (column.isLive(now))
+        for (Cell cell : this)
+            if (cell.isLive(now))
                 return false;
         return true;
     }
 
-    public Iterator<Column> iterator()
+    public Iterator<Cell> iterator()
     {
         return getSortedColumns().iterator();
     }
 
-    public Iterator<Column> reverseIterator()
+    public Iterator<Cell> reverseIterator()
     {
         return getReverseSortedColumns().iterator();
     }
@@ -473,8 +473,8 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
 
         // Do we have colums that are either deleted by the container or gcable tombstone?
         DeletionInfo.InOrderTester tester = inOrderDeletionTester();
-        for (Column column : this)
-            if (tester.isDeleted(column) || column.hasIrrelevantData(gcBefore))
+        for (Cell cell : this)
+            if (tester.isDeleted(cell) || cell.hasIrrelevantData(gcBefore))
                 return true;
 
         return false;
@@ -483,8 +483,8 @@ public abstract class ColumnFamily implements Iterable<Column>, IRowCacheEntry
     public Map<CellName, ByteBuffer> asMap()
     {
         ImmutableMap.Builder<CellName, ByteBuffer> builder = ImmutableMap.builder();
-        for (Column column : this)
-            builder.put(column.name, column.value);
+        for (Cell cell : this)
+            builder.put(cell.name, cell.value);
         return builder.build();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
index 13ec6fc..92aa955 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilySerializer.java
@@ -72,9 +72,9 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
             int count = cf.getColumnCount();
             out.writeInt(count);
             int written = 0;
-            for (Column column : cf)
+            for (Cell cell : cf)
             {
-                columnSerializer.serialize(column, out);
+                columnSerializer.serialize(cell, out);
                 written++;
             }
             assert count == written: "Column family had " + count + " columns, but " + written + " written";
@@ -131,8 +131,8 @@ public class ColumnFamilySerializer implements IVersionedSerializer<ColumnFamily
             size += cf.getComparator().deletionInfoSerializer().serializedSize(cf.deletionInfo(), typeSizes, version);
             size += typeSizes.sizeof(cf.getColumnCount());
             ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
-            for (Column column : cf)
-                size += columnSerializer.serializedSize(column, typeSizes);
+            for (Cell cell : cf)
+                size += columnSerializer.serializedSize(cell, typeSizes);
         }
         return size;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index f00e281..80f77bf 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -903,12 +903,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public static ColumnFamily removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer)
     {
-        Iterator<Column> iter = cf.iterator();
+        Iterator<Cell> iter = cf.iterator();
         DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester();
         boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty();
         while (iter.hasNext())
         {
-            Column c = iter.next();
+            Cell c = iter.next();
             // remove columns if
             // (a) the column itself is gcable or
             // (b) the column is shadowed by a CF tombstone
@@ -926,7 +926,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     // returns true if
     // 1. this column has been dropped from schema and
     // 2. if it has been re-added since then, this particular column was inserted before the last drop
-    private static boolean isDroppedColumn(Column c, CFMetaData meta)
+    private static boolean isDroppedColumn(Cell c, CFMetaData meta)
     {
         Long droppedAt = meta.getDroppedColumns().get(c.name().cql3ColumnName());
         return droppedAt != null && c.timestamp() <= droppedAt;
@@ -937,7 +937,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (cf == null || cf.metadata.getDroppedColumns().isEmpty())
             return;
 
-        Iterator<Column> iter = cf.iterator();
+        Iterator<Cell> iter = cf.iterator();
         while (iter.hasNext())
             if (isDroppedColumn(iter.next(), metadata))
                 iter.remove();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 3e6d55d..f00f958 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -126,7 +126,7 @@ public class ColumnIndex
             Iterator<RangeTombstone> rangeIter = cf.deletionInfo().rangeIterator();
             RangeTombstone tombstone = rangeIter.hasNext() ? rangeIter.next() : null;
 
-            for (Column c : cf)
+            for (Cell c : cf)
             {
                 while (tombstone != null && comparator.compare(c.name(), tombstone.min) >= 0)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index a1c6ebd..3cfd900 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public class ColumnSerializer implements ISerializer<Column>
+public class ColumnSerializer implements ISerializer<Cell>
 {
     public final static int DELETION_MASK        = 0x01;
     public final static int EXPIRATION_MASK      = 0x02;
@@ -61,24 +61,24 @@ public class ColumnSerializer implements ISerializer<Column>
         this.type = type;
     }
 
-    public void serialize(Column column, DataOutput out) throws IOException
+    public void serialize(Cell cell, DataOutput out) throws IOException
     {
-        assert !column.name().isEmpty();
-        type.cellSerializer().serialize(column.name(), out);
+        assert !cell.name().isEmpty();
+        type.cellSerializer().serialize(cell.name(), out);
         try
         {
-            out.writeByte(column.serializationFlags());
-            if (column instanceof CounterColumn)
+            out.writeByte(cell.serializationFlags());
+            if (cell instanceof CounterCell)
             {
-                out.writeLong(((CounterColumn)column).timestampOfLastDelete());
+                out.writeLong(((CounterCell) cell).timestampOfLastDelete());
             }
-            else if (column instanceof ExpiringColumn)
+            else if (cell instanceof ExpiringCell)
             {
-                out.writeInt(((ExpiringColumn) column).getTimeToLive());
-                out.writeInt(column.getLocalDeletionTime());
+                out.writeInt(((ExpiringCell) cell).getTimeToLive());
+                out.writeInt(cell.getLocalDeletionTime());
             }
-            out.writeLong(column.timestamp());
-            ByteBufferUtil.writeWithLength(column.value(), out);
+            out.writeLong(cell.timestamp());
+            ByteBufferUtil.writeWithLength(cell.value(), out);
         }
         catch (IOException e)
         {
@@ -86,7 +86,7 @@ public class ColumnSerializer implements ISerializer<Column>
         }
     }
 
-    public Column deserialize(DataInput in) throws IOException
+    public Cell deserialize(DataInput in) throws IOException
     {
         return deserialize(in, Flag.LOCAL);
     }
@@ -96,12 +96,12 @@ public class ColumnSerializer implements ISerializer<Column>
      * deserialize comes from a remote host. If it does, then we must clear
      * the delta.
      */
-    public Column deserialize(DataInput in, ColumnSerializer.Flag flag) throws IOException
+    public Cell deserialize(DataInput in, ColumnSerializer.Flag flag) throws IOException
     {
         return deserialize(in, flag, Integer.MIN_VALUE);
     }
 
-    public Column deserialize(DataInput in, ColumnSerializer.Flag flag, int expireBefore) throws IOException
+    public Cell deserialize(DataInput in, ColumnSerializer.Flag flag, int expireBefore) throws IOException
     {
         CellName name = type.cellSerializer().deserialize(in);
 
@@ -109,14 +109,14 @@ public class ColumnSerializer implements ISerializer<Column>
         return deserializeColumnBody(in, name, b, flag, expireBefore);
     }
 
-    Column deserializeColumnBody(DataInput in, CellName name, int mask, ColumnSerializer.Flag flag, int expireBefore) throws IOException
+    Cell deserializeColumnBody(DataInput in, CellName name, int mask, ColumnSerializer.Flag flag, int expireBefore) throws IOException
     {
         if ((mask & COUNTER_MASK) != 0)
         {
             long timestampOfLastDelete = in.readLong();
             long ts = in.readLong();
             ByteBuffer value = ByteBufferUtil.readWithLength(in);
-            return CounterColumn.create(name, value, ts, timestampOfLastDelete, flag);
+            return CounterCell.create(name, value, ts, timestampOfLastDelete, flag);
         }
         else if ((mask & EXPIRATION_MASK) != 0)
         {
@@ -124,17 +124,17 @@ public class ColumnSerializer implements ISerializer<Column>
             int expiration = in.readInt();
             long ts = in.readLong();
             ByteBuffer value = ByteBufferUtil.readWithLength(in);
-            return ExpiringColumn.create(name, value, ts, ttl, expiration, expireBefore, flag);
+            return ExpiringCell.create(name, value, ts, ttl, expiration, expireBefore, flag);
         }
         else
         {
             long ts = in.readLong();
             ByteBuffer value = ByteBufferUtil.readWithLength(in);
             return (mask & COUNTER_UPDATE_MASK) != 0
-                   ? new CounterUpdateColumn(name, value, ts)
+                   ? new CounterUpdateCell(name, value, ts)
                    : ((mask & DELETION_MASK) == 0
-                      ? new Column(name, value, ts)
-                      : new DeletedColumn(name, value, ts));
+                      ? new Cell(name, value, ts)
+                      : new DeletedCell(name, value, ts));
         }
     }
 
@@ -151,9 +151,9 @@ public class ColumnSerializer implements ISerializer<Column>
         FileUtils.skipBytesFully(in, length);
     }
 
-    public long serializedSize(Column column, TypeSizes typeSizes)
+    public long serializedSize(Cell cell, TypeSizes typeSizes)
     {
-        return column.serializedSize(type, typeSizes);
+        return cell.serializedSize(type, typeSizes);
     }
 
     public static class CorruptColumnException extends IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/CounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java
new file mode 100644
index 0000000..0391eb8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/CounterCell.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.net.InetAddress;
+import java.security.MessageDigest;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.cassandra.serializers.MarshalException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.context.IContext.ContextRelationship;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.utils.*;
+
+/**
+ * A cell that represents a partitioned counter.
+ */
+public class CounterCell extends Cell
+{
+    private static final Logger logger = LoggerFactory.getLogger(CounterCell.class);
+
+    protected static final CounterContext contextManager = CounterContext.instance();
+
+    private final long timestampOfLastDelete;
+
+    public CounterCell(CellName name, long value, long timestamp)
+    {
+        this(name, contextManager.create(value, HeapAllocator.instance), timestamp);
+    }
+
+    public CounterCell(CellName name, long value, long timestamp, long timestampOfLastDelete)
+    {
+        this(name, contextManager.create(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
+    }
+
+    public CounterCell(CellName name, ByteBuffer value, long timestamp)
+    {
+        this(name, value, timestamp, Long.MIN_VALUE);
+    }
+
+    public CounterCell(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete)
+    {
+        super(name, value, timestamp);
+        this.timestampOfLastDelete = timestampOfLastDelete;
+    }
+
+    public static CounterCell create(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag)
+    {
+        // a negative #elt (the count read from the leading short of the value) means we have to clear the deltas
+        short count = value.getShort(value.position());
+        if (flag == ColumnSerializer.Flag.FROM_REMOTE || (flag == ColumnSerializer.Flag.LOCAL && count < 0))
+            value = CounterContext.instance().clearAllDelta(value);
+        return new CounterCell(name, value, timestamp, timestampOfLastDelete);
+    }
+
+    @Override
+    public Cell withUpdatedName(CellName newName)
+    {
+        return new CounterCell(newName, value, timestamp, timestampOfLastDelete);
+    }
+
+    public long timestampOfLastDelete()
+    {
+        return timestampOfLastDelete;
+    }
+
+    public long total()
+    {
+        return contextManager.total(value);
+    }
+
+    @Override
+    public int dataSize()
+    {
+        /*
+         * A counter cell adds the following to the data size of a Cell:
+         *  + 8 bytes for timestampOfLastDelete
+         */
+        return super.dataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete);
+    }
+
+    @Override
+    public int serializedSize(CellNameType type, TypeSizes typeSizes)
+    {
+        return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete);
+    }
+
+    @Override
+    public Cell diff(Cell cell)
+    {
+        assert (cell instanceof CounterCell) || (cell instanceof DeletedCell) : "Wrong class type: " + cell.getClass();
+
+        if (timestamp() < cell.timestamp())
+            return cell;
+
+        // Note that at this point, cell can't be a tombstone. Indeed,
+        // cell is the result of merging us with other nodes' results, and
+        // merging a CounterCell with a tombstone never returns a tombstone
+        // unless that tombstone's timestamp is greater than that of the
+        // CounterCell.
+        assert !(cell instanceof DeletedCell) : "Wrong class type: " + cell.getClass();
+
+        if (timestampOfLastDelete() < ((CounterCell) cell).timestampOfLastDelete())
+            return cell;
+        ContextRelationship rel = contextManager.diff(cell.value(), value());
+        if (ContextRelationship.GREATER_THAN == rel || ContextRelationship.DISJOINT == rel)
+            return cell;
+        return null;
+    }
+
+    /*
+     * We have to special-case digest creation for counter cells because
+     * we don't want to include the information about which shards of the
+     * context are deltas, since this information differs from node to
+     * node.
+     */
+    @Override
+    public void updateDigest(MessageDigest digest)
+    {
+        digest.update(name.toByteBuffer().duplicate());
+        // We don't take the deltas into account in a digest
+        contextManager.updateDigest(digest, value);
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        try
+        {
+            buffer.writeLong(timestamp);
+            buffer.writeByte(serializationFlags());
+            buffer.writeLong(timestampOfLastDelete);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        digest.update(buffer.getData(), 0, buffer.getLength());
+    }
+
+    @Override
+    public Cell reconcile(Cell cell, Allocator allocator)
+    {
+        assert (cell instanceof CounterCell) || (cell instanceof DeletedCell) : "Wrong class type: " + cell.getClass();
+
+        // live + tombstone: track last tombstone
+        if (cell.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired cell, so the current time is irrelevant
+        {
+            // live < tombstone
+            if (timestamp() < cell.timestamp())
+            {
+                return cell;
+            }
+            // live last delete >= tombstone
+            if (timestampOfLastDelete() >= cell.timestamp())
+            {
+                return this;
+            }
+            // live last delete < tombstone
+            return new CounterCell(name(), value(), timestamp(), cell.timestamp());
+        }
+        // live < live last delete
+        if (timestamp() < ((CounterCell) cell).timestampOfLastDelete())
+            return cell;
+        // live last delete > live
+        if (timestampOfLastDelete() > cell.timestamp())
+            return this;
+        // live + live: merge clocks; update value
+        return new CounterCell(
+            name(),
+            contextManager.merge(value(), cell.value(), allocator),
+            Math.max(timestamp(), cell.timestamp()),
+            Math.max(timestampOfLastDelete(), ((CounterCell) cell).timestampOfLastDelete()));
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        // super.equals() returns false if o is not a CounterCell
+        return super.equals(o) && timestampOfLastDelete == ((CounterCell)o).timestampOfLastDelete;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = super.hashCode();
+        result = 31 * result + (int)(timestampOfLastDelete ^ (timestampOfLastDelete >>> 32));
+        return result;
+    }
+
+    @Override
+    public Cell localCopy(ColumnFamilyStore cfs)
+    {
+        return localCopy(cfs, HeapAllocator.instance);
+    }
+
+    @Override
+    public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    {
+        return new CounterCell(name.copy(allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
+    }
+
+    @Override
+    public String getString(CellNameType comparator)
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append(comparator.getString(name));
+        sb.append(":");
+        sb.append(false);
+        sb.append(":");
+        sb.append(contextManager.toString(value));
+        sb.append("@");
+        sb.append(timestamp());
+        sb.append("!");
+        sb.append(timestampOfLastDelete);
+        return sb.toString();
+    }
+
+    @Override
+    public int serializationFlags()
+    {
+        return ColumnSerializer.COUNTER_MASK;
+    }
+
+    @Override
+    public void validateFields(CFMetaData metadata) throws MarshalException
+    {
+        validateName(metadata);
+        // We cannot use the value validator as we do for other columns, because CounterColumnType validates a long,
+        // which is not the internal representation of counters
+        contextManager.validateContext(value());
+    }
+
+    /**
+     * Check if a given counterId is found in this CounterCell context.
+     */
+    public boolean hasCounterId(CounterId id)
+    {
+        return contextManager.hasCounterId(value(), id);
+    }
+
+    private CounterCell computeOldShardMerger(int mergeBefore)
+    {
+        ByteBuffer bb = contextManager.computeOldShardMerger(value(), CounterId.getOldLocalCounterIds(), mergeBefore);
+        if (bb == null)
+            return null;
+        else
+            return new CounterCell(name(), bb, timestamp(), timestampOfLastDelete);
+    }
+
+    private CounterCell removeOldShards(int gcBefore)
+    {
+        ByteBuffer bb = contextManager.removeOldShards(value(), gcBefore);
+        if (bb == value())
+            return this;
+        else
+        {
+            return new CounterCell(name(), bb, timestamp(), timestampOfLastDelete);
+        }
+    }
+
+    public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore)
+    {
+        mergeAndRemoveOldShards(key, cf, gcBefore, mergeBefore, true);
+    }
+
+    /**
+     * There are two phases to the removal of old shards.
+     * First phase: we merge the old shard value into the current shard and
+     * 'nullify' the old one. We then send the counter context with the old
+     * shard nullified to all other replicas.
+     * Second phase: once an old shard has been nullified for longer than
+     * gc_grace (to be sure all other replicas are aware of the merge), we
+     * simply remove that old shard from the context (its value is 0).
+     * This method does both phases.
+     * (Note that the sendToOtherReplica flag is here only to facilitate
+     * testing. It should be true in real code, so prefer the method above
+     * where possible.)
+     */
+    public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore, boolean sendToOtherReplica)
+    {
+        ColumnFamily remoteMerger = null;
+
+        for (Cell c : cf)
+        {
+            if (!(c instanceof CounterCell))
+                continue;
+            CounterCell cc = (CounterCell) c;
+            CounterCell shardMerger = cc.computeOldShardMerger(mergeBefore);
+            CounterCell merged = cc;
+            if (shardMerger != null)
+            {
+                merged = (CounterCell) cc.reconcile(shardMerger);
+                if (remoteMerger == null)
+                    remoteMerger = cf.cloneMeShallow();
+                remoteMerger.addColumn(merged);
+            }
+            CounterCell cleaned = merged.removeOldShards(gcBefore);
+            if (cleaned != cc)
+            {
+                cf.replace(cc, cleaned);
+            }
+        }
+
+        if (remoteMerger != null && sendToOtherReplica)
+        {
+            try
+            {
+                sendToOtherReplica(key, remoteMerger);
+            }
+            catch (Exception e)
+            {
+                logger.error("Error while sending shard merger mutation to remote endpoints", e);
+            }
+        }
+    }
+
+    public Cell markDeltaToBeCleared()
+    {
+        return new CounterCell(name, contextManager.markDeltaToBeCleared(value), timestamp, timestampOfLastDelete);
+    }
+
+    private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws RequestExecutionException
+    {
+        RowMutation rm = new RowMutation(cf.metadata().ksName, key.key, cf);
+
+        final InetAddress local = FBUtilities.getBroadcastAddress();
+        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(local);
+
+        StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
+        {
+            public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
+            throws OverloadedException
+            {
+                // We should only send to the remote replica, not the local one
+                Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(local));
+                // Fake local response to be a good lad but we won't wait on the responseHandler
+                responseHandler.response(null);
+                StorageProxy.sendToHintedEndpoints((RowMutation) mutation, remotes, responseHandler, localDataCenter);
+            }
+        }, null, WriteType.SIMPLE);
+
+        // we don't wait for answers
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
deleted file mode 100644
index ac2c88e..0000000
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ /dev/null
@@ -1,372 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.net.InetAddress;
-import java.security.MessageDigest;
-import java.util.Set;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import org.apache.cassandra.serializers.MarshalException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.context.IContext.ContextRelationship;
-import org.apache.cassandra.exceptions.OverloadedException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.service.AbstractWriteResponseHandler;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.utils.*;
-
-/**
- * A column that represents a partitioned counter.
- */
-public class CounterColumn extends Column
-{
-    private static final Logger logger = LoggerFactory.getLogger(CounterColumn.class);
-
-    protected static final CounterContext contextManager = CounterContext.instance();
-
-    private final long timestampOfLastDelete;
-
-    public CounterColumn(CellName name, long value, long timestamp)
-    {
-        this(name, contextManager.create(value, HeapAllocator.instance), timestamp);
-    }
-
-    public CounterColumn(CellName name, long value, long timestamp, long timestampOfLastDelete)
-    {
-        this(name, contextManager.create(value, HeapAllocator.instance), timestamp, timestampOfLastDelete);
-    }
-
-    public CounterColumn(CellName name, ByteBuffer value, long timestamp)
-    {
-        this(name, value, timestamp, Long.MIN_VALUE);
-    }
-
-    public CounterColumn(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete)
-    {
-        super(name, value, timestamp);
-        this.timestampOfLastDelete = timestampOfLastDelete;
-    }
-
-    public static CounterColumn create(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag)
-    {
-        // #elt being negative means we have to clean delta
-        short count = value.getShort(value.position());
-        if (flag == ColumnSerializer.Flag.FROM_REMOTE || (flag == ColumnSerializer.Flag.LOCAL && count < 0))
-            value = CounterContext.instance().clearAllDelta(value);
-        return new CounterColumn(name, value, timestamp, timestampOfLastDelete);
-    }
-
-    @Override
-    public Column withUpdatedName(CellName newName)
-    {
-        return new CounterColumn(newName, value, timestamp, timestampOfLastDelete);
-    }
-
-    public long timestampOfLastDelete()
-    {
-        return timestampOfLastDelete;
-    }
-
-    public long total()
-    {
-        return contextManager.total(value);
-    }
-
-    @Override
-    public int dataSize()
-    {
-        /*
-         * A counter column adds to a Column :
-         *  + 8 bytes for timestampOfLastDelete
-         */
-        return super.dataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete);
-    }
-
-    @Override
-    public int serializedSize(CellNameType type, TypeSizes typeSizes)
-    {
-        return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete);
-    }
-
-    @Override
-    public Column diff(Column column)
-    {
-        assert (column instanceof CounterColumn) || (column instanceof DeletedColumn) : "Wrong class type: " + column.getClass();
-
-        if (timestamp() < column.timestamp())
-            return column;
-
-        // Note that if at that point, column can't be a tombstone. Indeed,
-        // column is the result of merging us with other nodes results, and
-        // merging a CounterColumn with a tombstone never return a tombstone
-        // unless that tombstone timestamp is greater that the CounterColumn
-        // one.
-        assert !(column instanceof DeletedColumn) : "Wrong class type: " + column.getClass();
-
-        if (timestampOfLastDelete() < ((CounterColumn)column).timestampOfLastDelete())
-            return column;
-        ContextRelationship rel = contextManager.diff(column.value(), value());
-        if (ContextRelationship.GREATER_THAN == rel || ContextRelationship.DISJOINT == rel)
-            return column;
-        return null;
-    }
-
-    /*
-     * We have to special case digest creation for counter column because
-     * we don't want to include the information about which shard of the
-     * context is a delta or not, since this information differs from node to
-     * node.
-     */
-    @Override
-    public void updateDigest(MessageDigest digest)
-    {
-        digest.update(name.toByteBuffer().duplicate());
-        // We don't take the deltas into account in a digest
-        contextManager.updateDigest(digest, value);
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        try
-        {
-            buffer.writeLong(timestamp);
-            buffer.writeByte(serializationFlags());
-            buffer.writeLong(timestampOfLastDelete);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        digest.update(buffer.getData(), 0, buffer.getLength());
-    }
-
-    @Override
-    public Column reconcile(Column column, Allocator allocator)
-    {
-        assert (column instanceof CounterColumn) || (column instanceof DeletedColumn) : "Wrong class type: " + column.getClass();
-
-        // live + tombstone: track last tombstone
-        if (column.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired column, so the current time is irrelevant
-        {
-            // live < tombstone
-            if (timestamp() < column.timestamp())
-            {
-                return column;
-            }
-            // live last delete >= tombstone
-            if (timestampOfLastDelete() >= column.timestamp())
-            {
-                return this;
-            }
-            // live last delete < tombstone
-            return new CounterColumn(name(), value(), timestamp(), column.timestamp());
-        }
-        // live < live last delete
-        if (timestamp() < ((CounterColumn)column).timestampOfLastDelete())
-            return column;
-        // live last delete > live
-        if (timestampOfLastDelete() > column.timestamp())
-            return this;
-        // live + live: merge clocks; update value
-        return new CounterColumn(
-            name(),
-            contextManager.merge(value(), column.value(), allocator),
-            Math.max(timestamp(), column.timestamp()),
-            Math.max(timestampOfLastDelete(), ((CounterColumn)column).timestampOfLastDelete()));
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        // super.equals() returns false if o is not a CounterColumn
-        return super.equals(o) && timestampOfLastDelete == ((CounterColumn)o).timestampOfLastDelete;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        int result = super.hashCode();
-        result = 31 * result + (int)(timestampOfLastDelete ^ (timestampOfLastDelete >>> 32));
-        return result;
-    }
-
-    @Override
-    public Column localCopy(ColumnFamilyStore cfs)
-    {
-        return localCopy(cfs, HeapAllocator.instance);
-    }
-
-    @Override
-    public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
-    {
-        return new CounterColumn(name.copy(allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
-    }
-
-    @Override
-    public String getString(CellNameType comparator)
-    {
-        StringBuilder sb = new StringBuilder();
-        sb.append(comparator.getString(name));
-        sb.append(":");
-        sb.append(false);
-        sb.append(":");
-        sb.append(contextManager.toString(value));
-        sb.append("@");
-        sb.append(timestamp());
-        sb.append("!");
-        sb.append(timestampOfLastDelete);
-        return sb.toString();
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.COUNTER_MASK;
-    }
-
-    @Override
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        validateName(metadata);
-        // We cannot use the value validator as for other columns as the CounterColumnType validate a long,
-        // which is not the internal representation of counters
-        contextManager.validateContext(value());
-    }
-
-    /**
-     * Check if a given counterId is found in this CounterColumn context.
-     */
-    public boolean hasCounterId(CounterId id)
-    {
-        return contextManager.hasCounterId(value(), id);
-    }
-
-    private CounterColumn computeOldShardMerger(int mergeBefore)
-    {
-        ByteBuffer bb = contextManager.computeOldShardMerger(value(), CounterId.getOldLocalCounterIds(), mergeBefore);
-        if (bb == null)
-            return null;
-        else
-            return new CounterColumn(name(), bb, timestamp(), timestampOfLastDelete);
-    }
-
-    private CounterColumn removeOldShards(int gcBefore)
-    {
-        ByteBuffer bb = contextManager.removeOldShards(value(), gcBefore);
-        if (bb == value())
-            return this;
-        else
-        {
-            return new CounterColumn(name(), bb, timestamp(), timestampOfLastDelete);
-        }
-    }
-
-    public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore)
-    {
-        mergeAndRemoveOldShards(key, cf, gcBefore, mergeBefore, true);
-    }
-
-    /**
-     * There is two phase to the removal of old shards.
-     * First phase: we merge the old shard value to the current shard and
-     * 'nulify' the old one. We then send the counter context with the old
-     * shard nulified to all other replica.
-     * Second phase: once an old shard has been nulified for longer than
-     * gc_grace (to be sure all other replica had been aware of the merge), we
-     * simply remove that old shard from the context (it's value is 0).
-     * This method does both phases.
-     * (Note that the sendToOtherReplica flag is here only to facilitate
-     * testing. It should be true in real code so use the method above
-     * preferably)
-     */
-    public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore, int mergeBefore, boolean sendToOtherReplica)
-    {
-        ColumnFamily remoteMerger = null;
-
-        for (Column c : cf)
-        {
-            if (!(c instanceof CounterColumn))
-                continue;
-            CounterColumn cc = (CounterColumn) c;
-            CounterColumn shardMerger = cc.computeOldShardMerger(mergeBefore);
-            CounterColumn merged = cc;
-            if (shardMerger != null)
-            {
-                merged = (CounterColumn) cc.reconcile(shardMerger);
-                if (remoteMerger == null)
-                    remoteMerger = cf.cloneMeShallow();
-                remoteMerger.addColumn(merged);
-            }
-            CounterColumn cleaned = merged.removeOldShards(gcBefore);
-            if (cleaned != cc)
-            {
-                cf.replace(cc, cleaned);
-            }
-        }
-
-        if (remoteMerger != null && sendToOtherReplica)
-        {
-            try
-            {
-                sendToOtherReplica(key, remoteMerger);
-            }
-            catch (Exception e)
-            {
-                logger.error("Error while sending shard merger mutation to remote endpoints", e);
-            }
-        }
-    }
-
-    public Column markDeltaToBeCleared()
-    {
-        return new CounterColumn(name, contextManager.markDeltaToBeCleared(value), timestamp, timestampOfLastDelete);
-    }
-
-    private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws RequestExecutionException
-    {
-        RowMutation rm = new RowMutation(cf.metadata().ksName, key.key, cf);
-
-        final InetAddress local = FBUtilities.getBroadcastAddress();
-        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(local);
-
-        StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
-        {
-            public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
-            throws OverloadedException
-            {
-                // We should only send to the remote replica, not the local one
-                Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(local));
-                // Fake local response to be a good lad but we won't wait on the responseHandler
-                responseHandler.response(null);
-                StorageProxy.sendToHintedEndpoints((RowMutation) mutation, remotes, responseHandler, localDataCenter);
-            }
-        }, null, WriteType.SIMPLE);
-
-        // we don't wait for answers
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index eae8e12..f0942e2 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -129,7 +129,7 @@ public class CounterMutation implements IMutation
 
     public void apply()
     {
-        // transform all CounterUpdateColumn to CounterColumn: accomplished by localCopy
+        // transform all CounterUpdateCell to CounterCell: accomplished by localCopy
         RowMutation rm = new RowMutation(rowMutation.getKeyspaceName(), ByteBufferUtil.clone(rowMutation.key()));
         Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
 
@@ -137,9 +137,9 @@ public class CounterMutation implements IMutation
         {
             ColumnFamily cf = cf_.cloneMeShallow();
             ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf.id());
-            for (Column column : cf_)
+            for (Cell cell : cf_)
             {
-                cf.addColumn(column.localCopy(cfs), HeapAllocator.instance);
+                cf.addColumn(cell.localCopy(cfs), HeapAllocator.instance);
             }
             rm.add(cf);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/CounterUpdateCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateCell.java b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
new file mode 100644
index 0000000..8acb854
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.HeapAllocator;
+
+/**
+ * A counter update while it hasn't been applied yet by the leader replica.
+ *
+ * Contains a single counter update. When applied by the leader replica, this
+ * is transformed to a relevant CounterCell. This Cell is a temporary data
+ * structure that should never be stored inside a memtable or an sstable.
+ */
+public class CounterUpdateCell extends Cell
+{
+    public CounterUpdateCell(CellName name, long value, long timestamp)
+    {
+        this(name, ByteBufferUtil.bytes(value), timestamp);
+    }
+
+    public CounterUpdateCell(CellName name, ByteBuffer value, long timestamp)
+    {
+        super(name, value, timestamp);
+    }
+
+    public long delta()
+    {
+        return value().getLong(value().position());
+    }
+
+    @Override
+    public Cell diff(Cell cell)
+    {
+        // Diff is used during reads, but we should never read those columns
+        throw new UnsupportedOperationException("This operation is unsupported on CounterUpdateCell.");
+    }
+
+    @Override
+    public Cell reconcile(Cell cell, Allocator allocator)
+    {
+        // The only time this could happen is if a batchAdd ships two
+        // increments for the same cell. Hence we simply sum the deltas.
+
+        assert (cell instanceof CounterUpdateCell) || (cell instanceof DeletedCell) : "Wrong class type.";
+
+        // tombstones take precedence
+        if (cell.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired cell, so the current time is irrelevant
+            return timestamp() > cell.timestamp() ? this : cell;
+
+        // neither is tombstoned
+        CounterUpdateCell c = (CounterUpdateCell) cell;
+        return new CounterUpdateCell(name(), delta() + c.delta(), Math.max(timestamp(), c.timestamp()));
+    }
+
+    @Override
+    public int serializationFlags()
+    {
+        return ColumnSerializer.COUNTER_UPDATE_MASK;
+    }
+
+    @Override
+    public CounterCell localCopy(ColumnFamilyStore cfs)
+    {
+        return new CounterCell(name.copy(HeapAllocator.instance),
+                                 CounterContext.instance().create(delta(), HeapAllocator.instance),
+                                 timestamp(),
+                                 Long.MIN_VALUE);
+    }
+
+    @Override
+    public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    {
+        return new CounterCell(name.copy(allocator),
+                                 CounterContext.instance().create(delta(), allocator),
+                                 timestamp(),
+                                 Long.MIN_VALUE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java b/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
deleted file mode 100644
index aaf3307..0000000
--- a/src/java/org/apache/cassandra/db/CounterUpdateColumn.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HeapAllocator;
-
-/**
- * A counter update while it hasn't been applied yet by the leader replica.
- *
- * Contains a single counter update. When applied by the leader replica, this
- * is transformed to a relevant CounterColumn. This Column is a temporary data
- * structure that should never be stored inside a memtable or an sstable.
- */
-public class CounterUpdateColumn extends Column
-{
-    public CounterUpdateColumn(CellName name, long value, long timestamp)
-    {
-        this(name, ByteBufferUtil.bytes(value), timestamp);
-    }
-
-    public CounterUpdateColumn(CellName name, ByteBuffer value, long timestamp)
-    {
-        super(name, value, timestamp);
-    }
-
-    public long delta()
-    {
-        return value().getLong(value().position());
-    }
-
-    @Override
-    public Column diff(Column column)
-    {
-        // Diff is used during reads, but we should never read those columns
-        throw new UnsupportedOperationException("This operation is unsupported on CounterUpdateColumn.");
-    }
-
-    @Override
-    public Column reconcile(Column column, Allocator allocator)
-    {
-        // The only time this could happen is if a batchAdd ships two
-        // increment for the same column. Hence we simply sums the delta.
-
-        assert (column instanceof CounterUpdateColumn) || (column instanceof DeletedColumn) : "Wrong class type.";
-
-        // tombstones take precedence
-        if (column.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired column, so the current time is irrelevant
-            return timestamp() > column.timestamp() ? this : column;
-
-        // neither is tombstoned
-        CounterUpdateColumn c = (CounterUpdateColumn)column;
-        return new CounterUpdateColumn(name(), delta() + c.delta(), Math.max(timestamp(), c.timestamp()));
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.COUNTER_UPDATE_MASK;
-    }
-
-    @Override
-    public CounterColumn localCopy(ColumnFamilyStore cfs)
-    {
-        return new CounterColumn(name.copy(HeapAllocator.instance),
-                                 CounterContext.instance().create(delta(), HeapAllocator.instance),
-                                 timestamp(),
-                                 Long.MIN_VALUE);
-    }
-
-    @Override
-    public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
-    {
-        return new CounterColumn(name.copy(allocator),
-                                 CounterContext.instance().create(delta(), allocator),
-                                 timestamp(),
-                                 Long.MIN_VALUE);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index f30e256..693ef97 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -74,7 +74,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  *
  * Where <key> is a name of keyspace e.g. "ks".
  *
- * Column names where made composite to support 3-level nesting which represents following structure:
+ * Cell names were made composite to support 3-level nesting which represents the following structure:
  * "ColumnFamily name":"column name":"column attribute" => "value"
  *
  * Example of schema (using CLI):

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/DeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedCell.java b/src/java/org/apache/cassandra/db/DeletedCell.java
new file mode 100644
index 0000000..5b89e1d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/DeletedCell.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.HeapAllocator;
+
+public class DeletedCell extends Cell
+{
+    public DeletedCell(CellName name, int localDeletionTime, long timestamp)
+    {
+        this(name, ByteBufferUtil.bytes(localDeletionTime), timestamp);
+    }
+
+    public DeletedCell(CellName name, ByteBuffer value, long timestamp)
+    {
+        super(name, value, timestamp);
+    }
+
+    @Override
+    public Cell withUpdatedName(CellName newName)
+    {
+        return new DeletedCell(newName, value, timestamp);
+    }
+
+    @Override
+    public Cell withUpdatedTimestamp(long newTimestamp)
+    {
+        return new DeletedCell(name, value, newTimestamp);
+    }
+
+    @Override
+    public boolean isMarkedForDelete(long now)
+    {
+        return true;
+    }
+
+    @Override
+    public long getMarkedForDeleteAt()
+    {
+        return timestamp;
+    }
+
+    @Override
+    public void updateDigest(MessageDigest digest)
+    {
+        digest.update(name.toByteBuffer().duplicate());
+
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        try
+        {
+            buffer.writeLong(timestamp);
+            buffer.writeByte(serializationFlags());
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        digest.update(buffer.getData(), 0, buffer.getLength());
+    }
+
+    @Override
+    public int getLocalDeletionTime()
+    {
+       return value.getInt(value.position());
+    }
+
+    @Override
+    public Cell reconcile(Cell cell, Allocator allocator)
+    {
+        if (cell instanceof DeletedCell)
+            return super.reconcile(cell, allocator);
+        return cell.reconcile(this, allocator);
+    }
+
+    @Override
+    public Cell localCopy(ColumnFamilyStore cfs)
+    {
+        return new DeletedCell(name.copy(HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp);
+    }
+
+    @Override
+    public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    {
+        return new DeletedCell(name.copy(allocator), allocator.clone(value), timestamp);
+    }
+
+    @Override
+    public int serializationFlags()
+    {
+        return ColumnSerializer.DELETION_MASK;
+    }
+
+    @Override
+    public void validateFields(CFMetaData metadata) throws MarshalException
+    {
+        validateName(metadata);
+        if (value().remaining() != 4)
+            throw new MarshalException("A tombstone value should be 4 bytes long");
+        if (getLocalDeletionTime() < 0)
+            throw new MarshalException("The local deletion time should not be negative");
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/DeletedColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedColumn.java b/src/java/org/apache/cassandra/db/DeletedColumn.java
deleted file mode 100644
index ec88015..0000000
--- a/src/java/org/apache/cassandra/db/DeletedColumn.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.HeapAllocator;
-
-public class DeletedColumn extends Column
-{
-    public DeletedColumn(CellName name, int localDeletionTime, long timestamp)
-    {
-        this(name, ByteBufferUtil.bytes(localDeletionTime), timestamp);
-    }
-
-    public DeletedColumn(CellName name, ByteBuffer value, long timestamp)
-    {
-        super(name, value, timestamp);
-    }
-
-    @Override
-    public Column withUpdatedName(CellName newName)
-    {
-        return new DeletedColumn(newName, value, timestamp);
-    }
-
-    @Override
-    public Column withUpdatedTimestamp(long newTimestamp)
-    {
-        return new DeletedColumn(name, value, newTimestamp);
-    }
-
-    @Override
-    public boolean isMarkedForDelete(long now)
-    {
-        return true;
-    }
-
-    @Override
-    public long getMarkedForDeleteAt()
-    {
-        return timestamp;
-    }
-
-    @Override
-    public void updateDigest(MessageDigest digest)
-    {
-        digest.update(name.toByteBuffer().duplicate());
-
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        try
-        {
-            buffer.writeLong(timestamp);
-            buffer.writeByte(serializationFlags());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        digest.update(buffer.getData(), 0, buffer.getLength());
-    }
-
-    @Override
-    public int getLocalDeletionTime()
-    {
-       return value.getInt(value.position());
-    }
-
-    @Override
-    public Column reconcile(Column column, Allocator allocator)
-    {
-        if (column instanceof DeletedColumn)
-            return super.reconcile(column, allocator);
-        return column.reconcile(this, allocator);
-    }
-
-    @Override
-    public Column localCopy(ColumnFamilyStore cfs)
-    {
-        return new DeletedColumn(name.copy(HeapAllocator.instance), ByteBufferUtil.clone(value), timestamp);
-    }
-
-    @Override
-    public Column localCopy(ColumnFamilyStore cfs, Allocator allocator)
-    {
-        return new DeletedColumn(name.copy(allocator), allocator.clone(value), timestamp);
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.DELETION_MASK;
-    }
-
-    @Override
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        validateName(metadata);
-        if (value().remaining() != 4)
-            throw new MarshalException("A tombstone value should be 4 bytes long");
-        if (getLocalDeletionTime() < 0)
-            throw new MarshalException("The local deletion time should not be negative");
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 0bd0635..5c62132 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -106,14 +106,14 @@ public class DeletionInfo
     }
 
     /**
-     * Return whether a given column is deleted by the container having this deletion info.
+     * Return whether a given cell is deleted by the container having this deletion info.
      *
-     * @param column the column to check.
-     * @return true if the column is deleted, false otherwise
+     * @param cell the cell to check.
+     * @return true if the cell is deleted, false otherwise
      */
-    public boolean isDeleted(Column column)
+    public boolean isDeleted(Cell cell)
     {
-        return isDeleted(column.name(), column.timestamp());
+        return isDeleted(cell.name(), cell.timestamp());
     }
 
     public boolean isDeleted(Composite name, long timestamp)
@@ -375,9 +375,9 @@ public class DeletionInfo
             this.reversed = reversed;
         }
 
-        public boolean isDeleted(Column column)
+        public boolean isDeleted(Cell cell)
         {
-            return isDeleted(column.name(), column.timestamp());
+            return isDeleted(cell.name(), cell.timestamp());
         }
 
         public boolean isDeleted(Composite name, long timestamp)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/EmptyColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/EmptyColumns.java b/src/java/org/apache/cassandra/db/EmptyColumns.java
index 782ffc9..5498353 100644
--- a/src/java/org/apache/cassandra/db/EmptyColumns.java
+++ b/src/java/org/apache/cassandra/db/EmptyColumns.java
@@ -63,22 +63,22 @@ public class EmptyColumns extends AbstractThreadUnsafeSortedColumns
         return factory;
     }
 
-    public void addColumn(Column column, Allocator allocator)
+    public void addColumn(Cell cell, Allocator allocator)
     {
         throw new UnsupportedOperationException();
     }
 
-    public void addAll(ColumnFamily cm, Allocator allocator, Function<Column, Column> transformation)
+    public void addAll(ColumnFamily cm, Allocator allocator, Function<Cell, Cell> transformation)
     {
         throw new UnsupportedOperationException();
     }
 
-    public boolean replace(Column oldColumn, Column newColumn)
+    public boolean replace(Cell oldCell, Cell newCell)
     {
         throw new UnsupportedOperationException();
     }
 
-    public Column getColumn(CellName name)
+    public Cell getColumn(CellName name)
     {
         throw new UnsupportedOperationException();
     }
@@ -88,12 +88,12 @@ public class EmptyColumns extends AbstractThreadUnsafeSortedColumns
         return Collections.emptyList();
     }
 
-    public Collection<Column> getSortedColumns()
+    public Collection<Cell> getSortedColumns()
     {
         return Collections.emptyList();
     }
 
-    public Collection<Column> getReverseSortedColumns()
+    public Collection<Cell> getReverseSortedColumns()
     {
         return Collections.emptyList();
     }
@@ -103,12 +103,12 @@ public class EmptyColumns extends AbstractThreadUnsafeSortedColumns
         return 0;
     }
 
-    public Iterator<Column> iterator(ColumnSlice[] slices)
+    public Iterator<Cell> iterator(ColumnSlice[] slices)
     {
         return Iterators.emptyIterator();
     }
 
-    public Iterator<Column> reverseIterator(ColumnSlice[] slices)
+    public Iterator<Cell> reverseIterator(ColumnSlice[] slices)
     {
         return Iterators.emptyIterator();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e50d6af1/src/java/org/apache/cassandra/db/ExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringCell.java b/src/java/org/apache/cassandra/db/ExpiringCell.java
new file mode 100644
index 0000000..2b9541c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ExpiringCell.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.HeapAllocator;
+
+/**
+ * Alternative to Cell that has an expiration time.
+ * ExpiringCell is immutable (as Cell is).
+ *
+ * Note that ExpiringCell.getMarkedForDeleteAt simply returns the cell's own write timestamp,
+ * which means that it's in the somewhat unintuitive position of being deleted (after its expiration)
+ * without having a distinct time-at-which-it-became-deleted.  (Because ttl is a server-side measurement,
+ * we can't mix it with the timestamp field, which is client-supplied and whose resolution we
+ * can't assume anything about.)
+ */
+public class ExpiringCell extends Cell
+{
+    public static final int MAX_TTL = 20 * 365 * 24 * 60 * 60; // 20 years in seconds
+
+    private final int localExpirationTime;
+    private final int timeToLive;
+
+    public ExpiringCell(CellName name, ByteBuffer value, long timestamp, int timeToLive)
+    {
+      this(name, value, timestamp, timeToLive, (int) (System.currentTimeMillis() / 1000) + timeToLive);
+    }
+
+    public ExpiringCell(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime)
+    {
+        super(name, value, timestamp);
+        assert timeToLive > 0 : timeToLive;
+        assert localExpirationTime > 0 : localExpirationTime;
+        this.timeToLive = timeToLive;
+        this.localExpirationTime = localExpirationTime;
+    }
+
+    /** @return Either a DeletedCell, or an ExpiringCell. */
+    public static Cell create(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, ColumnSerializer.Flag flag)
+    {
+        if (localExpirationTime >= expireBefore || flag == ColumnSerializer.Flag.PRESERVE_SIZE)
+            return new ExpiringCell(name, value, timestamp, timeToLive, localExpirationTime);
+        // The column is now expired, we can safely return a simple tombstone. Note that
+        // as long as the expiring column and the tombstone put together live longer than GC grace seconds,
+        // we'll fulfil our responsibility to repair.  See discussion at
+        // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
+        return new DeletedCell(name, localExpirationTime - timeToLive, timestamp);
+    }
+
+    public int getTimeToLive()
+    {
+        return timeToLive;
+    }
+
+    @Override
+    public Cell withUpdatedName(CellName newName)
+    {
+        return new ExpiringCell(newName, value, timestamp, timeToLive, localExpirationTime);
+    }
+
+    @Override
+    public Cell withUpdatedTimestamp(long newTimestamp)
+    {
+        return new ExpiringCell(name, value, newTimestamp, timeToLive, localExpirationTime);
+    }
+
+    @Override
+    public int dataSize()
+    {
+        return super.dataSize() + TypeSizes.NATIVE.sizeof(localExpirationTime) + TypeSizes.NATIVE.sizeof(timeToLive);
+    }
+
+    @Override
+    public int serializedSize(CellNameType type, TypeSizes typeSizes)
+    {
+        /*
+         * An expiring cell adds to a Cell:
+         *    4 bytes for the localExpirationTime
+         *  + 4 bytes for the timeToLive
+        */
+        return super.serializedSize(type, typeSizes) + typeSizes.sizeof(localExpirationTime) + typeSizes.sizeof(timeToLive);
+    }
+
+    @Override
+    public void updateDigest(MessageDigest digest)
+    {
+        digest.update(name.toByteBuffer().duplicate());
+        digest.update(value.duplicate());
+
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        try
+        {
+            buffer.writeLong(timestamp);
+            buffer.writeByte(serializationFlags());
+            buffer.writeInt(timeToLive);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        digest.update(buffer.getData(), 0, buffer.getLength());
+    }
+
+    @Override
+    public int getLocalDeletionTime()
+    {
+        return localExpirationTime;
+    }
+
+    @Override
+    public Cell localCopy(ColumnFamilyStore cfs)
+    {
+        return localCopy(cfs, HeapAllocator.instance);
+    }
+
+    @Override
+    public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    {
+        return new ExpiringCell(name.copy(allocator), allocator.clone(value), timestamp, timeToLive, localExpirationTime);
+    }
+
+    @Override
+    public String getString(CellNameType comparator)
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append(super.getString(comparator));
+        sb.append("!");
+        sb.append(timeToLive);
+        return sb.toString();
+    }
+
+    @Override
+    public boolean isMarkedForDelete(long now)
+    {
+        return (int) (now / 1000) >= getLocalDeletionTime();
+    }
+
+    @Override
+    public long getMarkedForDeleteAt()
+    {
+        return timestamp;
+    }
+
+    @Override
+    public int serializationFlags()
+    {
+        return ColumnSerializer.EXPIRATION_MASK;
+    }
+
+    @Override
+    public void validateFields(CFMetaData metadata) throws MarshalException
+    {
+        super.validateFields(metadata);
+        if (timeToLive <= 0)
+            throw new MarshalException("A column TTL should be > 0");
+        if (localExpirationTime < 0)
+            throw new MarshalException("The local expiration time should not be negative");
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        // super.equals() returns false if o is not an ExpiringCell
+        return super.equals(o)
+            && localExpirationTime == ((ExpiringCell)o).localExpirationTime
+            && timeToLive == ((ExpiringCell)o).timeToLive;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = super.hashCode();
+        result = 31 * result + localExpirationTime;
+        result = 31 * result + timeToLive;
+        return result;
+    }
+}