You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jo...@apache.org on 2010/06/01 17:03:49 UTC

svn commit: r950099 - in /cassandra/trunk: conf/ interface/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/avro/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra...

Author: johan
Date: Tue Jun  1 15:03:48 2010
New Revision: 950099

URL: http://svn.apache.org/viewvc?rev=950099&view=rev
Log:
Add support for reconciler classes that decide how to resolve conflicting columns. Patch by Kelvin Kakugawa and johan, review by jbellis. CASSANDRA-1144

Added:
    cassandra/trunk/src/java/org/apache/cassandra/db/clock/
    cassandra/trunk/src/java/org/apache/cassandra/db/clock/AbstractReconciler.java
    cassandra/trunk/src/java/org/apache/cassandra/db/clock/TimestampReconciler.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/clock/
    cassandra/trunk/test/unit/org/apache/cassandra/db/clock/TimestampReconcilerTest.java
Modified:
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/interface/cassandra.avpr
    cassandra/trunk/interface/cassandra.thrift
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
    cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/trunk/src/java/org/apache/cassandra/config/ColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
    cassandra/trunk/test/conf/cassandra.yaml
    cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java

Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Tue Jun  1 15:03:48 2010
@@ -210,6 +210,8 @@ keyspaces:
 
         - name: StandardByUUID1
           compare_with: TimeUUIDType
+          clock_type: Timestamp
+          reconciler: TimestampReconciler          
 
         - name: Super1
           column_type: Super

Modified: cassandra/trunk/interface/cassandra.avpr
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.avpr?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.avpr (original)
+++ cassandra/trunk/interface/cassandra.avpr Tue Jun  1 15:03:48 2010
@@ -84,6 +84,7 @@
             {"name": "clock_type", "type": ["string", "null"]},
             {"name": "comparator_type", "type": ["string", "null"]},
             {"name": "subcomparator_type", "type": ["string", "null"]},
+            {"name": "reconciler", "type": ["string", "null"]},
             {"name": "comment", "type": ["string", "null"]},
             {"name": "row_cache_size", "type": ["double", "null"]},
             {"name": "preload_row_cache", "type": ["boolean", "null"]},

Modified: cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.thrift (original)
+++ cassandra/trunk/interface/cassandra.thrift Tue Jun  1 15:03:48 2010
@@ -322,10 +322,11 @@ struct CfDef {
     4: optional string clock_type="Timestamp",
     5: optional string comparator_type="BytesType",
     6: optional string subcomparator_type="",
-    7: optional string comment="",
-    8: optional double row_cache_size=0,
-    9: optional bool preload_row_cache=0,
-    10: optional double key_cache_size=200000,
+    7: optional string reconciler="",
+    8: optional string comment="",
+    9: optional double row_cache_size=0,
+    10: optional bool preload_row_cache=0,
+    11: optional double key_cache_size=200000
 }
 
 /* describes a keyspace. */

Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java (original)
+++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java Tue Jun  1 15:03:48 2010
@@ -53,10 +53,11 @@ public class CfDef implements TBase<CfDe
   private static final TField CLOCK_TYPE_FIELD_DESC = new TField("clock_type", TType.STRING, (short)4);
   private static final TField COMPARATOR_TYPE_FIELD_DESC = new TField("comparator_type", TType.STRING, (short)5);
   private static final TField SUBCOMPARATOR_TYPE_FIELD_DESC = new TField("subcomparator_type", TType.STRING, (short)6);
-  private static final TField COMMENT_FIELD_DESC = new TField("comment", TType.STRING, (short)7);
-  private static final TField ROW_CACHE_SIZE_FIELD_DESC = new TField("row_cache_size", TType.DOUBLE, (short)8);
-  private static final TField PRELOAD_ROW_CACHE_FIELD_DESC = new TField("preload_row_cache", TType.BOOL, (short)9);
-  private static final TField KEY_CACHE_SIZE_FIELD_DESC = new TField("key_cache_size", TType.DOUBLE, (short)10);
+  private static final TField RECONCILER_FIELD_DESC = new TField("reconciler", TType.STRING, (short)7);
+  private static final TField COMMENT_FIELD_DESC = new TField("comment", TType.STRING, (short)8);
+  private static final TField ROW_CACHE_SIZE_FIELD_DESC = new TField("row_cache_size", TType.DOUBLE, (short)9);
+  private static final TField PRELOAD_ROW_CACHE_FIELD_DESC = new TField("preload_row_cache", TType.BOOL, (short)10);
+  private static final TField KEY_CACHE_SIZE_FIELD_DESC = new TField("key_cache_size", TType.DOUBLE, (short)11);
 
   public String table;
   public String name;
@@ -64,6 +65,7 @@ public class CfDef implements TBase<CfDe
   public String clock_type;
   public String comparator_type;
   public String subcomparator_type;
+  public String reconciler;
   public String comment;
   public double row_cache_size;
   public boolean preload_row_cache;
@@ -77,10 +79,11 @@ public class CfDef implements TBase<CfDe
     CLOCK_TYPE((short)4, "clock_type"),
     COMPARATOR_TYPE((short)5, "comparator_type"),
     SUBCOMPARATOR_TYPE((short)6, "subcomparator_type"),
-    COMMENT((short)7, "comment"),
-    ROW_CACHE_SIZE((short)8, "row_cache_size"),
-    PRELOAD_ROW_CACHE((short)9, "preload_row_cache"),
-    KEY_CACHE_SIZE((short)10, "key_cache_size");
+    RECONCILER((short)7, "reconciler"),
+    COMMENT((short)8, "comment"),
+    ROW_CACHE_SIZE((short)9, "row_cache_size"),
+    PRELOAD_ROW_CACHE((short)10, "preload_row_cache"),
+    KEY_CACHE_SIZE((short)11, "key_cache_size");
 
     private static final Map<Integer, _Fields> byId = new HashMap<Integer, _Fields>();
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -152,6 +155,8 @@ public class CfDef implements TBase<CfDe
         new FieldValueMetaData(TType.STRING)));
     put(_Fields.SUBCOMPARATOR_TYPE, new FieldMetaData("subcomparator_type", TFieldRequirementType.OPTIONAL, 
         new FieldValueMetaData(TType.STRING)));
+    put(_Fields.RECONCILER, new FieldMetaData("reconciler", TFieldRequirementType.OPTIONAL, 
+        new FieldValueMetaData(TType.STRING)));
     put(_Fields.COMMENT, new FieldMetaData("comment", TFieldRequirementType.OPTIONAL, 
         new FieldValueMetaData(TType.STRING)));
     put(_Fields.ROW_CACHE_SIZE, new FieldMetaData("row_cache_size", TFieldRequirementType.OPTIONAL, 
@@ -175,6 +180,8 @@ public class CfDef implements TBase<CfDe
 
     this.subcomparator_type = "";
 
+    this.reconciler = "";
+
     this.comment = "";
 
     this.row_cache_size = (double)0;
@@ -218,6 +225,9 @@ public class CfDef implements TBase<CfDe
     if (other.isSetSubcomparator_type()) {
       this.subcomparator_type = other.subcomparator_type;
     }
+    if (other.isSetReconciler()) {
+      this.reconciler = other.reconciler;
+    }
     if (other.isSetComment()) {
       this.comment = other.comment;
     }
@@ -379,6 +389,30 @@ public class CfDef implements TBase<CfDe
     }
   }
 
+  public String getReconciler() {
+    return this.reconciler;
+  }
+
+  public CfDef setReconciler(String reconciler) {
+    this.reconciler = reconciler;
+    return this;
+  }
+
+  public void unsetReconciler() {
+    this.reconciler = null;
+  }
+
+  /** Returns true if field reconciler is set (has been asigned a value) and false otherwise */
+  public boolean isSetReconciler() {
+    return this.reconciler != null;
+  }
+
+  public void setReconcilerIsSet(boolean value) {
+    if (!value) {
+      this.reconciler = null;
+    }
+  }
+
   public String getComment() {
     return this.comment;
   }
@@ -522,6 +556,14 @@ public class CfDef implements TBase<CfDe
       }
       break;
 
+    case RECONCILER:
+      if (value == null) {
+        unsetReconciler();
+      } else {
+        setReconciler((String)value);
+      }
+      break;
+
     case COMMENT:
       if (value == null) {
         unsetComment();
@@ -581,6 +623,9 @@ public class CfDef implements TBase<CfDe
     case SUBCOMPARATOR_TYPE:
       return getSubcomparator_type();
 
+    case RECONCILER:
+      return getReconciler();
+
     case COMMENT:
       return getComment();
 
@@ -616,6 +661,8 @@ public class CfDef implements TBase<CfDe
       return isSetComparator_type();
     case SUBCOMPARATOR_TYPE:
       return isSetSubcomparator_type();
+    case RECONCILER:
+      return isSetReconciler();
     case COMMENT:
       return isSetComment();
     case ROW_CACHE_SIZE:
@@ -699,6 +746,15 @@ public class CfDef implements TBase<CfDe
         return false;
     }
 
+    boolean this_present_reconciler = true && this.isSetReconciler();
+    boolean that_present_reconciler = true && that.isSetReconciler();
+    if (this_present_reconciler || that_present_reconciler) {
+      if (!(this_present_reconciler && that_present_reconciler))
+        return false;
+      if (!this.reconciler.equals(that.reconciler))
+        return false;
+    }
+
     boolean this_present_comment = true && this.isSetComment();
     boolean that_present_comment = true && that.isSetComment();
     if (this_present_comment || that_present_comment) {
@@ -805,6 +861,15 @@ public class CfDef implements TBase<CfDe
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetReconciler()).compareTo(typedOther.isSetReconciler());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetReconciler()) {      lastComparison = TBaseHelper.compareTo(reconciler, typedOther.reconciler);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     lastComparison = Boolean.valueOf(isSetComment()).compareTo(typedOther.isSetComment());
     if (lastComparison != 0) {
       return lastComparison;
@@ -896,14 +961,21 @@ public class CfDef implements TBase<CfDe
             TProtocolUtil.skip(iprot, field.type);
           }
           break;
-        case 7: // COMMENT
+        case 7: // RECONCILER
+          if (field.type == TType.STRING) {
+            this.reconciler = iprot.readString();
+          } else { 
+            TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 8: // COMMENT
           if (field.type == TType.STRING) {
             this.comment = iprot.readString();
           } else { 
             TProtocolUtil.skip(iprot, field.type);
           }
           break;
-        case 8: // ROW_CACHE_SIZE
+        case 9: // ROW_CACHE_SIZE
           if (field.type == TType.DOUBLE) {
             this.row_cache_size = iprot.readDouble();
             setRow_cache_sizeIsSet(true);
@@ -911,7 +983,7 @@ public class CfDef implements TBase<CfDe
             TProtocolUtil.skip(iprot, field.type);
           }
           break;
-        case 9: // PRELOAD_ROW_CACHE
+        case 10: // PRELOAD_ROW_CACHE
           if (field.type == TType.BOOL) {
             this.preload_row_cache = iprot.readBool();
             setPreload_row_cacheIsSet(true);
@@ -919,7 +991,7 @@ public class CfDef implements TBase<CfDe
             TProtocolUtil.skip(iprot, field.type);
           }
           break;
-        case 10: // KEY_CACHE_SIZE
+        case 11: // KEY_CACHE_SIZE
           if (field.type == TType.DOUBLE) {
             this.key_cache_size = iprot.readDouble();
             setKey_cache_sizeIsSet(true);
@@ -980,6 +1052,13 @@ public class CfDef implements TBase<CfDe
         oprot.writeFieldEnd();
       }
     }
+    if (this.reconciler != null) {
+      if (isSetReconciler()) {
+        oprot.writeFieldBegin(RECONCILER_FIELD_DESC);
+        oprot.writeString(this.reconciler);
+        oprot.writeFieldEnd();
+      }
+    }
     if (this.comment != null) {
       if (isSetComment()) {
         oprot.writeFieldBegin(COMMENT_FIELD_DESC);
@@ -1066,6 +1145,16 @@ public class CfDef implements TBase<CfDe
       }
       first = false;
     }
+    if (isSetReconciler()) {
+      if (!first) sb.append(", ");
+      sb.append("reconciler:");
+      if (this.reconciler == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.reconciler);
+      }
+      first = false;
+    }
     if (isSetComment()) {
       if (!first) sb.append(", ");
       sb.append("comment:");

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Tue Jun  1 15:03:48 2010
@@ -43,6 +43,8 @@ import org.apache.cassandra.config.Confi
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.clock.AbstractReconciler;
+import org.apache.cassandra.db.clock.TimestampReconciler;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.db.migration.AddKeyspace;
@@ -512,19 +514,29 @@ public class CassandraServer implements 
             Collection<CFMetaData> cfDefs = new ArrayList<CFMetaData>((int)ksDef.cf_defs.size());
             for (CfDef cfDef : ksDef.cf_defs)
             {
-                String cfType, clockType, compare, subCompare;
+                String cfType, compare, subCompare;
                 cfType = cfDef.column_type == null ? D_CF_CFTYPE : cfDef.column_type.toString();
-                clockType = cfDef.clock_type == null ? D_CF_CFCLOCKTYPE : cfDef.clock_type.toString();
+                ClockType clockType = ClockType.create(cfDef.clock_type == null ? D_CF_CFCLOCKTYPE : cfDef.clock_type.toString());
                 compare = cfDef.comparator_type == null ? D_CF_COMPTYPE : cfDef.comparator_type.toString();
                 subCompare = cfDef.subcomparator_type == null ? D_CF_SUBCOMPTYPE : cfDef.subcomparator_type.toString();
+                AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(cfDef.reconciler.toString());
+                if (reconciler == null)
+                {
+                    if (clockType == ClockType.Timestamp)    
+                        reconciler = new TimestampReconciler(); // default
+                    else
+                        throw new ConfigurationException("No reconciler specified for column family " + cfDef.name.toString());
+
+                }
                 
                 CFMetaData cfmeta = new CFMetaData(
                         cfDef.keyspace.toString(),
                         cfDef.name.toString(),
                         ColumnFamilyType.create(cfType),
-                        ClockType.create(clockType),
+                        clockType,
                         DatabaseDescriptor.getComparator(compare),
                         subCompare.length() == 0 ? null : DatabaseDescriptor.getComparator(subCompare),
+                        reconciler,
                         cfDef.comment == null ? D_CF_COMMENT : cfDef.comment.toString(), 
                         cfDef.row_cache_size == null ? D_CF_ROWCACHE : cfDef.row_cache_size,
                         cfDef.preload_row_cache == null ? D_CF_PRELOAD_ROWCACHE : cfDef.preload_row_cache,

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Tue Jun  1 15:03:48 2010
@@ -36,6 +36,8 @@ import org.apache.commons.lang.builder.H
 
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.ClockType;
+import org.apache.cassandra.db.clock.AbstractReconciler;
+import org.apache.cassandra.db.clock.TimestampReconciler;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.Pair;
 
@@ -55,10 +57,10 @@ public final class CFMetaData
     
     private static final BiMap<Pair<String, String>, Integer> cfIdMap = HashBiMap.<Pair<String, String>, Integer>create();
     
-    public static final CFMetaData StatusCf = new CFMetaData(Table.SYSTEM_TABLE, SystemTable.STATUS_CF, ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, "persistent metadata for the local node", 0, false, 0.01, 0);
-    public static final CFMetaData HintsCf = new CFMetaData(Table.SYSTEM_TABLE, HintedHandOffManager.HINTS_CF, ColumnFamilyType.Super, ClockType.Timestamp, UTF8Type.instance, BytesType.instance, "hinted handoff data", 0, false, 0.01, 1);
-    public static final CFMetaData MigrationsCf = new CFMetaData(Table.SYSTEM_TABLE, Migration.MIGRATIONS_CF, ColumnFamilyType.Standard, ClockType.Timestamp, TimeUUIDType.instance, null, "individual schema mutations", 0, false, 2);
-    public static final CFMetaData SchemaCf = new CFMetaData(Table.SYSTEM_TABLE, Migration.SCHEMA_CF, ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, "current state of the schema", 0, false, 3);
+    public static final CFMetaData StatusCf = new CFMetaData(Table.SYSTEM_TABLE, SystemTable.STATUS_CF, ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(), "persistent metadata for the local node", 0, false, 0.01, 0);
+    public static final CFMetaData HintsCf = new CFMetaData(Table.SYSTEM_TABLE, HintedHandOffManager.HINTS_CF, ColumnFamilyType.Super, ClockType.Timestamp, UTF8Type.instance, BytesType.instance, new TimestampReconciler(), "hinted handoff data", 0, false, 0.01, 1);
+    public static final CFMetaData MigrationsCf = new CFMetaData(Table.SYSTEM_TABLE, Migration.MIGRATIONS_CF, ColumnFamilyType.Standard, ClockType.Timestamp, TimeUUIDType.instance, null, new TimestampReconciler(), "individual schema mutations", 0, false, 2);
+    public static final CFMetaData SchemaCf = new CFMetaData(Table.SYSTEM_TABLE, Migration.SCHEMA_CF, ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(), "current state of the schema", 0, false, 3);
 
     /**
      * @return An immutable mapping of (ksname,cfname) to id.
@@ -105,6 +107,7 @@ public final class CFMetaData
     public final ClockType clockType;         // clock type: timestamp, etc.
     public final AbstractType comparator;       // name sorted, time stamp sorted etc.
     public final AbstractType subcolumnComparator; // like comparator, for supercolumns
+    public final AbstractReconciler reconciler; // determine correct column from conflicting versions
     public final String comment; // for humans only
     public final double rowCacheSize; // default 0
     public final double keyCacheSize; // default 0.01
@@ -113,7 +116,7 @@ public final class CFMetaData
     public boolean preloadRowCache;
 
 
-    private CFMetaData(String tableName, String cfName, ColumnFamilyType cfType, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, String comment, double rowCacheSize, boolean preloadRowCache, double keyCacheSize, double readRepairChance, int cfId)
+    private CFMetaData(String tableName, String cfName, ColumnFamilyType cfType, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, AbstractReconciler reconciler, String comment, double rowCacheSize, boolean preloadRowCache, double keyCacheSize, double readRepairChance, int cfId)
     {
         this.tableName = tableName;
         this.cfName = cfName;
@@ -123,6 +126,7 @@ public final class CFMetaData
         // the default subcolumncomparator is null per thrift spec, but only should be null if cfType == Standard. If
         // cfType == Super, subcolumnComparator should default to BytesType if not set.
         this.subcolumnComparator = subcolumnComparator == null && cfType == ColumnFamilyType.Super ? BytesType.instance : subcolumnComparator;
+        this.reconciler = reconciler;
         this.comment = comment;
         this.rowCacheSize = rowCacheSize;
         this.preloadRowCache = preloadRowCache;
@@ -144,27 +148,27 @@ public final class CFMetaData
         }
     }
     
-    public CFMetaData(String tableName, String cfName, ColumnFamilyType cfType, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, String comment, double rowCacheSize, boolean preloadRowCache, double keyCacheSize)
+    public CFMetaData(String tableName, String cfName, ColumnFamilyType cfType, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, AbstractReconciler reconciler, String comment, double rowCacheSize, boolean preloadRowCache, double keyCacheSize)
     {
-        this(tableName, cfName, cfType, clockType, comparator, subcolumnComparator, comment, rowCacheSize, preloadRowCache, keyCacheSize, DEFAULT_READ_REPAIR_CHANCE, nextId());
+        this(tableName, cfName, cfType, clockType, comparator, subcolumnComparator, reconciler, comment, rowCacheSize, preloadRowCache, keyCacheSize, DEFAULT_READ_REPAIR_CHANCE, nextId());
     }
 
-    public CFMetaData(String tableName, String cfName, ColumnFamilyType cfType, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, String comment, double rowCacheSize, boolean preloadRowCache, double keyCacheSize, double readRepairChance)
+    public CFMetaData(String tableName, String cfName, ColumnFamilyType cfType, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, AbstractReconciler reconciler, String comment, double rowCacheSize, boolean preloadRowCache, double keyCacheSize, double readRepairChance)
     {
-        this(tableName, cfName, cfType, clockType, comparator, subcolumnComparator, comment, rowCacheSize, preloadRowCache, keyCacheSize, readRepairChance, nextId());
+        this(tableName, cfName, cfType, clockType, comparator, subcolumnComparator, reconciler, comment, rowCacheSize, preloadRowCache, keyCacheSize, readRepairChance, nextId());
     }
 
     /** clones an existing CFMetaData using the same id. */
     public static CFMetaData rename(CFMetaData cfm, String newName)
     {
-        CFMetaData newCfm = new CFMetaData(cfm.tableName, newName, cfm.cfType, cfm.clockType, cfm.comparator, cfm.subcolumnComparator, cfm.comment, cfm.rowCacheSize, cfm.preloadRowCache, cfm.keyCacheSize, cfm.readRepairChance, cfm.cfId);
+        CFMetaData newCfm = new CFMetaData(cfm.tableName, newName, cfm.cfType, cfm.clockType, cfm.comparator, cfm.subcolumnComparator, cfm.reconciler, cfm.comment, cfm.rowCacheSize, cfm.preloadRowCache, cfm.keyCacheSize, cfm.readRepairChance, cfm.cfId);
         return newCfm;
     }
     
     /** clones existing CFMetaData. keeps the id but changes the table name.*/
     public static CFMetaData renameTable(CFMetaData cfm, String tableName)
     {
-        return new CFMetaData(tableName, cfm.cfName, cfm.cfType, cfm.clockType, cfm.comparator, cfm.subcolumnComparator, cfm.comment, cfm.rowCacheSize, cfm.preloadRowCache, cfm.keyCacheSize, cfm.readRepairChance, cfm.cfId);
+        return new CFMetaData(tableName, cfm.cfName, cfm.cfType, cfm.clockType, cfm.comparator, cfm.subcolumnComparator, cfm.reconciler, cfm.comment, cfm.rowCacheSize, cfm.preloadRowCache, cfm.keyCacheSize, cfm.readRepairChance, cfm.cfId);
     }
     
     /** used for evicting cf data out of static tracking collections. */
@@ -195,6 +199,7 @@ public final class CFMetaData
         dout.writeBoolean(cfm.subcolumnComparator != null);
         if (cfm.subcolumnComparator != null)
             dout.writeUTF(cfm.subcolumnComparator.getClass().getName());
+        dout.writeUTF(cfm.reconciler.getClass().getName());
         dout.writeBoolean(cfm.comment != null);
         if (cfm.comment != null)
             dout.writeUTF(cfm.comment);
@@ -217,13 +222,22 @@ public final class CFMetaData
         AbstractType comparator = DatabaseDescriptor.getComparator(din.readUTF());
         AbstractType subcolumnComparator = null;
         subcolumnComparator = din.readBoolean() ? DatabaseDescriptor.getComparator(din.readUTF()) : null;
+        AbstractReconciler reconciler = null;
+        try
+        {
+            reconciler = (AbstractReconciler)Class.forName(din.readUTF()).newInstance();
+        }
+        catch (Exception ex)
+        {
+            throw new IOException(ex);
+        }
         String comment = din.readBoolean() ? din.readUTF() : null;
         double rowCacheSize = din.readDouble();
         boolean preloadRowCache = din.readBoolean();
         double keyCacheSize = din.readDouble();
         double readRepairChance = din.readDouble();
         int cfId = din.readInt();
-        return new CFMetaData(tableName, cfName, cfType, clockType, comparator, subcolumnComparator, comment, rowCacheSize, preloadRowCache, keyCacheSize, readRepairChance, cfId);
+        return new CFMetaData(tableName, cfName, cfType, clockType, comparator, subcolumnComparator, reconciler, comment, rowCacheSize, preloadRowCache, keyCacheSize, readRepairChance, cfId);
     }
     
 
@@ -246,6 +260,7 @@ public final class CFMetaData
             .append(clockType, rhs.clockType)
             .append(comparator, rhs.comparator)
             .append(subcolumnComparator, rhs.subcolumnComparator)
+            .append(reconciler, rhs.reconciler)
             .append(comment, rhs.comment)
             .append(rowCacheSize, rhs.rowCacheSize)
             .append(keyCacheSize, rhs.keyCacheSize)
@@ -263,6 +278,7 @@ public final class CFMetaData
             .append(clockType)
             .append(comparator)
             .append(subcolumnComparator)
+            .append(reconciler)
             .append(comment)
             .append(rowCacheSize)
             .append(keyCacheSize)

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/ColumnFamily.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/ColumnFamily.java Tue Jun  1 15:03:48 2010
@@ -1,10 +1,13 @@
 package org.apache.cassandra.config;
 
+import org.apache.cassandra.db.ClockType;
 import org.apache.cassandra.db.ColumnFamilyType;
 
 public class ColumnFamily {
     public String name;            
     public ColumnFamilyType column_type;
+    public ClockType clock_type;
+    public String reconciler;
     public String compare_with;
     public String compare_subcolumns_with;
     public String comment;

Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Tue Jun  1 15:03:48 2010
@@ -21,6 +21,8 @@ package org.apache.cassandra.config;
 import org.apache.cassandra.auth.AllowAllAuthenticator;
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.clock.AbstractReconciler;
+import org.apache.cassandra.db.clock.TimestampReconciler;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
@@ -518,11 +520,22 @@ public class DatabaseDescriptor
                     throw new ConfigurationException("compare_subcolumns_with is only a valid attribute on super columnfamilies (not regular columnfamily " + cf.name + ")");
                 }
                 
+                if (cf.clock_type == null)
+                    cf.clock_type = ClockType.Timestamp; // default
+                
+                AbstractReconciler reconciler = getReconciler(cf.reconciler);
+                if (reconciler == null)
+                {
+                    if (cf.clock_type == ClockType.Timestamp)    
+                        reconciler = new TimestampReconciler(); // default
+                    else
+                        throw new ConfigurationException("No reconciler specified for column family " + cf.name);
+                }
                 if (cf.read_repair_chance < 0.0 || cf.read_repair_chance > 1.0)
                 {                        
                     throw new ConfigurationException("read_repair_chance must be between 0.0 and 1.0");
                 }
-                cfDefs[j++] = new CFMetaData(keyspace.name, cf.name, cfType, ClockType.Timestamp, comparator, subcolumnComparator, cf.comment, cf.rows_cached, cf.preload_row_cache, cf.keys_cached, cf.read_repair_chance);
+                cfDefs[j++] = new CFMetaData(keyspace.name, cf.name, cfType, cf.clock_type, comparator, subcolumnComparator, reconciler, cf.comment, cf.rows_cached, cf.preload_row_cache, cf.keys_cached, cf.read_repair_chance);
             }
             defs.add(new KSMetaData(keyspace.name, strategyClass, keyspace.replication_factor, cfDefs));
             
@@ -582,6 +595,55 @@ public class DatabaseDescriptor
         }
     }
 
+    public static AbstractReconciler getReconciler(String reconcileWith) throws ConfigurationException
+    {
+        if (reconcileWith == null || "".equals(reconcileWith))
+        {
+            return null;
+        }
+        
+        Class<? extends AbstractReconciler> reconcilerClass;
+        {
+            String className = reconcileWith.contains(".") ? reconcileWith :  TimestampReconciler.class.getPackage().getName() + "." + reconcileWith;
+            try
+            {
+                reconcilerClass = (Class<? extends AbstractReconciler>)Class.forName(className);
+            }
+            catch (ClassNotFoundException e)
+            {
+                throw new ConfigurationException("Unable to load class " + className);
+            }
+        }
+        try
+        {
+            return reconcilerClass.getConstructor().newInstance();
+        }
+        catch (InstantiationException e)
+        {
+            ConfigurationException ex = new ConfigurationException(e.getMessage());
+            ex.initCause(e);
+            throw ex;
+        }
+        catch (IllegalAccessException e)
+        {
+            ConfigurationException ex = new ConfigurationException(e.getMessage());
+            ex.initCause(e);
+            throw ex;
+        }
+        catch (InvocationTargetException e)
+        {
+            ConfigurationException ex = new ConfigurationException(e.getMessage());
+            ex.initCause(e);
+            throw ex;
+        }
+        catch (NoSuchMethodException e)
+        {
+            ConfigurationException ex = new ConfigurationException(e.getMessage());
+            ex.initCause(e);
+            throw ex;
+        }
+    }
+
     /**
      * Creates all storage-related directories.
      * @throws IOException when a disk problem is encountered.
@@ -888,6 +950,15 @@ public class DatabaseDescriptor
         return stageQueueSize_;
     }
 
+    public static AbstractReconciler getReconciler(String tableName, String cfName)
+    {
+        assert tableName != null;
+        CFMetaData cfmd = getCFMetaData(tableName, cfName);
+        if (cfmd == null)
+            throw new NullPointerException("Unknown ColumnFamily " + cfName + " in keyspace " + tableName);
+        return cfmd.reconciler;
+    }
+
     /**
      * @return The absolute number of keys that should be cached per table.
      */

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Tue Jun  1 15:03:48 2010
@@ -19,7 +19,6 @@
 package org.apache.cassandra.db;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
@@ -27,18 +26,17 @@ import java.util.concurrent.ConcurrentSk
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.cassandra.config.CFMetaData;
 
+import org.apache.cassandra.db.IClock.ClockRelationship;
+import org.apache.cassandra.db.clock.AbstractReconciler;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.ICompactSerializer2;
-import org.apache.cassandra.db.IClock.ClockRelationship;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
 
 public class ColumnFamily implements IColumnContainer
 {
@@ -64,31 +62,33 @@ public class ColumnFamily implements ICo
     {
         if (cfm == null)
             throw new IllegalArgumentException("Unknown column family.");
-        return new ColumnFamily(cfm.cfType, cfm.clockType, cfm.comparator, cfm.subcolumnComparator, cfm.cfId);
+        return new ColumnFamily(cfm.cfType, cfm.clockType, cfm.comparator, cfm.subcolumnComparator, cfm.reconciler, cfm.cfId);
     }
 
     private final int cfid;
     private final ColumnFamilyType type;
     private final ClockType clockType;
+    private final AbstractReconciler reconciler;
 
     private transient ICompactSerializer2<IColumn> columnSerializer;
     final AtomicReference<IClock> markedForDeleteAt;
     final AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
     private ConcurrentSkipListMap<byte[], IColumn> columns;
 
-    public ColumnFamily(ColumnFamilyType type, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, int cfid)
+    public ColumnFamily(ColumnFamilyType type, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, AbstractReconciler reconciler, int cfid)
     {
         this.type = type;
         this.clockType = clockType;
+        this.reconciler = reconciler;
         this.markedForDeleteAt = new AtomicReference<IClock>(clockType.minClock());
-        columnSerializer = type == ColumnFamilyType.Standard ? Column.serializer(clockType) : SuperColumn.serializer(subcolumnComparator, clockType);
+        columnSerializer = type == ColumnFamilyType.Standard ? Column.serializer(clockType) : SuperColumn.serializer(subcolumnComparator, clockType, reconciler);
         columns = new ConcurrentSkipListMap<byte[], IColumn>(comparator);
         this.cfid = cfid;
      }
     
     public ColumnFamily cloneMeShallow()
     {
-        ColumnFamily cf = new ColumnFamily(type, clockType, getComparator(), getSubComparator(), cfid);
+        ColumnFamily cf = new ColumnFamily(type, clockType, getComparator(), getSubComparator(), reconciler, cfid);
         cf.markedForDeleteAt.set(markedForDeleteAt.get());
         cf.localDeletionTime.set(localDeletionTime.get());
         return cf;
@@ -108,6 +108,11 @@ public class ColumnFamily implements ICo
     {
         return clockType;
     }
+    
+    public AbstractReconciler getReconciler()
+    {
+        return reconciler;
+    }
 
     public ColumnFamily cloneMe()
     {
@@ -204,7 +209,7 @@ public class ColumnFamily implements ICo
         else
         {
             assert isSuper();
-            c = new SuperColumn(superColumnName, getSubComparator(), clockType);
+            c = new SuperColumn(superColumnName, getSubComparator(), clockType, reconciler);
             c.addColumn(column); // checks subcolumn name
         }
         addColumn(c);
@@ -231,11 +236,15 @@ public class ColumnFamily implements ICo
             }
             else
             {
-                while (ClockRelationship.GREATER_THAN != ((Column) oldColumn).comparePriority((Column) column))
+                // calculate reconciled col from old (existing) col and new col
+                IColumn reconciledColumn = reconciler.reconcile((Column)column, (Column)oldColumn);
+                while (!columns.replace(name, oldColumn, reconciledColumn))
                 {
-                    if (columns.replace(name, oldColumn, column))
-                        break;
+                    // if unable to replace, then get updated old (existing) col
                     oldColumn = columns.get(name);
+                    // re-calculate reconciled col from updated old col and original new col
+                    reconciledColumn = reconciler.reconcile((Column)column, (Column)oldColumn);
+                    // try to re-update value, again
                 }
             }
         }
@@ -291,7 +300,7 @@ public class ColumnFamily implements ICo
      */
     public ColumnFamily diff(ColumnFamily cfComposite)
     {
-        ColumnFamily cfDiff = new ColumnFamily(cfComposite.type, cfComposite.clockType, getComparator(), getSubComparator(), cfComposite.id());
+        ColumnFamily cfDiff = new ColumnFamily(cfComposite.type, cfComposite.clockType, getComparator(), getSubComparator(), cfComposite.reconciler, cfComposite.id());
         ClockRelationship rel = cfComposite.getMarkedForDeleteAt().compare(getMarkedForDeleteAt());
         if (ClockRelationship.GREATER_THAN == rel)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Tue Jun  1 15:03:48 2010
@@ -219,7 +219,7 @@ public class Memtable implements Compara
         if (isStandard)
             startIColumn = new Column(filter.start);
         else
-            startIColumn = new SuperColumn(filter.start, null, cf.getClockType()); // ok to not have subcolumnComparator since we won't be adding columns to this object
+            startIColumn = new SuperColumn(filter.start, null, cf.getClockType(), cf.getReconciler()); // ok to not have subcolumnComparator since we won't be adding columns to this object
 
         // can't use a ColumnComparatorFactory comparator since those compare on both name and time (and thus will fail to match
         // our dummy column, since the time there is arbitrary).

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Tue Jun  1 15:03:48 2010
@@ -24,7 +24,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
-import java.nio.ByteBuffer;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
@@ -174,7 +173,7 @@ public class RowMutation
         else if (path.columnName == null)
         {
             SuperColumn sc = new SuperColumn(path.superColumnName, columnFamily.getSubComparator(), 
-                    columnFamily.getClockType());
+                    columnFamily.getClockType(), columnFamily.getReconciler());
             sc.markForDeleteAt(localDeleteTime, clock);
             columnFamily.addColumn(sc);
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Tue Jun  1 15:03:48 2010
@@ -30,10 +30,11 @@ import java.util.concurrent.atomic.Atomi
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.IClock.ClockRelationship;
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.IClock.ClockRelationship;
+import org.apache.cassandra.db.clock.AbstractReconciler;
 import org.apache.cassandra.utils.FBUtilities;
 
 
@@ -41,27 +42,29 @@ public class SuperColumn implements ICol
 {
 	private static Logger logger_ = LoggerFactory.getLogger(SuperColumn.class);
 
-    public static SuperColumnSerializer serializer(AbstractType comparator, ClockType clockType)
+    public static SuperColumnSerializer serializer(AbstractType comparator, ClockType clockType, AbstractReconciler reconciler)
     {
-        return new SuperColumnSerializer(comparator, clockType);
+        return new SuperColumnSerializer(comparator, clockType, reconciler);
     }
 
     private byte[] name_;
     private ConcurrentSkipListMap<byte[], IColumn> columns_;
     private AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
     private AtomicReference<IClock> markedForDeleteAt;
+    private AbstractReconciler reconciler;
 
-    public SuperColumn(byte[] name, AbstractType comparator, ClockType clockType)
+    public SuperColumn(byte[] name, AbstractType comparator, ClockType clockType, AbstractReconciler reconciler)
     {
-        this(name, new ConcurrentSkipListMap<byte[], IColumn>(comparator), clockType);
+        this(name, new ConcurrentSkipListMap<byte[], IColumn>(comparator), clockType, reconciler);
     }
 
-    private SuperColumn(byte[] name, ConcurrentSkipListMap<byte[], IColumn> columns, ClockType clockType)
+    private SuperColumn(byte[] name, ConcurrentSkipListMap<byte[], IColumn> columns, ClockType clockType, AbstractReconciler reconciler)
     {
         assert name != null;
         assert name.length <= IColumn.MAX_NAME_LENGTH;
-    	name_ = name;
+        name_ = name;
         columns_ = columns;
+        this.reconciler = reconciler;
         markedForDeleteAt = new AtomicReference<IClock>(clockType.minClock());
     }
 
@@ -73,7 +76,7 @@ public class SuperColumn implements ICol
     public SuperColumn cloneMeShallow()
     {
         IClock _markedForDeleteAt = markedForDeleteAt.get();
-        SuperColumn sc = new SuperColumn(name_, getComparator(), _markedForDeleteAt.type());
+        SuperColumn sc = new SuperColumn(name_, getComparator(), _markedForDeleteAt.type(), reconciler);
         sc.markForDeleteAt(localDeletionTime.get(), _markedForDeleteAt);
         return sc;
     }
@@ -81,7 +84,7 @@ public class SuperColumn implements ICol
     public IColumn cloneMe()
     {
         IClock _markedForDeleteAt = markedForDeleteAt.get();
-        SuperColumn sc = new SuperColumn(name_, new ConcurrentSkipListMap<byte[], IColumn>(columns_), _markedForDeleteAt.type());
+        SuperColumn sc = new SuperColumn(name_, new ConcurrentSkipListMap<byte[], IColumn>(columns_), _markedForDeleteAt.type(), reconciler);
         sc.markForDeleteAt(localDeletionTime.get(), _markedForDeleteAt);
         return sc;
     }
@@ -166,18 +169,20 @@ public class SuperColumn implements ICol
 
     public void addColumn(IColumn column)
     {
-    	assert column instanceof Column : "A super column can only contain simple columns";
+        assert column instanceof Column : "A super column can only contain simple columns";
+
         byte[] name = column.name();
         IColumn oldColumn = columns_.putIfAbsent(name, column);
-    	if (oldColumn != null)
+        if (oldColumn != null)
         {
-            ClockRelationship rel = ((Column)oldColumn).comparePriority((Column)column);
-            while (ClockRelationship.GREATER_THAN != rel)
+            IColumn reconciledColumn = reconciler.reconcile((Column)column, (Column)oldColumn);
+            while (!columns_.replace(name, oldColumn, reconciledColumn))
             {
-                if (columns_.replace(name, oldColumn, column))
-                    break;
+                // if unable to replace, then get updated old (existing) col
                 oldColumn = columns_.get(name);
-                rel = ((Column)oldColumn).comparePriority((Column)column);
+                // re-calculate reconciled col from updated old col and original new col
+                reconciledColumn = reconciler.reconcile((Column)column, (Column)oldColumn);
+                // try to re-update value, again
             }
     	}
     }
@@ -211,7 +216,7 @@ public class SuperColumn implements ICol
     public IColumn diff(IColumn columnNew)
     {
         IClock _markedForDeleteAt = markedForDeleteAt.get();
-        IColumn columnDiff = new SuperColumn(columnNew.name(), ((SuperColumn)columnNew).getComparator(), _markedForDeleteAt.type());
+        IColumn columnDiff = new SuperColumn(columnNew.name(), ((SuperColumn)columnNew).getComparator(), _markedForDeleteAt.type(), reconciler);
         ClockRelationship rel = columnNew.getMarkedForDeleteAt().compare(_markedForDeleteAt);
         if (ClockRelationship.GREATER_THAN == rel)
         {
@@ -299,11 +304,13 @@ class SuperColumnSerializer implements I
 {
     private AbstractType comparator;
     private ClockType clockType;
+    private AbstractReconciler reconciler;
 
-    public SuperColumnSerializer(AbstractType comparator, ClockType clockType)
+    public SuperColumnSerializer(AbstractType comparator, ClockType clockType, AbstractReconciler reconciler)
     {
         this.comparator = comparator;
         this.clockType = clockType;
+        this.reconciler = reconciler;
     }
 
     public AbstractType getComparator()
@@ -337,7 +344,7 @@ class SuperColumnSerializer implements I
     public IColumn deserialize(DataInput dis) throws IOException
     {
         byte[] name = FBUtilities.readShortByteArray(dis);
-        SuperColumn superColumn = new SuperColumn(name, comparator, clockType);
+        SuperColumn superColumn = new SuperColumn(name, comparator, clockType, reconciler);
         int localDeleteTime = dis.readInt();
         if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0)
         {

Added: cassandra/trunk/src/java/org/apache/cassandra/db/clock/AbstractReconciler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/clock/AbstractReconciler.java?rev=950099&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/clock/AbstractReconciler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/clock/AbstractReconciler.java Tue Jun  1 15:03:48 2010
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.clock;
+
+import org.apache.cassandra.db.Column;
+
+/**
+ * Specifies a Reconciler for a ColumnFamily.
+ */
+public abstract class AbstractReconciler
+{
+    /**
+     * @return Reconcile the two columns into one depending on the clock types. 
+     * For example a strategy could be to keep the one with the highest timestamp.
+     */
+    public abstract Column reconcile(Column left, Column right);
+
+    public final boolean equals(Object obj)
+    {
+        if (obj == null)
+            return false;
+        else
+            return obj.getClass().getName().equals(getClass().getName());
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/db/clock/TimestampReconciler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/clock/TimestampReconciler.java?rev=950099&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/clock/TimestampReconciler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/clock/TimestampReconciler.java Tue Jun  1 15:03:48 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.clock;
+
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.IClock.ClockRelationship;
+
+/**
+ * Keeps the column with the highest timestamp. If both are equal
+ * return the left column.
+ */
+public class TimestampReconciler extends AbstractReconciler
+{
+
+    public Column reconcile(Column left, Column right)
+    {
+        ClockRelationship cr = left.clock().compare(right.clock());
+        switch (cr)
+        {
+        case EQUAL:
+        case GREATER_THAN:
+            return left;
+        case LESS_THAN:
+            return right;
+        default:
+            throw new IllegalArgumentException(
+                    "Timestamp clocks must either be equal, greater then or less than: " + cr);
+        }
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Jun  1 15:03:48 2010
@@ -32,6 +32,7 @@ import org.apache.cassandra.auth.AllowAl
 import org.apache.cassandra.auth.SimpleAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.clock.AbstractReconciler;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.thrift.*;
@@ -290,7 +291,8 @@ public class ColumnFamilyRecordReader ex
     {
         AbstractType subComparator = DatabaseDescriptor.getSubComparator(keyspace, cfName);
         ClockType clockType = DatabaseDescriptor.getClockType(keyspace, cfName);
-        org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator, clockType);
+        AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(keyspace, cfName);
+        org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator, clockType, reconciler);
         for (Column column : super_column.columns)
         {
             sc.addColumn(unthriftifySimple(column));

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Tue Jun  1 15:03:48 2010
@@ -34,6 +34,7 @@ import org.apache.cassandra.dht.IPartiti
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.clock.AbstractReconciler;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.ICompactSerializer2;
@@ -278,9 +279,10 @@ public abstract class SSTableReader exte
     {
         ColumnFamilyType cfType = DatabaseDescriptor.getColumnFamilyType(getTableName(), getColumnFamilyName());
         ClockType clockType = DatabaseDescriptor.getClockType(getTableName(), getColumnFamilyName());
+        AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(getTableName(), getColumnFamilyName());
         return cfType == ColumnFamilyType.Standard
                ? Column.serializer(clockType)
-               : SuperColumn.serializer(getColumnComparator(), clockType);
+               : SuperColumn.serializer(getColumnComparator(), clockType, reconciler);
     }
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Tue Jun  1 15:03:48 2010
@@ -39,6 +39,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.clock.AbstractReconciler;
+import org.apache.cassandra.db.clock.TimestampReconciler;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -632,23 +634,7 @@ public class CassandraServer implements 
         
         try
         {
-            ColumnFamilyType cfType = ColumnFamilyType.create(cf_def.column_type);
-            if (cfType == null)
-            {
-              throw new InvalidRequestException("Invalid column type " + cf_def.column_type);
-            }
-            CFMetaData cfm = new CFMetaData(
-                        cf_def.table,
-                        cf_def.name,
-                        cfType,
-                        ClockType.Timestamp,
-                        DatabaseDescriptor.getComparator(cf_def.comparator_type),
-                        cf_def.subcomparator_type.length() == 0 ? null : DatabaseDescriptor.getComparator(cf_def.subcomparator_type),
-                        cf_def.comment,
-                        cf_def.row_cache_size,
-                        cf_def.preload_row_cache,
-                        cf_def.key_cache_size);
-            AddColumnFamily add = new AddColumnFamily(cfm);
+            AddColumnFamily add = new AddColumnFamily(convertToCFMetaData(cf_def));
             add.apply();
             add.announce();
             return DatabaseDescriptor.getDefsVersion().toString();
@@ -742,23 +728,7 @@ public class CassandraServer implements 
             Collection<CFMetaData> cfDefs = new ArrayList<CFMetaData>(ks_def.cf_defs.size());
             for (CfDef cfDef : ks_def.cf_defs)
             {
-                ColumnFamilyType cfType = ColumnFamilyType.create(cfDef.column_type);
-                if (cfType == null)
-                {
-                    throw new InvalidRequestException("Invalid column type " + cfDef.column_type);
-                }
-                CFMetaData cfm = new CFMetaData(
-                        cfDef.table,
-                        cfDef.name,
-                        cfType,
-                        ClockType.Timestamp,
-                        DatabaseDescriptor.getComparator(cfDef.comparator_type),
-                        cfDef.subcomparator_type.length() == 0 ? null : DatabaseDescriptor.getComparator(cfDef.subcomparator_type),
-                        cfDef.comment,
-                        cfDef.row_cache_size,
-                        cfDef.preload_row_cache,
-                        cfDef.key_cache_size);
-                cfDefs.add(cfm);
+                cfDefs.add(convertToCFMetaData(cfDef));
             }
             
             KSMetaData ksm = new KSMetaData(
@@ -848,6 +818,42 @@ public class CassandraServer implements 
             throw ex;
         }
     }
+    
+    private CFMetaData convertToCFMetaData(CfDef cf_def) throws InvalidRequestException, ConfigurationException
+    {
+        ColumnFamilyType cfType = ColumnFamilyType.create(cf_def.column_type);
+        if (cfType == null)
+        {
+          throw new InvalidRequestException("Invalid column type " + cf_def.column_type);
+        }
+        ClockType clockType = ClockType.create(cf_def.clock_type);
+        if (clockType == null)
+        {
+            throw new InvalidRequestException("Invalid clock type " + cf_def.clock_type);
+        }
+        AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(cf_def.reconciler);
+        if (reconciler == null)
+        {
+            if (clockType == ClockType.Timestamp)    
+                reconciler = new TimestampReconciler(); // default
+            else
+                throw new ConfigurationException("No reconciler specified for column family " + cf_def.name);
+
+        }
+        
+        return new CFMetaData(
+                    cf_def.table,
+                    cf_def.name,
+                    cfType,
+                    clockType,
+                    DatabaseDescriptor.getComparator(cf_def.comparator_type),
+                    cf_def.subcomparator_type.length() == 0 ? null : DatabaseDescriptor.getComparator(cf_def.subcomparator_type),
+                    reconciler,
+                    cf_def.comment,
+                    cf_def.row_cache_size,
+                    cf_def.preload_row_cache,
+                    cf_def.key_cache_size);
+    }
 
     @Override
     public void truncate(String keyspace, String cfname) throws InvalidRequestException, UnavailableException, TException

Modified: cassandra/trunk/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Tue Jun  1 15:03:48 2010
@@ -32,6 +32,8 @@ keyspaces:
 
         - name: StandardLong1
           compare_with: LongType
+          clock_type: Timestamp
+          reconciler: TimestampReconciler
 
         - name: StandardLong2
           compare_with: LongType

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Tue Jun  1 15:03:48 2010
@@ -25,6 +25,7 @@ import org.apache.cassandra.config.CFMet
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.clock.TimestampReconciler;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.BytesType;
@@ -77,7 +78,7 @@ public class DefsTest extends CleanupHel
     @Test
     public void addNewCfToBogusTable() throws InterruptedException
     {
-        CFMetaData newCf = new CFMetaData("MadeUpKeyspace", "NewCF", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, "new cf", 0, false, 0);
+        CFMetaData newCf = new CFMetaData("MadeUpKeyspace", "NewCF", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(), "new cf", 0, false, 0);
         try
         {
             new AddColumnFamily(newCf).apply();
@@ -102,7 +103,7 @@ public class DefsTest extends CleanupHel
         assert DatabaseDescriptor.getDefsVersion().equals(prior);
         
         // add a cf.
-        CFMetaData newCf1 = new CFMetaData("Keyspace1", "MigrationCf_1", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, "Migration CF ", 0, false, 0);
+        CFMetaData newCf1 = new CFMetaData("Keyspace1", "MigrationCf_1", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(), "Migration CF ", 0, false, 0);
         Migration m1 = new AddColumnFamily(newCf1);
         m1.apply();
         UUID ver1 = m1.getVersion();
@@ -161,7 +162,7 @@ public class DefsTest extends CleanupHel
         final String cf = "BrandNewCf";
         KSMetaData original = DatabaseDescriptor.getTableDefinition(ks);
 
-        CFMetaData newCf = new CFMetaData(original.name, cf, ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, "A New Column Family", 0, false, 0);
+        CFMetaData newCf = new CFMetaData(original.name, cf, ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(), "A New Column Family", 0, false, 0);
         assert !DatabaseDescriptor.getTableDefinition(ks).cfMetaData().containsKey(newCf.cfName);
         new AddColumnFamily(newCf).apply();
 
@@ -275,7 +276,7 @@ public class DefsTest extends CleanupHel
     public void addNewKS() throws ConfigurationException, IOException, ExecutionException, InterruptedException
     {
         DecoratedKey dk = Util.dk("key0");
-        CFMetaData newCf = new CFMetaData("NewKeyspace1", "AddedStandard1", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, "A new cf for a new ks", 0, false, 0);
+        CFMetaData newCf = new CFMetaData("NewKeyspace1", "AddedStandard1", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(), "A new cf for a new ks", 0, false, 0);
         KSMetaData newKs = new KSMetaData(newCf.tableName, RackUnawareStrategy.class, 5, newCf);
         
         new AddKeyspace(newKs).apply();
@@ -431,7 +432,7 @@ public class DefsTest extends CleanupHel
         new AddKeyspace(newKs).apply();
         assert DatabaseDescriptor.getTableDefinition("EmptyKeyspace") != null;
 
-        CFMetaData newCf = new CFMetaData("EmptyKeyspace", "AddedLater", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, "A new CF to add to an empty KS", 0, false, 0);
+        CFMetaData newCf = new CFMetaData("EmptyKeyspace", "AddedLater", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(), "A new CF to add to an empty KS", 0, false, 0);
 
         //should not exist until apply
         assert !DatabaseDescriptor.getTableDefinition(newKs.name).cfMetaData().containsKey(newCf.cfName);

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java Tue Jun  1 15:03:48 2010
@@ -46,10 +46,10 @@ public class RowTest extends SchemaLoade
     @Test
     public void testDiffSuperColumn()
     {
-        SuperColumn sc1 = new SuperColumn("one".getBytes(), AsciiType.instance, ClockType.Timestamp);
+        SuperColumn sc1 = new SuperColumn("one".getBytes(), AsciiType.instance, ClockType.Timestamp, null);
         sc1.addColumn(column("subcolumn", "A", new TimestampClock(0)));
 
-        SuperColumn sc2 = new SuperColumn("one".getBytes(), AsciiType.instance, ClockType.Timestamp);
+        SuperColumn sc2 = new SuperColumn("one".getBytes(), AsciiType.instance, ClockType.Timestamp, null);
         sc2.markForDeleteAt(0, new TimestampClock(0));
 
         SuperColumn scDiff = (SuperColumn)sc1.diff(sc2);

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java Tue Jun  1 15:03:48 2010
@@ -22,13 +22,14 @@ import org.junit.Test;
 import static junit.framework.Assert.assertNotNull;
 import static junit.framework.Assert.assertNull;
 import static org.apache.cassandra.Util.getBytes;
+import org.apache.cassandra.db.clock.TimestampReconciler;
 import org.apache.cassandra.db.marshal.LongType;
 
 public class SuperColumnTest
 {   
     @Test
     public void testMissingSubcolumn() {
-    	SuperColumn sc = new SuperColumn("sc1".getBytes(), LongType.instance, ClockType.Timestamp);
+    	SuperColumn sc = new SuperColumn("sc1".getBytes(), LongType.instance, ClockType.Timestamp, new TimestampReconciler());
     	sc.addColumn(new Column(getBytes(1), "value".getBytes(), new TimestampClock(1)));
     	assertNotNull(sc.getSubColumn(getBytes(1)));
     	assertNull(sc.getSubColumn(getBytes(2)));

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Tue Jun  1 15:03:48 2010
@@ -497,7 +497,7 @@ public class TableTest extends CleanupHe
 
         RowMutation rm = new RowMutation("Keyspace1", ROW.key);
         ColumnFamily cf = ColumnFamily.create("Keyspace1", "Super1");
-        SuperColumn sc = new SuperColumn("sc1".getBytes(), LongType.instance, ClockType.Timestamp);
+        SuperColumn sc = new SuperColumn("sc1".getBytes(), LongType.instance, ClockType.Timestamp, null);
         sc.addColumn(new Column(getBytes(1), "val1".getBytes(), new TimestampClock(1L)));
         cf.addColumn(sc);
         rm.add(cf);

Added: cassandra/trunk/test/unit/org/apache/cassandra/db/clock/TimestampReconcilerTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/clock/TimestampReconcilerTest.java?rev=950099&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/clock/TimestampReconcilerTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/clock/TimestampReconcilerTest.java Tue Jun  1 15:03:48 2010
@@ -0,0 +1,76 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.clock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.DeletedColumn;
+import org.apache.cassandra.db.TimestampClock;
+import org.apache.cassandra.db.clock.TimestampReconciler;
+import org.junit.Test;
+
+public class TimestampReconcilerTest
+{   
+    private static final TimestampReconciler reconciler = new TimestampReconciler();
+
+    @Test
+    public void testReconcileNormal()
+    {
+        TimestampClock leftClock = new TimestampClock(1);
+        TimestampClock rightClock = new TimestampClock(2);
+
+        Column left = new Column(
+                "x".getBytes(),
+                new byte[] {},
+                leftClock);
+        Column right = new Column(
+                "x".getBytes(),
+                new byte[] {},
+                rightClock);
+        
+        Column reconciled = reconciler.reconcile(left, right);
+
+        assertFalse(reconciled.isMarkedForDelete());
+        assertEquals(reconciled, right);
+    }
+
+    @Test
+    public void testReconcileDeleted()
+    {
+        TimestampClock leftClock = new TimestampClock(2);
+        TimestampClock rightClock = new TimestampClock(1);
+
+        Column left = new DeletedColumn(
+                "x".getBytes(),
+                new byte[] {},
+                leftClock);
+        Column right = new Column(
+                "x".getBytes(),
+                new byte[] {},
+                rightClock);
+        
+        Column reconciled = reconciler.reconcile(left, right);
+
+        assertTrue(reconciled.isMarkedForDelete());
+        assertEquals(reconciled, left);
+    }
+}