You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jo...@apache.org on 2010/06/01 17:03:49 UTC
svn commit: r950099 - in /cassandra/trunk: conf/ interface/
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/avro/ src/java/org/apache/cassandra/config/
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra...
Author: johan
Date: Tue Jun 1 15:03:48 2010
New Revision: 950099
URL: http://svn.apache.org/viewvc?rev=950099&view=rev
Log:
Add support for reconciler classes that decide how to resolve conflicting columns. Patch by Kelvin Kakugawa and johan, review by jbellis. CASSANDRA-1144
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/clock/
cassandra/trunk/src/java/org/apache/cassandra/db/clock/AbstractReconciler.java
cassandra/trunk/src/java/org/apache/cassandra/db/clock/TimestampReconciler.java
cassandra/trunk/test/unit/org/apache/cassandra/db/clock/
cassandra/trunk/test/unit/org/apache/cassandra/db/clock/TimestampReconcilerTest.java
Modified:
cassandra/trunk/conf/cassandra.yaml
cassandra/trunk/interface/cassandra.avpr
cassandra/trunk/interface/cassandra.thrift
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
cassandra/trunk/src/java/org/apache/cassandra/config/ColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
cassandra/trunk/test/conf/cassandra.yaml
cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Tue Jun 1 15:03:48 2010
@@ -210,6 +210,8 @@ keyspaces:
- name: StandardByUUID1
compare_with: TimeUUIDType
+ clock_type: Timestamp
+ reconciler: TimestampReconciler
- name: Super1
column_type: Super
Modified: cassandra/trunk/interface/cassandra.avpr
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.avpr?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.avpr (original)
+++ cassandra/trunk/interface/cassandra.avpr Tue Jun 1 15:03:48 2010
@@ -84,6 +84,7 @@
{"name": "clock_type", "type": ["string", "null"]},
{"name": "comparator_type", "type": ["string", "null"]},
{"name": "subcomparator_type", "type": ["string", "null"]},
+ {"name": "reconciler", "type": ["string", "null"]},
{"name": "comment", "type": ["string", "null"]},
{"name": "row_cache_size", "type": ["double", "null"]},
{"name": "preload_row_cache", "type": ["boolean", "null"]},
Modified: cassandra/trunk/interface/cassandra.thrift
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.thrift?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.thrift (original)
+++ cassandra/trunk/interface/cassandra.thrift Tue Jun 1 15:03:48 2010
@@ -322,10 +322,11 @@ struct CfDef {
4: optional string clock_type="Timestamp",
5: optional string comparator_type="BytesType",
6: optional string subcomparator_type="",
- 7: optional string comment="",
- 8: optional double row_cache_size=0,
- 9: optional bool preload_row_cache=0,
- 10: optional double key_cache_size=200000,
+ 7: optional string reconciler="",
+ 8: optional string comment="",
+ 9: optional double row_cache_size=0,
+ 10: optional bool preload_row_cache=0,
+ 11: optional double key_cache_size=200000
}
/* describes a keyspace. */
Modified: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java (original)
+++ cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java Tue Jun 1 15:03:48 2010
@@ -53,10 +53,11 @@ public class CfDef implements TBase<CfDe
private static final TField CLOCK_TYPE_FIELD_DESC = new TField("clock_type", TType.STRING, (short)4);
private static final TField COMPARATOR_TYPE_FIELD_DESC = new TField("comparator_type", TType.STRING, (short)5);
private static final TField SUBCOMPARATOR_TYPE_FIELD_DESC = new TField("subcomparator_type", TType.STRING, (short)6);
- private static final TField COMMENT_FIELD_DESC = new TField("comment", TType.STRING, (short)7);
- private static final TField ROW_CACHE_SIZE_FIELD_DESC = new TField("row_cache_size", TType.DOUBLE, (short)8);
- private static final TField PRELOAD_ROW_CACHE_FIELD_DESC = new TField("preload_row_cache", TType.BOOL, (short)9);
- private static final TField KEY_CACHE_SIZE_FIELD_DESC = new TField("key_cache_size", TType.DOUBLE, (short)10);
+ private static final TField RECONCILER_FIELD_DESC = new TField("reconciler", TType.STRING, (short)7);
+ private static final TField COMMENT_FIELD_DESC = new TField("comment", TType.STRING, (short)8);
+ private static final TField ROW_CACHE_SIZE_FIELD_DESC = new TField("row_cache_size", TType.DOUBLE, (short)9);
+ private static final TField PRELOAD_ROW_CACHE_FIELD_DESC = new TField("preload_row_cache", TType.BOOL, (short)10);
+ private static final TField KEY_CACHE_SIZE_FIELD_DESC = new TField("key_cache_size", TType.DOUBLE, (short)11);
public String table;
public String name;
@@ -64,6 +65,7 @@ public class CfDef implements TBase<CfDe
public String clock_type;
public String comparator_type;
public String subcomparator_type;
+ public String reconciler;
public String comment;
public double row_cache_size;
public boolean preload_row_cache;
@@ -77,10 +79,11 @@ public class CfDef implements TBase<CfDe
CLOCK_TYPE((short)4, "clock_type"),
COMPARATOR_TYPE((short)5, "comparator_type"),
SUBCOMPARATOR_TYPE((short)6, "subcomparator_type"),
- COMMENT((short)7, "comment"),
- ROW_CACHE_SIZE((short)8, "row_cache_size"),
- PRELOAD_ROW_CACHE((short)9, "preload_row_cache"),
- KEY_CACHE_SIZE((short)10, "key_cache_size");
+ RECONCILER((short)7, "reconciler"),
+ COMMENT((short)8, "comment"),
+ ROW_CACHE_SIZE((short)9, "row_cache_size"),
+ PRELOAD_ROW_CACHE((short)10, "preload_row_cache"),
+ KEY_CACHE_SIZE((short)11, "key_cache_size");
private static final Map<Integer, _Fields> byId = new HashMap<Integer, _Fields>();
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -152,6 +155,8 @@ public class CfDef implements TBase<CfDe
new FieldValueMetaData(TType.STRING)));
put(_Fields.SUBCOMPARATOR_TYPE, new FieldMetaData("subcomparator_type", TFieldRequirementType.OPTIONAL,
new FieldValueMetaData(TType.STRING)));
+ put(_Fields.RECONCILER, new FieldMetaData("reconciler", TFieldRequirementType.OPTIONAL,
+ new FieldValueMetaData(TType.STRING)));
put(_Fields.COMMENT, new FieldMetaData("comment", TFieldRequirementType.OPTIONAL,
new FieldValueMetaData(TType.STRING)));
put(_Fields.ROW_CACHE_SIZE, new FieldMetaData("row_cache_size", TFieldRequirementType.OPTIONAL,
@@ -175,6 +180,8 @@ public class CfDef implements TBase<CfDe
this.subcomparator_type = "";
+ this.reconciler = "";
+
this.comment = "";
this.row_cache_size = (double)0;
@@ -218,6 +225,9 @@ public class CfDef implements TBase<CfDe
if (other.isSetSubcomparator_type()) {
this.subcomparator_type = other.subcomparator_type;
}
+ if (other.isSetReconciler()) {
+ this.reconciler = other.reconciler;
+ }
if (other.isSetComment()) {
this.comment = other.comment;
}
@@ -379,6 +389,30 @@ public class CfDef implements TBase<CfDe
}
}
+ public String getReconciler() {
+ return this.reconciler;
+ }
+
+ public CfDef setReconciler(String reconciler) {
+ this.reconciler = reconciler;
+ return this;
+ }
+
+ public void unsetReconciler() {
+ this.reconciler = null;
+ }
+
+ /** Returns true if field reconciler is set (has been asigned a value) and false otherwise */
+ public boolean isSetReconciler() {
+ return this.reconciler != null;
+ }
+
+ public void setReconcilerIsSet(boolean value) {
+ if (!value) {
+ this.reconciler = null;
+ }
+ }
+
public String getComment() {
return this.comment;
}
@@ -522,6 +556,14 @@ public class CfDef implements TBase<CfDe
}
break;
+ case RECONCILER:
+ if (value == null) {
+ unsetReconciler();
+ } else {
+ setReconciler((String)value);
+ }
+ break;
+
case COMMENT:
if (value == null) {
unsetComment();
@@ -581,6 +623,9 @@ public class CfDef implements TBase<CfDe
case SUBCOMPARATOR_TYPE:
return getSubcomparator_type();
+ case RECONCILER:
+ return getReconciler();
+
case COMMENT:
return getComment();
@@ -616,6 +661,8 @@ public class CfDef implements TBase<CfDe
return isSetComparator_type();
case SUBCOMPARATOR_TYPE:
return isSetSubcomparator_type();
+ case RECONCILER:
+ return isSetReconciler();
case COMMENT:
return isSetComment();
case ROW_CACHE_SIZE:
@@ -699,6 +746,15 @@ public class CfDef implements TBase<CfDe
return false;
}
+ boolean this_present_reconciler = true && this.isSetReconciler();
+ boolean that_present_reconciler = true && that.isSetReconciler();
+ if (this_present_reconciler || that_present_reconciler) {
+ if (!(this_present_reconciler && that_present_reconciler))
+ return false;
+ if (!this.reconciler.equals(that.reconciler))
+ return false;
+ }
+
boolean this_present_comment = true && this.isSetComment();
boolean that_present_comment = true && that.isSetComment();
if (this_present_comment || that_present_comment) {
@@ -805,6 +861,15 @@ public class CfDef implements TBase<CfDe
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetReconciler()).compareTo(typedOther.isSetReconciler());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetReconciler()) { lastComparison = TBaseHelper.compareTo(reconciler, typedOther.reconciler);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
lastComparison = Boolean.valueOf(isSetComment()).compareTo(typedOther.isSetComment());
if (lastComparison != 0) {
return lastComparison;
@@ -896,14 +961,21 @@ public class CfDef implements TBase<CfDe
TProtocolUtil.skip(iprot, field.type);
}
break;
- case 7: // COMMENT
+ case 7: // RECONCILER
+ if (field.type == TType.STRING) {
+ this.reconciler = iprot.readString();
+ } else {
+ TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 8: // COMMENT
if (field.type == TType.STRING) {
this.comment = iprot.readString();
} else {
TProtocolUtil.skip(iprot, field.type);
}
break;
- case 8: // ROW_CACHE_SIZE
+ case 9: // ROW_CACHE_SIZE
if (field.type == TType.DOUBLE) {
this.row_cache_size = iprot.readDouble();
setRow_cache_sizeIsSet(true);
@@ -911,7 +983,7 @@ public class CfDef implements TBase<CfDe
TProtocolUtil.skip(iprot, field.type);
}
break;
- case 9: // PRELOAD_ROW_CACHE
+ case 10: // PRELOAD_ROW_CACHE
if (field.type == TType.BOOL) {
this.preload_row_cache = iprot.readBool();
setPreload_row_cacheIsSet(true);
@@ -919,7 +991,7 @@ public class CfDef implements TBase<CfDe
TProtocolUtil.skip(iprot, field.type);
}
break;
- case 10: // KEY_CACHE_SIZE
+ case 11: // KEY_CACHE_SIZE
if (field.type == TType.DOUBLE) {
this.key_cache_size = iprot.readDouble();
setKey_cache_sizeIsSet(true);
@@ -980,6 +1052,13 @@ public class CfDef implements TBase<CfDe
oprot.writeFieldEnd();
}
}
+ if (this.reconciler != null) {
+ if (isSetReconciler()) {
+ oprot.writeFieldBegin(RECONCILER_FIELD_DESC);
+ oprot.writeString(this.reconciler);
+ oprot.writeFieldEnd();
+ }
+ }
if (this.comment != null) {
if (isSetComment()) {
oprot.writeFieldBegin(COMMENT_FIELD_DESC);
@@ -1066,6 +1145,16 @@ public class CfDef implements TBase<CfDe
}
first = false;
}
+ if (isSetReconciler()) {
+ if (!first) sb.append(", ");
+ sb.append("reconciler:");
+ if (this.reconciler == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.reconciler);
+ }
+ first = false;
+ }
if (isSetComment()) {
if (!first) sb.append(", ");
sb.append("comment:");
Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Tue Jun 1 15:03:48 2010
@@ -43,6 +43,8 @@ import org.apache.cassandra.config.Confi
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.clock.AbstractReconciler;
+import org.apache.cassandra.db.clock.TimestampReconciler;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.db.migration.AddKeyspace;
@@ -512,19 +514,29 @@ public class CassandraServer implements
Collection<CFMetaData> cfDefs = new ArrayList<CFMetaData>((int)ksDef.cf_defs.size());
for (CfDef cfDef : ksDef.cf_defs)
{
- String cfType, clockType, compare, subCompare;
+ String cfType, compare, subCompare;
cfType = cfDef.column_type == null ? D_CF_CFTYPE : cfDef.column_type.toString();
- clockType = cfDef.clock_type == null ? D_CF_CFCLOCKTYPE : cfDef.clock_type.toString();
+ ClockType clockType = ClockType.create(cfDef.clock_type == null ? D_CF_CFCLOCKTYPE : cfDef.clock_type.toString());
compare = cfDef.comparator_type == null ? D_CF_COMPTYPE : cfDef.comparator_type.toString();
subCompare = cfDef.subcomparator_type == null ? D_CF_SUBCOMPTYPE : cfDef.subcomparator_type.toString();
+ AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(cfDef.reconciler.toString());
+ if (reconciler == null)
+ {
+ if (clockType == ClockType.Timestamp)
+ reconciler = new TimestampReconciler(); // default
+ else
+ throw new ConfigurationException("No reconciler specified for column family " + cfDef.name.toString());
+
+ }
CFMetaData cfmeta = new CFMetaData(
cfDef.keyspace.toString(),
cfDef.name.toString(),
ColumnFamilyType.create(cfType),
- ClockType.create(clockType),
+ clockType,
DatabaseDescriptor.getComparator(compare),
subCompare.length() == 0 ? null : DatabaseDescriptor.getComparator(subCompare),
+ reconciler,
cfDef.comment == null ? D_CF_COMMENT : cfDef.comment.toString(),
cfDef.row_cache_size == null ? D_CF_ROWCACHE : cfDef.row_cache_size,
cfDef.preload_row_cache == null ? D_CF_PRELOAD_ROWCACHE : cfDef.preload_row_cache,
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/CFMetaData.java Tue Jun 1 15:03:48 2010
@@ -36,6 +36,8 @@ import org.apache.commons.lang.builder.H
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.db.ClockType;
+import org.apache.cassandra.db.clock.AbstractReconciler;
+import org.apache.cassandra.db.clock.TimestampReconciler;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.Pair;
@@ -55,10 +57,10 @@ public final class CFMetaData
private static final BiMap<Pair<String, String>, Integer> cfIdMap = HashBiMap.<Pair<String, String>, Integer>create();
- public static final CFMetaData StatusCf = new CFMetaData(Table.SYSTEM_TABLE, SystemTable.STATUS_CF, ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, "persistent metadata for the local node", 0, false, 0.01, 0);
- public static final CFMetaData HintsCf = new CFMetaData(Table.SYSTEM_TABLE, HintedHandOffManager.HINTS_CF, ColumnFamilyType.Super, ClockType.Timestamp, UTF8Type.instance, BytesType.instance, "hinted handoff data", 0, false, 0.01, 1);
- public static final CFMetaData MigrationsCf = new CFMetaData(Table.SYSTEM_TABLE, Migration.MIGRATIONS_CF, ColumnFamilyType.Standard, ClockType.Timestamp, TimeUUIDType.instance, null, "individual schema mutations", 0, false, 2);
- public static final CFMetaData SchemaCf = new CFMetaData(Table.SYSTEM_TABLE, Migration.SCHEMA_CF, ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, "current state of the schema", 0, false, 3);
+ public static final CFMetaData StatusCf = new CFMetaData(Table.SYSTEM_TABLE, SystemTable.STATUS_CF, ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(), "persistent metadata for the local node", 0, false, 0.01, 0);
+ public static final CFMetaData HintsCf = new CFMetaData(Table.SYSTEM_TABLE, HintedHandOffManager.HINTS_CF, ColumnFamilyType.Super, ClockType.Timestamp, UTF8Type.instance, BytesType.instance, new TimestampReconciler(), "hinted handoff data", 0, false, 0.01, 1);
+ public static final CFMetaData MigrationsCf = new CFMetaData(Table.SYSTEM_TABLE, Migration.MIGRATIONS_CF, ColumnFamilyType.Standard, ClockType.Timestamp, TimeUUIDType.instance, null, new TimestampReconciler(), "individual schema mutations", 0, false, 2);
+ public static final CFMetaData SchemaCf = new CFMetaData(Table.SYSTEM_TABLE, Migration.SCHEMA_CF, ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(), "current state of the schema", 0, false, 3);
/**
* @return An immutable mapping of (ksname,cfname) to id.
@@ -105,6 +107,7 @@ public final class CFMetaData
public final ClockType clockType; // clock type: timestamp, etc.
public final AbstractType comparator; // name sorted, time stamp sorted etc.
public final AbstractType subcolumnComparator; // like comparator, for supercolumns
+ public final AbstractReconciler reconciler; // determine correct column from conflicting versions
public final String comment; // for humans only
public final double rowCacheSize; // default 0
public final double keyCacheSize; // default 0.01
@@ -113,7 +116,7 @@ public final class CFMetaData
public boolean preloadRowCache;
- private CFMetaData(String tableName, String cfName, ColumnFamilyType cfType, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, String comment, double rowCacheSize, boolean preloadRowCache, double keyCacheSize, double readRepairChance, int cfId)
+ private CFMetaData(String tableName, String cfName, ColumnFamilyType cfType, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, AbstractReconciler reconciler, String comment, double rowCacheSize, boolean preloadRowCache, double keyCacheSize, double readRepairChance, int cfId)
{
this.tableName = tableName;
this.cfName = cfName;
@@ -123,6 +126,7 @@ public final class CFMetaData
// the default subcolumncomparator is null per thrift spec, but only should be null if cfType == Standard. If
// cfType == Super, subcolumnComparator should default to BytesType if not set.
this.subcolumnComparator = subcolumnComparator == null && cfType == ColumnFamilyType.Super ? BytesType.instance : subcolumnComparator;
+ this.reconciler = reconciler;
this.comment = comment;
this.rowCacheSize = rowCacheSize;
this.preloadRowCache = preloadRowCache;
@@ -144,27 +148,27 @@ public final class CFMetaData
}
}
- public CFMetaData(String tableName, String cfName, ColumnFamilyType cfType, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, String comment, double rowCacheSize, boolean preloadRowCache, double keyCacheSize)
+ public CFMetaData(String tableName, String cfName, ColumnFamilyType cfType, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, AbstractReconciler reconciler, String comment, double rowCacheSize, boolean preloadRowCache, double keyCacheSize)
{
- this(tableName, cfName, cfType, clockType, comparator, subcolumnComparator, comment, rowCacheSize, preloadRowCache, keyCacheSize, DEFAULT_READ_REPAIR_CHANCE, nextId());
+ this(tableName, cfName, cfType, clockType, comparator, subcolumnComparator, reconciler, comment, rowCacheSize, preloadRowCache, keyCacheSize, DEFAULT_READ_REPAIR_CHANCE, nextId());
}
- public CFMetaData(String tableName, String cfName, ColumnFamilyType cfType, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, String comment, double rowCacheSize, boolean preloadRowCache, double keyCacheSize, double readRepairChance)
+ public CFMetaData(String tableName, String cfName, ColumnFamilyType cfType, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, AbstractReconciler reconciler, String comment, double rowCacheSize, boolean preloadRowCache, double keyCacheSize, double readRepairChance)
{
- this(tableName, cfName, cfType, clockType, comparator, subcolumnComparator, comment, rowCacheSize, preloadRowCache, keyCacheSize, readRepairChance, nextId());
+ this(tableName, cfName, cfType, clockType, comparator, subcolumnComparator, reconciler, comment, rowCacheSize, preloadRowCache, keyCacheSize, readRepairChance, nextId());
}
/** clones an existing CFMetaData using the same id. */
public static CFMetaData rename(CFMetaData cfm, String newName)
{
- CFMetaData newCfm = new CFMetaData(cfm.tableName, newName, cfm.cfType, cfm.clockType, cfm.comparator, cfm.subcolumnComparator, cfm.comment, cfm.rowCacheSize, cfm.preloadRowCache, cfm.keyCacheSize, cfm.readRepairChance, cfm.cfId);
+ CFMetaData newCfm = new CFMetaData(cfm.tableName, newName, cfm.cfType, cfm.clockType, cfm.comparator, cfm.subcolumnComparator, cfm.reconciler, cfm.comment, cfm.rowCacheSize, cfm.preloadRowCache, cfm.keyCacheSize, cfm.readRepairChance, cfm.cfId);
return newCfm;
}
/** clones existing CFMetaData. keeps the id but changes the table name.*/
public static CFMetaData renameTable(CFMetaData cfm, String tableName)
{
- return new CFMetaData(tableName, cfm.cfName, cfm.cfType, cfm.clockType, cfm.comparator, cfm.subcolumnComparator, cfm.comment, cfm.rowCacheSize, cfm.preloadRowCache, cfm.keyCacheSize, cfm.readRepairChance, cfm.cfId);
+ return new CFMetaData(tableName, cfm.cfName, cfm.cfType, cfm.clockType, cfm.comparator, cfm.subcolumnComparator, cfm.reconciler, cfm.comment, cfm.rowCacheSize, cfm.preloadRowCache, cfm.keyCacheSize, cfm.readRepairChance, cfm.cfId);
}
/** used for evicting cf data out of static tracking collections. */
@@ -195,6 +199,7 @@ public final class CFMetaData
dout.writeBoolean(cfm.subcolumnComparator != null);
if (cfm.subcolumnComparator != null)
dout.writeUTF(cfm.subcolumnComparator.getClass().getName());
+ dout.writeUTF(cfm.reconciler.getClass().getName());
dout.writeBoolean(cfm.comment != null);
if (cfm.comment != null)
dout.writeUTF(cfm.comment);
@@ -217,13 +222,22 @@ public final class CFMetaData
AbstractType comparator = DatabaseDescriptor.getComparator(din.readUTF());
AbstractType subcolumnComparator = null;
subcolumnComparator = din.readBoolean() ? DatabaseDescriptor.getComparator(din.readUTF()) : null;
+ AbstractReconciler reconciler = null;
+ try
+ {
+ reconciler = (AbstractReconciler)Class.forName(din.readUTF()).newInstance();
+ }
+ catch (Exception ex)
+ {
+ throw new IOException(ex);
+ }
String comment = din.readBoolean() ? din.readUTF() : null;
double rowCacheSize = din.readDouble();
boolean preloadRowCache = din.readBoolean();
double keyCacheSize = din.readDouble();
double readRepairChance = din.readDouble();
int cfId = din.readInt();
- return new CFMetaData(tableName, cfName, cfType, clockType, comparator, subcolumnComparator, comment, rowCacheSize, preloadRowCache, keyCacheSize, readRepairChance, cfId);
+ return new CFMetaData(tableName, cfName, cfType, clockType, comparator, subcolumnComparator, reconciler, comment, rowCacheSize, preloadRowCache, keyCacheSize, readRepairChance, cfId);
}
@@ -246,6 +260,7 @@ public final class CFMetaData
.append(clockType, rhs.clockType)
.append(comparator, rhs.comparator)
.append(subcolumnComparator, rhs.subcolumnComparator)
+ .append(reconciler, rhs.reconciler)
.append(comment, rhs.comment)
.append(rowCacheSize, rhs.rowCacheSize)
.append(keyCacheSize, rhs.keyCacheSize)
@@ -263,6 +278,7 @@ public final class CFMetaData
.append(clockType)
.append(comparator)
.append(subcolumnComparator)
+ .append(reconciler)
.append(comment)
.append(rowCacheSize)
.append(keyCacheSize)
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/ColumnFamily.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/ColumnFamily.java Tue Jun 1 15:03:48 2010
@@ -1,10 +1,13 @@
package org.apache.cassandra.config;
+import org.apache.cassandra.db.ClockType;
import org.apache.cassandra.db.ColumnFamilyType;
public class ColumnFamily {
public String name;
public ColumnFamilyType column_type;
+ public ClockType clock_type;
+ public String reconciler;
public String compare_with;
public String compare_subcolumns_with;
public String comment;
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java Tue Jun 1 15:03:48 2010
@@ -21,6 +21,8 @@ package org.apache.cassandra.config;
import org.apache.cassandra.auth.AllowAllAuthenticator;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.clock.AbstractReconciler;
+import org.apache.cassandra.db.clock.TimestampReconciler;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
@@ -518,11 +520,22 @@ public class DatabaseDescriptor
throw new ConfigurationException("compare_subcolumns_with is only a valid attribute on super columnfamilies (not regular columnfamily " + cf.name + ")");
}
+ if (cf.clock_type == null)
+ cf.clock_type = ClockType.Timestamp; // default
+
+ AbstractReconciler reconciler = getReconciler(cf.reconciler);
+ if (reconciler == null)
+ {
+ if (cf.clock_type == ClockType.Timestamp)
+ reconciler = new TimestampReconciler(); // default
+ else
+ throw new ConfigurationException("No reconciler specified for column family " + cf.name);
+ }
if (cf.read_repair_chance < 0.0 || cf.read_repair_chance > 1.0)
{
throw new ConfigurationException("read_repair_chance must be between 0.0 and 1.0");
}
- cfDefs[j++] = new CFMetaData(keyspace.name, cf.name, cfType, ClockType.Timestamp, comparator, subcolumnComparator, cf.comment, cf.rows_cached, cf.preload_row_cache, cf.keys_cached, cf.read_repair_chance);
+ cfDefs[j++] = new CFMetaData(keyspace.name, cf.name, cfType, cf.clock_type, comparator, subcolumnComparator, reconciler, cf.comment, cf.rows_cached, cf.preload_row_cache, cf.keys_cached, cf.read_repair_chance);
}
defs.add(new KSMetaData(keyspace.name, strategyClass, keyspace.replication_factor, cfDefs));
@@ -582,6 +595,55 @@ public class DatabaseDescriptor
}
}
+ public static AbstractReconciler getReconciler(String reconcileWith) throws ConfigurationException
+ {
+ if (reconcileWith == null || "".equals(reconcileWith))
+ {
+ return null;
+ }
+
+ Class<? extends AbstractReconciler> reconcilerClass;
+ {
+ String className = reconcileWith.contains(".") ? reconcileWith : TimestampReconciler.class.getPackage().getName() + "." + reconcileWith;
+ try
+ {
+ reconcilerClass = (Class<? extends AbstractReconciler>)Class.forName(className);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new ConfigurationException("Unable to load class " + className);
+ }
+ }
+ try
+ {
+ return reconcilerClass.getConstructor().newInstance();
+ }
+ catch (InstantiationException e)
+ {
+ ConfigurationException ex = new ConfigurationException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ catch (IllegalAccessException e)
+ {
+ ConfigurationException ex = new ConfigurationException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ catch (InvocationTargetException e)
+ {
+ ConfigurationException ex = new ConfigurationException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ catch (NoSuchMethodException e)
+ {
+ ConfigurationException ex = new ConfigurationException(e.getMessage());
+ ex.initCause(e);
+ throw ex;
+ }
+ }
+
/**
* Creates all storage-related directories.
* @throws IOException when a disk problem is encountered.
@@ -888,6 +950,15 @@ public class DatabaseDescriptor
return stageQueueSize_;
}
+ public static AbstractReconciler getReconciler(String tableName, String cfName)
+ {
+ assert tableName != null;
+ CFMetaData cfmd = getCFMetaData(tableName, cfName);
+ if (cfmd == null)
+ throw new NullPointerException("Unknown ColumnFamily " + cfName + " in keyspace " + tableName);
+ return cfmd.reconciler;
+ }
+
/**
* @return The absolute number of keys that should be cached per table.
*/
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Tue Jun 1 15:03:48 2010
@@ -19,7 +19,6 @@
package org.apache.cassandra.db;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
@@ -27,18 +26,17 @@ import java.util.concurrent.ConcurrentSk
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.IClock.ClockRelationship;
+import org.apache.cassandra.db.clock.AbstractReconciler;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.ICompactSerializer2;
-import org.apache.cassandra.db.IClock.ClockRelationship;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.Pair;
public class ColumnFamily implements IColumnContainer
{
@@ -64,31 +62,33 @@ public class ColumnFamily implements ICo
{
if (cfm == null)
throw new IllegalArgumentException("Unknown column family.");
- return new ColumnFamily(cfm.cfType, cfm.clockType, cfm.comparator, cfm.subcolumnComparator, cfm.cfId);
+ return new ColumnFamily(cfm.cfType, cfm.clockType, cfm.comparator, cfm.subcolumnComparator, cfm.reconciler, cfm.cfId);
}
private final int cfid;
private final ColumnFamilyType type;
private final ClockType clockType;
+ private final AbstractReconciler reconciler;
private transient ICompactSerializer2<IColumn> columnSerializer;
final AtomicReference<IClock> markedForDeleteAt;
final AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
private ConcurrentSkipListMap<byte[], IColumn> columns;
- public ColumnFamily(ColumnFamilyType type, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, int cfid)
+ public ColumnFamily(ColumnFamilyType type, ClockType clockType, AbstractType comparator, AbstractType subcolumnComparator, AbstractReconciler reconciler, int cfid)
{
this.type = type;
this.clockType = clockType;
+ this.reconciler = reconciler;
this.markedForDeleteAt = new AtomicReference<IClock>(clockType.minClock());
- columnSerializer = type == ColumnFamilyType.Standard ? Column.serializer(clockType) : SuperColumn.serializer(subcolumnComparator, clockType);
+ columnSerializer = type == ColumnFamilyType.Standard ? Column.serializer(clockType) : SuperColumn.serializer(subcolumnComparator, clockType, reconciler);
columns = new ConcurrentSkipListMap<byte[], IColumn>(comparator);
this.cfid = cfid;
}
public ColumnFamily cloneMeShallow()
{
- ColumnFamily cf = new ColumnFamily(type, clockType, getComparator(), getSubComparator(), cfid);
+ ColumnFamily cf = new ColumnFamily(type, clockType, getComparator(), getSubComparator(), reconciler, cfid);
cf.markedForDeleteAt.set(markedForDeleteAt.get());
cf.localDeletionTime.set(localDeletionTime.get());
return cf;
@@ -108,6 +108,11 @@ public class ColumnFamily implements ICo
{
return clockType;
}
+
+ public AbstractReconciler getReconciler()
+ {
+ return reconciler;
+ }
public ColumnFamily cloneMe()
{
@@ -204,7 +209,7 @@ public class ColumnFamily implements ICo
else
{
assert isSuper();
- c = new SuperColumn(superColumnName, getSubComparator(), clockType);
+ c = new SuperColumn(superColumnName, getSubComparator(), clockType, reconciler);
c.addColumn(column); // checks subcolumn name
}
addColumn(c);
@@ -231,11 +236,15 @@ public class ColumnFamily implements ICo
}
else
{
- while (ClockRelationship.GREATER_THAN != ((Column) oldColumn).comparePriority((Column) column))
+ // calculate reconciled col from old (existing) col and new col
+ IColumn reconciledColumn = reconciler.reconcile((Column)column, (Column)oldColumn);
+ while (!columns.replace(name, oldColumn, reconciledColumn))
{
- if (columns.replace(name, oldColumn, column))
- break;
+ // if unable to replace, then get updated old (existing) col
oldColumn = columns.get(name);
+ // re-calculate reconciled col from updated old col and original new col
+ reconciledColumn = reconciler.reconcile((Column)column, (Column)oldColumn);
+ // try to re-update value, again
}
}
}
@@ -291,7 +300,7 @@ public class ColumnFamily implements ICo
*/
public ColumnFamily diff(ColumnFamily cfComposite)
{
- ColumnFamily cfDiff = new ColumnFamily(cfComposite.type, cfComposite.clockType, getComparator(), getSubComparator(), cfComposite.id());
+ ColumnFamily cfDiff = new ColumnFamily(cfComposite.type, cfComposite.clockType, getComparator(), getSubComparator(), cfComposite.reconciler, cfComposite.id());
ClockRelationship rel = cfComposite.getMarkedForDeleteAt().compare(getMarkedForDeleteAt());
if (ClockRelationship.GREATER_THAN == rel)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Tue Jun 1 15:03:48 2010
@@ -219,7 +219,7 @@ public class Memtable implements Compara
if (isStandard)
startIColumn = new Column(filter.start);
else
- startIColumn = new SuperColumn(filter.start, null, cf.getClockType()); // ok to not have subcolumnComparator since we won't be adding columns to this object
+ startIColumn = new SuperColumn(filter.start, null, cf.getClockType(), cf.getReconciler()); // ok to not have subcolumnComparator since we won't be adding columns to this object
// can't use a ColumnComparatorFactory comparator since those compare on both name and time (and thus will fail to match
// our dummy column, since the time there is arbitrary).
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Tue Jun 1 15:03:48 2010
@@ -24,7 +24,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
-import java.nio.ByteBuffer;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
@@ -174,7 +173,7 @@ public class RowMutation
else if (path.columnName == null)
{
SuperColumn sc = new SuperColumn(path.superColumnName, columnFamily.getSubComparator(),
- columnFamily.getClockType());
+ columnFamily.getClockType(), columnFamily.getReconciler());
sc.markForDeleteAt(localDeleteTime, clock);
columnFamily.addColumn(sc);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Tue Jun 1 15:03:48 2010
@@ -30,10 +30,11 @@ import java.util.concurrent.atomic.Atomi
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.IClock.ClockRelationship;
import org.apache.cassandra.io.ICompactSerializer2;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.IClock.ClockRelationship;
+import org.apache.cassandra.db.clock.AbstractReconciler;
import org.apache.cassandra.utils.FBUtilities;
@@ -41,27 +42,29 @@ public class SuperColumn implements ICol
{
private static Logger logger_ = LoggerFactory.getLogger(SuperColumn.class);
- public static SuperColumnSerializer serializer(AbstractType comparator, ClockType clockType)
+ public static SuperColumnSerializer serializer(AbstractType comparator, ClockType clockType, AbstractReconciler reconciler)
{
- return new SuperColumnSerializer(comparator, clockType);
+ return new SuperColumnSerializer(comparator, clockType, reconciler);
}
private byte[] name_;
private ConcurrentSkipListMap<byte[], IColumn> columns_;
private AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
private AtomicReference<IClock> markedForDeleteAt;
+ private AbstractReconciler reconciler;
- public SuperColumn(byte[] name, AbstractType comparator, ClockType clockType)
+ public SuperColumn(byte[] name, AbstractType comparator, ClockType clockType, AbstractReconciler reconciler)
{
- this(name, new ConcurrentSkipListMap<byte[], IColumn>(comparator), clockType);
+ this(name, new ConcurrentSkipListMap<byte[], IColumn>(comparator), clockType, reconciler);
}
- private SuperColumn(byte[] name, ConcurrentSkipListMap<byte[], IColumn> columns, ClockType clockType)
+ private SuperColumn(byte[] name, ConcurrentSkipListMap<byte[], IColumn> columns, ClockType clockType, AbstractReconciler reconciler)
{
assert name != null;
assert name.length <= IColumn.MAX_NAME_LENGTH;
- name_ = name;
+ name_ = name;
columns_ = columns;
+ this.reconciler = reconciler;
markedForDeleteAt = new AtomicReference<IClock>(clockType.minClock());
}
@@ -73,7 +76,7 @@ public class SuperColumn implements ICol
public SuperColumn cloneMeShallow()
{
IClock _markedForDeleteAt = markedForDeleteAt.get();
- SuperColumn sc = new SuperColumn(name_, getComparator(), _markedForDeleteAt.type());
+ SuperColumn sc = new SuperColumn(name_, getComparator(), _markedForDeleteAt.type(), reconciler);
sc.markForDeleteAt(localDeletionTime.get(), _markedForDeleteAt);
return sc;
}
@@ -81,7 +84,7 @@ public class SuperColumn implements ICol
public IColumn cloneMe()
{
IClock _markedForDeleteAt = markedForDeleteAt.get();
- SuperColumn sc = new SuperColumn(name_, new ConcurrentSkipListMap<byte[], IColumn>(columns_), _markedForDeleteAt.type());
+ SuperColumn sc = new SuperColumn(name_, new ConcurrentSkipListMap<byte[], IColumn>(columns_), _markedForDeleteAt.type(), reconciler);
sc.markForDeleteAt(localDeletionTime.get(), _markedForDeleteAt);
return sc;
}
@@ -166,18 +169,20 @@ public class SuperColumn implements ICol
public void addColumn(IColumn column)
{
- assert column instanceof Column : "A super column can only contain simple columns";
+ assert column instanceof Column : "A super column can only contain simple columns";
+
byte[] name = column.name();
IColumn oldColumn = columns_.putIfAbsent(name, column);
- if (oldColumn != null)
+ if (oldColumn != null)
{
- ClockRelationship rel = ((Column)oldColumn).comparePriority((Column)column);
- while (ClockRelationship.GREATER_THAN != rel)
+ IColumn reconciledColumn = reconciler.reconcile((Column)column, (Column)oldColumn);
+ while (!columns_.replace(name, oldColumn, reconciledColumn))
{
- if (columns_.replace(name, oldColumn, column))
- break;
+ // if unable to replace, then get updated old (existing) col
oldColumn = columns_.get(name);
- rel = ((Column)oldColumn).comparePriority((Column)column);
+ // re-calculate reconciled col from updated old col and original new col
+ reconciledColumn = reconciler.reconcile((Column)column, (Column)oldColumn);
+ // try to re-update value, again
}
}
}
@@ -211,7 +216,7 @@ public class SuperColumn implements ICol
public IColumn diff(IColumn columnNew)
{
IClock _markedForDeleteAt = markedForDeleteAt.get();
- IColumn columnDiff = new SuperColumn(columnNew.name(), ((SuperColumn)columnNew).getComparator(), _markedForDeleteAt.type());
+ IColumn columnDiff = new SuperColumn(columnNew.name(), ((SuperColumn)columnNew).getComparator(), _markedForDeleteAt.type(), reconciler);
ClockRelationship rel = columnNew.getMarkedForDeleteAt().compare(_markedForDeleteAt);
if (ClockRelationship.GREATER_THAN == rel)
{
@@ -299,11 +304,13 @@ class SuperColumnSerializer implements I
{
private AbstractType comparator;
private ClockType clockType;
+ private AbstractReconciler reconciler;
- public SuperColumnSerializer(AbstractType comparator, ClockType clockType)
+ public SuperColumnSerializer(AbstractType comparator, ClockType clockType, AbstractReconciler reconciler)
{
this.comparator = comparator;
this.clockType = clockType;
+ this.reconciler = reconciler;
}
public AbstractType getComparator()
@@ -337,7 +344,7 @@ class SuperColumnSerializer implements I
public IColumn deserialize(DataInput dis) throws IOException
{
byte[] name = FBUtilities.readShortByteArray(dis);
- SuperColumn superColumn = new SuperColumn(name, comparator, clockType);
+ SuperColumn superColumn = new SuperColumn(name, comparator, clockType, reconciler);
int localDeleteTime = dis.readInt();
if (localDeleteTime != Integer.MIN_VALUE && localDeleteTime <= 0)
{
Added: cassandra/trunk/src/java/org/apache/cassandra/db/clock/AbstractReconciler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/clock/AbstractReconciler.java?rev=950099&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/clock/AbstractReconciler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/clock/AbstractReconciler.java Tue Jun 1 15:03:48 2010
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.clock;
+
+import org.apache.cassandra.db.Column;
+
+/**
+ * Specifies a Reconciler for a ColumnFamily.
+ */
+public abstract class AbstractReconciler
+{
+ /**
+ * @return Reconcile the two columns into one depending on the clock types.
+ * For example a strategy could be to keep the one with the highest timestamp.
+ */
+ public abstract Column reconcile(Column left, Column right);
+
+ public final boolean equals(Object obj)
+ {
+ if (obj == null)
+ return false;
+ else
+ return obj.getClass().getName().equals(getClass().getName());
+ }
+}
Added: cassandra/trunk/src/java/org/apache/cassandra/db/clock/TimestampReconciler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/clock/TimestampReconciler.java?rev=950099&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/clock/TimestampReconciler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/clock/TimestampReconciler.java Tue Jun 1 15:03:48 2010
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.clock;
+
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.IClock.ClockRelationship;
+
+/**
+ * Keeps the column with the highest timestamp. If both are equal
+ * return the left column.
+ */
+public class TimestampReconciler extends AbstractReconciler
+{
+
+ public Column reconcile(Column left, Column right)
+ {
+ ClockRelationship cr = left.clock().compare(right.clock());
+ switch (cr)
+ {
+ case EQUAL:
+ case GREATER_THAN:
+ return left;
+ case LESS_THAN:
+ return right;
+ default:
+ throw new IllegalArgumentException(
+ "Timestamp clocks must either be equal, greater then or less than: " + cr);
+ }
+ }
+}
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Tue Jun 1 15:03:48 2010
@@ -32,6 +32,7 @@ import org.apache.cassandra.auth.AllowAl
import org.apache.cassandra.auth.SimpleAuthenticator;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.clock.AbstractReconciler;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.thrift.*;
@@ -290,7 +291,8 @@ public class ColumnFamilyRecordReader ex
{
AbstractType subComparator = DatabaseDescriptor.getSubComparator(keyspace, cfName);
ClockType clockType = DatabaseDescriptor.getClockType(keyspace, cfName);
- org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator, clockType);
+ AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(keyspace, cfName);
+ org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name, subComparator, clockType, reconciler);
for (Column column : super_column.columns)
{
sc.addColumn(unthriftifySimple(column));
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Tue Jun 1 15:03:48 2010
@@ -34,6 +34,7 @@ import org.apache.cassandra.dht.IPartiti
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.clock.AbstractReconciler;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.ICompactSerializer2;
@@ -278,9 +279,10 @@ public abstract class SSTableReader exte
{
ColumnFamilyType cfType = DatabaseDescriptor.getColumnFamilyType(getTableName(), getColumnFamilyName());
ClockType clockType = DatabaseDescriptor.getClockType(getTableName(), getColumnFamilyName());
+ AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(getTableName(), getColumnFamilyName());
return cfType == ColumnFamilyType.Standard
? Column.serializer(clockType)
- : SuperColumn.serializer(getColumnComparator(), clockType);
+ : SuperColumn.serializer(getColumnComparator(), clockType, reconciler);
}
/**
Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Tue Jun 1 15:03:48 2010
@@ -39,6 +39,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.clock.AbstractReconciler;
+import org.apache.cassandra.db.clock.TimestampReconciler;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.MarshalException;
import org.apache.cassandra.dht.AbstractBounds;
@@ -632,23 +634,7 @@ public class CassandraServer implements
try
{
- ColumnFamilyType cfType = ColumnFamilyType.create(cf_def.column_type);
- if (cfType == null)
- {
- throw new InvalidRequestException("Invalid column type " + cf_def.column_type);
- }
- CFMetaData cfm = new CFMetaData(
- cf_def.table,
- cf_def.name,
- cfType,
- ClockType.Timestamp,
- DatabaseDescriptor.getComparator(cf_def.comparator_type),
- cf_def.subcomparator_type.length() == 0 ? null : DatabaseDescriptor.getComparator(cf_def.subcomparator_type),
- cf_def.comment,
- cf_def.row_cache_size,
- cf_def.preload_row_cache,
- cf_def.key_cache_size);
- AddColumnFamily add = new AddColumnFamily(cfm);
+ AddColumnFamily add = new AddColumnFamily(convertToCFMetaData(cf_def));
add.apply();
add.announce();
return DatabaseDescriptor.getDefsVersion().toString();
@@ -742,23 +728,7 @@ public class CassandraServer implements
Collection<CFMetaData> cfDefs = new ArrayList<CFMetaData>(ks_def.cf_defs.size());
for (CfDef cfDef : ks_def.cf_defs)
{
- ColumnFamilyType cfType = ColumnFamilyType.create(cfDef.column_type);
- if (cfType == null)
- {
- throw new InvalidRequestException("Invalid column type " + cfDef.column_type);
- }
- CFMetaData cfm = new CFMetaData(
- cfDef.table,
- cfDef.name,
- cfType,
- ClockType.Timestamp,
- DatabaseDescriptor.getComparator(cfDef.comparator_type),
- cfDef.subcomparator_type.length() == 0 ? null : DatabaseDescriptor.getComparator(cfDef.subcomparator_type),
- cfDef.comment,
- cfDef.row_cache_size,
- cfDef.preload_row_cache,
- cfDef.key_cache_size);
- cfDefs.add(cfm);
+ cfDefs.add(convertToCFMetaData(cfDef));
}
KSMetaData ksm = new KSMetaData(
@@ -848,6 +818,42 @@ public class CassandraServer implements
throw ex;
}
}
+
+ private CFMetaData convertToCFMetaData(CfDef cf_def) throws InvalidRequestException, ConfigurationException
+ {
+ ColumnFamilyType cfType = ColumnFamilyType.create(cf_def.column_type);
+ if (cfType == null)
+ {
+ throw new InvalidRequestException("Invalid column type " + cf_def.column_type);
+ }
+ ClockType clockType = ClockType.create(cf_def.clock_type);
+ if (clockType == null)
+ {
+ throw new InvalidRequestException("Invalid clock type " + cf_def.clock_type);
+ }
+ AbstractReconciler reconciler = DatabaseDescriptor.getReconciler(cf_def.reconciler);
+ if (reconciler == null)
+ {
+ if (clockType == ClockType.Timestamp)
+ reconciler = new TimestampReconciler(); // default
+ else
+ throw new ConfigurationException("No reconciler specified for column family " + cf_def.name);
+
+ }
+
+ return new CFMetaData(
+ cf_def.table,
+ cf_def.name,
+ cfType,
+ clockType,
+ DatabaseDescriptor.getComparator(cf_def.comparator_type),
+ cf_def.subcomparator_type.length() == 0 ? null : DatabaseDescriptor.getComparator(cf_def.subcomparator_type),
+ reconciler,
+ cf_def.comment,
+ cf_def.row_cache_size,
+ cf_def.preload_row_cache,
+ cf_def.key_cache_size);
+ }
@Override
public void truncate(String keyspace, String cfname) throws InvalidRequestException, UnavailableException, TException
Modified: cassandra/trunk/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Tue Jun 1 15:03:48 2010
@@ -32,6 +32,8 @@ keyspaces:
- name: StandardLong1
compare_with: LongType
+ clock_type: Timestamp
+ reconciler: TimestampReconciler
- name: StandardLong2
compare_with: LongType
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/DefsTest.java Tue Jun 1 15:03:48 2010
@@ -25,6 +25,7 @@ import org.apache.cassandra.config.CFMet
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.clock.TimestampReconciler;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.BytesType;
@@ -77,7 +78,7 @@ public class DefsTest extends CleanupHel
@Test
public void addNewCfToBogusTable() throws InterruptedException
{
- CFMetaData newCf = new CFMetaData("MadeUpKeyspace", "NewCF", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, "new cf", 0, false, 0);
+ CFMetaData newCf = new CFMetaData("MadeUpKeyspace", "NewCF", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(), "new cf", 0, false, 0);
try
{
new AddColumnFamily(newCf).apply();
@@ -102,7 +103,7 @@ public class DefsTest extends CleanupHel
assert DatabaseDescriptor.getDefsVersion().equals(prior);
// add a cf.
- CFMetaData newCf1 = new CFMetaData("Keyspace1", "MigrationCf_1", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, "Migration CF ", 0, false, 0);
+ CFMetaData newCf1 = new CFMetaData("Keyspace1", "MigrationCf_1", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(), "Migration CF ", 0, false, 0);
Migration m1 = new AddColumnFamily(newCf1);
m1.apply();
UUID ver1 = m1.getVersion();
@@ -161,7 +162,7 @@ public class DefsTest extends CleanupHel
final String cf = "BrandNewCf";
KSMetaData original = DatabaseDescriptor.getTableDefinition(ks);
- CFMetaData newCf = new CFMetaData(original.name, cf, ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, "A New Column Family", 0, false, 0);
+ CFMetaData newCf = new CFMetaData(original.name, cf, ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(), "A New Column Family", 0, false, 0);
assert !DatabaseDescriptor.getTableDefinition(ks).cfMetaData().containsKey(newCf.cfName);
new AddColumnFamily(newCf).apply();
@@ -275,7 +276,7 @@ public class DefsTest extends CleanupHel
public void addNewKS() throws ConfigurationException, IOException, ExecutionException, InterruptedException
{
DecoratedKey dk = Util.dk("key0");
- CFMetaData newCf = new CFMetaData("NewKeyspace1", "AddedStandard1", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, "A new cf for a new ks", 0, false, 0);
+ CFMetaData newCf = new CFMetaData("NewKeyspace1", "AddedStandard1", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(), "A new cf for a new ks", 0, false, 0);
KSMetaData newKs = new KSMetaData(newCf.tableName, RackUnawareStrategy.class, 5, newCf);
new AddKeyspace(newKs).apply();
@@ -431,7 +432,7 @@ public class DefsTest extends CleanupHel
new AddKeyspace(newKs).apply();
assert DatabaseDescriptor.getTableDefinition("EmptyKeyspace") != null;
- CFMetaData newCf = new CFMetaData("EmptyKeyspace", "AddedLater", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, "A new CF to add to an empty KS", 0, false, 0);
+ CFMetaData newCf = new CFMetaData("EmptyKeyspace", "AddedLater", ColumnFamilyType.Standard, ClockType.Timestamp, UTF8Type.instance, null, new TimestampReconciler(), "A new CF to add to an empty KS", 0, false, 0);
//should not exist until apply
assert !DatabaseDescriptor.getTableDefinition(newKs.name).cfMetaData().containsKey(newCf.cfName);
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/RowTest.java Tue Jun 1 15:03:48 2010
@@ -46,10 +46,10 @@ public class RowTest extends SchemaLoade
@Test
public void testDiffSuperColumn()
{
- SuperColumn sc1 = new SuperColumn("one".getBytes(), AsciiType.instance, ClockType.Timestamp);
+ SuperColumn sc1 = new SuperColumn("one".getBytes(), AsciiType.instance, ClockType.Timestamp, null);
sc1.addColumn(column("subcolumn", "A", new TimestampClock(0)));
- SuperColumn sc2 = new SuperColumn("one".getBytes(), AsciiType.instance, ClockType.Timestamp);
+ SuperColumn sc2 = new SuperColumn("one".getBytes(), AsciiType.instance, ClockType.Timestamp, null);
sc2.markForDeleteAt(0, new TimestampClock(0));
SuperColumn scDiff = (SuperColumn)sc1.diff(sc2);
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/SuperColumnTest.java Tue Jun 1 15:03:48 2010
@@ -22,13 +22,14 @@ import org.junit.Test;
import static junit.framework.Assert.assertNotNull;
import static junit.framework.Assert.assertNull;
import static org.apache.cassandra.Util.getBytes;
+import org.apache.cassandra.db.clock.TimestampReconciler;
import org.apache.cassandra.db.marshal.LongType;
public class SuperColumnTest
{
@Test
public void testMissingSubcolumn() {
- SuperColumn sc = new SuperColumn("sc1".getBytes(), LongType.instance, ClockType.Timestamp);
+ SuperColumn sc = new SuperColumn("sc1".getBytes(), LongType.instance, ClockType.Timestamp, new TimestampReconciler());
sc.addColumn(new Column(getBytes(1), "value".getBytes(), new TimestampClock(1)));
assertNotNull(sc.getSubColumn(getBytes(1)));
assertNull(sc.getSubColumn(getBytes(2)));
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=950099&r1=950098&r2=950099&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Tue Jun 1 15:03:48 2010
@@ -497,7 +497,7 @@ public class TableTest extends CleanupHe
RowMutation rm = new RowMutation("Keyspace1", ROW.key);
ColumnFamily cf = ColumnFamily.create("Keyspace1", "Super1");
- SuperColumn sc = new SuperColumn("sc1".getBytes(), LongType.instance, ClockType.Timestamp);
+ SuperColumn sc = new SuperColumn("sc1".getBytes(), LongType.instance, ClockType.Timestamp, null);
sc.addColumn(new Column(getBytes(1), "val1".getBytes(), new TimestampClock(1L)));
cf.addColumn(sc);
rm.add(cf);
Added: cassandra/trunk/test/unit/org/apache/cassandra/db/clock/TimestampReconcilerTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/clock/TimestampReconcilerTest.java?rev=950099&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/clock/TimestampReconcilerTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/clock/TimestampReconcilerTest.java Tue Jun 1 15:03:48 2010
@@ -0,0 +1,76 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.clock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.DeletedColumn;
+import org.apache.cassandra.db.TimestampClock;
+import org.apache.cassandra.db.clock.TimestampReconciler;
+import org.junit.Test;
+
+public class TimestampReconcilerTest
+{
+ private static final TimestampReconciler reconciler = new TimestampReconciler();
+
+ @Test
+ public void testReconcileNormal()
+ {
+ TimestampClock leftClock = new TimestampClock(1);
+ TimestampClock rightClock = new TimestampClock(2);
+
+ Column left = new Column(
+ "x".getBytes(),
+ new byte[] {},
+ leftClock);
+ Column right = new Column(
+ "x".getBytes(),
+ new byte[] {},
+ rightClock);
+
+ Column reconciled = reconciler.reconcile(left, right);
+
+ assertFalse(reconciled.isMarkedForDelete());
+ assertEquals(reconciled, right);
+ }
+
+ @Test
+ public void testReconcileDeleted()
+ {
+ TimestampClock leftClock = new TimestampClock(2);
+ TimestampClock rightClock = new TimestampClock(1);
+
+ Column left = new DeletedColumn(
+ "x".getBytes(),
+ new byte[] {},
+ leftClock);
+ Column right = new Column(
+ "x".getBytes(),
+ new byte[] {},
+ rightClock);
+
+ Column reconciled = reconciler.reconcile(left, right);
+
+ assertTrue(reconciled.isMarkedForDelete());
+ assertEquals(reconciled, left);
+ }
+}