You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/11/15 17:03:41 UTC
git commit: add memtable_flush_period_in_ms; patch by yukim,
reviewed by jbellis for CASSANDRA-4237
Updated Branches:
refs/heads/trunk c4481e207 -> 60027c4cc
add memtable_flush_period_in_ms
patch by yukim; reviewed by jbellis for CASSANDRA-4237
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/60027c4c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/60027c4c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/60027c4c
Branch: refs/heads/trunk
Commit: 60027c4ccabaab390dbf4c4bba83ac3a843b3a48
Parents: c4481e2
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Nov 15 17:03:29 2012 +0100
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Nov 15 17:03:29 2012 +0100
----------------------------------------------------------------------
CHANGES.txt | 4 +
interface/cassandra.thrift | 1 +
.../org/apache/cassandra/thrift/CfDef.java | 110 +++++++++++++--
src/java/org/apache/cassandra/cli/CliClient.java | 4 +
.../org/apache/cassandra/config/CFMetaData.java | 22 +++-
.../apache/cassandra/cql/AlterTableStatement.java | 1 +
src/java/org/apache/cassandra/cql/CFPropDefs.java | 2 +
.../cassandra/cql/CreateColumnFamilyStatement.java | 3 +-
src/java/org/apache/cassandra/cql3/CFPropDefs.java | 13 +-
.../org/apache/cassandra/db/ColumnFamilyStore.java | 31 ++++-
src/java/org/apache/cassandra/db/Memtable.java | 10 ++
11 files changed, 178 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index be34e89..4445407 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+1.3
+ * add memtable_flush_period_in_ms (CASSANDRA-4237)
+
+
1.2.1
* pool [Compressed]RandomAccessReader objects on the partitioned read path
(CASSANDRA-4942)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index c52263b..0a92a9d 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -442,6 +442,7 @@ struct CfDef {
33: optional double bloom_filter_fp_chance,
34: optional string caching="keys_only",
37: optional double dclocal_read_repair_chance = 0.0,
+ 38: optional i32 memtable_flush_period_in_ms,
/* All of the following are now ignored and unsupplied. */
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
index ccf7fad..50ec681 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
@@ -67,6 +67,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
private static final org.apache.thrift.protocol.TField BLOOM_FILTER_FP_CHANCE_FIELD_DESC = new org.apache.thrift.protocol.TField("bloom_filter_fp_chance", org.apache.thrift.protocol.TType.DOUBLE, (short)33);
private static final org.apache.thrift.protocol.TField CACHING_FIELD_DESC = new org.apache.thrift.protocol.TField("caching", org.apache.thrift.protocol.TType.STRING, (short)34);
private static final org.apache.thrift.protocol.TField DCLOCAL_READ_REPAIR_CHANCE_FIELD_DESC = new org.apache.thrift.protocol.TField("dclocal_read_repair_chance", org.apache.thrift.protocol.TType.DOUBLE, (short)37);
+ private static final org.apache.thrift.protocol.TField MEMTABLE_FLUSH_PERIOD_IN_MS_FIELD_DESC = new org.apache.thrift.protocol.TField("memtable_flush_period_in_ms", org.apache.thrift.protocol.TType.I32, (short)38);
private static final org.apache.thrift.protocol.TField ROW_CACHE_SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_size", org.apache.thrift.protocol.TType.DOUBLE, (short)9);
private static final org.apache.thrift.protocol.TField KEY_CACHE_SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("key_cache_size", org.apache.thrift.protocol.TType.DOUBLE, (short)11);
private static final org.apache.thrift.protocol.TField ROW_CACHE_SAVE_PERIOD_IN_SECONDS_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_save_period_in_seconds", org.apache.thrift.protocol.TType.I32, (short)19);
@@ -100,6 +101,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
public double bloom_filter_fp_chance; // required
public String caching; // required
public double dclocal_read_repair_chance; // required
+ public int memtable_flush_period_in_ms; // required
/**
* @deprecated
*/
@@ -165,6 +167,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
BLOOM_FILTER_FP_CHANCE((short)33, "bloom_filter_fp_chance"),
CACHING((short)34, "caching"),
DCLOCAL_READ_REPAIR_CHANCE((short)37, "dclocal_read_repair_chance"),
+ MEMTABLE_FLUSH_PERIOD_IN_MS((short)38, "memtable_flush_period_in_ms"),
/**
* @deprecated
*/
@@ -263,6 +266,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
return CACHING;
case 37: // DCLOCAL_READ_REPAIR_CHANCE
return DCLOCAL_READ_REPAIR_CHANCE;
+ case 38: // MEMTABLE_FLUSH_PERIOD_IN_MS
+ return MEMTABLE_FLUSH_PERIOD_IN_MS;
case 9: // ROW_CACHE_SIZE
return ROW_CACHE_SIZE;
case 11: // KEY_CACHE_SIZE
@@ -331,16 +336,17 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
private static final int __REPLICATE_ON_WRITE_ISSET_ID = 5;
private static final int __BLOOM_FILTER_FP_CHANCE_ISSET_ID = 6;
private static final int __DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID = 7;
- private static final int __ROW_CACHE_SIZE_ISSET_ID = 8;
- private static final int __KEY_CACHE_SIZE_ISSET_ID = 9;
- private static final int __ROW_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 10;
- private static final int __KEY_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 11;
- private static final int __MEMTABLE_FLUSH_AFTER_MINS_ISSET_ID = 12;
- private static final int __MEMTABLE_THROUGHPUT_IN_MB_ISSET_ID = 13;
- private static final int __MEMTABLE_OPERATIONS_IN_MILLIONS_ISSET_ID = 14;
- private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 15;
- private static final int __ROW_CACHE_KEYS_TO_SAVE_ISSET_ID = 16;
- private BitSet __isset_bit_vector = new BitSet(17);
+ private static final int __MEMTABLE_FLUSH_PERIOD_IN_MS_ISSET_ID = 8;
+ private static final int __ROW_CACHE_SIZE_ISSET_ID = 9;
+ private static final int __KEY_CACHE_SIZE_ISSET_ID = 10;
+ private static final int __ROW_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 11;
+ private static final int __KEY_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 12;
+ private static final int __MEMTABLE_FLUSH_AFTER_MINS_ISSET_ID = 13;
+ private static final int __MEMTABLE_THROUGHPUT_IN_MB_ISSET_ID = 14;
+ private static final int __MEMTABLE_OPERATIONS_IN_MILLIONS_ISSET_ID = 15;
+ private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 16;
+ private static final int __ROW_CACHE_KEYS_TO_SAVE_ISSET_ID = 17;
+ private BitSet __isset_bit_vector = new BitSet(18);
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
@@ -394,6 +400,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.DCLOCAL_READ_REPAIR_CHANCE, new org.apache.thrift.meta_data.FieldMetaData("dclocal_read_repair_chance", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
+ tmpMap.put(_Fields.MEMTABLE_FLUSH_PERIOD_IN_MS, new org.apache.thrift.meta_data.FieldMetaData("memtable_flush_period_in_ms", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
tmpMap.put(_Fields.ROW_CACHE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("row_cache_size", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
tmpMap.put(_Fields.KEY_CACHE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("key_cache_size", org.apache.thrift.TFieldRequirementType.OPTIONAL,
@@ -523,6 +531,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
this.caching = other.caching;
}
this.dclocal_read_repair_chance = other.dclocal_read_repair_chance;
+ this.memtable_flush_period_in_ms = other.memtable_flush_period_in_ms;
this.row_cache_size = other.row_cache_size;
this.key_cache_size = other.key_cache_size;
this.row_cache_save_period_in_seconds = other.row_cache_save_period_in_seconds;
@@ -576,6 +585,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
this.dclocal_read_repair_chance = 0;
+ setMemtable_flush_period_in_msIsSet(false);
+ this.memtable_flush_period_in_ms = 0;
setRow_cache_sizeIsSet(false);
this.row_cache_size = 0.0;
setKey_cache_sizeIsSet(false);
@@ -1164,6 +1175,29 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
__isset_bit_vector.set(__DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID, value);
}
+ public int getMemtable_flush_period_in_ms() {
+ return this.memtable_flush_period_in_ms;
+ }
+
+ public CfDef setMemtable_flush_period_in_ms(int memtable_flush_period_in_ms) {
+ this.memtable_flush_period_in_ms = memtable_flush_period_in_ms;
+ setMemtable_flush_period_in_msIsSet(true);
+ return this;
+ }
+
+ public void unsetMemtable_flush_period_in_ms() {
+ __isset_bit_vector.clear(__MEMTABLE_FLUSH_PERIOD_IN_MS_ISSET_ID);
+ }
+
+ /** Returns true if field memtable_flush_period_in_ms is set (has been assigned a value) and false otherwise */
+ public boolean isSetMemtable_flush_period_in_ms() {
+ return __isset_bit_vector.get(__MEMTABLE_FLUSH_PERIOD_IN_MS_ISSET_ID);
+ }
+
+ public void setMemtable_flush_period_in_msIsSet(boolean value) {
+ __isset_bit_vector.set(__MEMTABLE_FLUSH_PERIOD_IN_MS_ISSET_ID, value);
+ }
+
/**
* @deprecated
*/
@@ -1633,6 +1667,14 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
}
break;
+ case MEMTABLE_FLUSH_PERIOD_IN_MS:
+ if (value == null) {
+ unsetMemtable_flush_period_in_ms();
+ } else {
+ setMemtable_flush_period_in_ms((Integer)value);
+ }
+ break;
+
case ROW_CACHE_SIZE:
if (value == null) {
unsetRow_cache_size();
@@ -1784,6 +1826,9 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
case DCLOCAL_READ_REPAIR_CHANCE:
return Double.valueOf(getDclocal_read_repair_chance());
+ case MEMTABLE_FLUSH_PERIOD_IN_MS:
+ return Integer.valueOf(getMemtable_flush_period_in_ms());
+
case ROW_CACHE_SIZE:
return Double.valueOf(getRow_cache_size());
@@ -1869,6 +1914,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
return isSetCaching();
case DCLOCAL_READ_REPAIR_CHANCE:
return isSetDclocal_read_repair_chance();
+ case MEMTABLE_FLUSH_PERIOD_IN_MS:
+ return isSetMemtable_flush_period_in_ms();
case ROW_CACHE_SIZE:
return isSetRow_cache_size();
case KEY_CACHE_SIZE:
@@ -2104,6 +2151,15 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
return false;
}
+ boolean this_present_memtable_flush_period_in_ms = true && this.isSetMemtable_flush_period_in_ms();
+ boolean that_present_memtable_flush_period_in_ms = true && that.isSetMemtable_flush_period_in_ms();
+ if (this_present_memtable_flush_period_in_ms || that_present_memtable_flush_period_in_ms) {
+ if (!(this_present_memtable_flush_period_in_ms && that_present_memtable_flush_period_in_ms))
+ return false;
+ if (this.memtable_flush_period_in_ms != that.memtable_flush_period_in_ms)
+ return false;
+ }
+
boolean this_present_row_cache_size = true && this.isSetRow_cache_size();
boolean that_present_row_cache_size = true && that.isSetRow_cache_size();
if (this_present_row_cache_size || that_present_row_cache_size) {
@@ -2311,6 +2367,11 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
if (present_dclocal_read_repair_chance)
builder.append(dclocal_read_repair_chance);
+ boolean present_memtable_flush_period_in_ms = true && (isSetMemtable_flush_period_in_ms());
+ builder.append(present_memtable_flush_period_in_ms);
+ if (present_memtable_flush_period_in_ms)
+ builder.append(memtable_flush_period_in_ms);
+
boolean present_row_cache_size = true && (isSetRow_cache_size());
builder.append(present_row_cache_size);
if (present_row_cache_size)
@@ -2592,6 +2653,16 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetMemtable_flush_period_in_ms()).compareTo(typedOther.isSetMemtable_flush_period_in_ms());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetMemtable_flush_period_in_ms()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.memtable_flush_period_in_ms, typedOther.memtable_flush_period_in_ms);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
lastComparison = Boolean.valueOf(isSetRow_cache_size()).compareTo(typedOther.isSetRow_cache_size());
if (lastComparison != 0) {
return lastComparison;
@@ -2906,6 +2977,14 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
}
break;
+ case 38: // MEMTABLE_FLUSH_PERIOD_IN_MS
+ if (field.type == org.apache.thrift.protocol.TType.I32) {
+ this.memtable_flush_period_in_ms = iprot.readI32();
+ setMemtable_flush_period_in_msIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
case 9: // ROW_CACHE_SIZE
if (field.type == org.apache.thrift.protocol.TType.DOUBLE) {
this.row_cache_size = iprot.readDouble();
@@ -3209,6 +3288,11 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
oprot.writeDouble(this.dclocal_read_repair_chance);
oprot.writeFieldEnd();
}
+ if (isSetMemtable_flush_period_in_ms()) {
+ oprot.writeFieldBegin(MEMTABLE_FLUSH_PERIOD_IN_MS_FIELD_DESC);
+ oprot.writeI32(this.memtable_flush_period_in_ms);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -3401,6 +3485,12 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
sb.append(this.dclocal_read_repair_chance);
first = false;
}
+ if (isSetMemtable_flush_period_in_ms()) {
+ if (!first) sb.append(", ");
+ sb.append("memtable_flush_period_in_ms:");
+ sb.append(this.memtable_flush_period_in_ms);
+ first = false;
+ }
if (isSetRow_cache_size()) {
if (!first) sb.append(", ");
sb.append("row_cache_size:");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/cli/CliClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/CliClient.java b/src/java/org/apache/cassandra/cli/CliClient.java
index a4b4483..197a870 100644
--- a/src/java/org/apache/cassandra/cli/CliClient.java
+++ b/src/java/org/apache/cassandra/cli/CliClient.java
@@ -137,6 +137,7 @@ public class CliClient
COMPACTION_STRATEGY_OPTIONS,
COMPRESSION_OPTIONS,
BLOOM_FILTER_FP_CHANCE,
+ MEMTABLE_FLUSH_PERIOD_IN_MS,
CACHING
}
@@ -1323,6 +1324,9 @@ public class CliClient
case BLOOM_FILTER_FP_CHANCE:
cfDef.setBloom_filter_fp_chance(Double.parseDouble(mValue));
break;
+ case MEMTABLE_FLUSH_PERIOD_IN_MS:
+ cfDef.setMemtable_flush_period_in_ms(Integer.parseInt(mValue));
+ break;
case CACHING:
cfDef.setCaching(CliUtils.unescapeSQLString(mValue));
break;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index b50abc5..b2b3a3c 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -123,6 +123,7 @@ public final class CFMetaData
+ "key_validator text,"
+ "min_compaction_threshold int,"
+ "max_compaction_threshold int,"
+ + "memtable_flush_period_in_ms int,"
+ "key_alias text," // that one is kept for compatibility sake
+ "key_aliases text,"
+ "bloom_filter_fp_chance double,"
@@ -258,6 +259,7 @@ public final class CFMetaData
private volatile ByteBuffer valueAlias = null;
private volatile Double bloomFilterFpChance = null;
private volatile Caching caching = DEFAULT_CACHING_STRATEGY;
+ private int memtableFlushPeriod = 0;
volatile Map<ByteBuffer, ColumnDefinition> column_metadata = new HashMap<ByteBuffer,ColumnDefinition>();
public volatile Class<? extends AbstractCompactionStrategy> compactionStrategyClass = DEFAULT_COMPACTION_STRATEGY_CLASS;
@@ -288,6 +290,7 @@ public final class CFMetaData
public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;}
public CFMetaData bloomFilterFpChance(Double prop) {bloomFilterFpChance = prop; return this;}
public CFMetaData caching(Caching prop) {caching = prop; return this;}
+ public CFMetaData memtableFlushPeriod(int prop) {memtableFlushPeriod = prop; return this;}
public CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp, AbstractType<?> subcc)
{
@@ -426,7 +429,8 @@ public final class CFMetaData
.compactionStrategyOptions(oldCFMD.compactionStrategyOptions)
.compressionParameters(oldCFMD.compressionParameters)
.bloomFilterFpChance(oldCFMD.bloomFilterFpChance)
- .caching(oldCFMD.caching);
+ .caching(oldCFMD.caching)
+ .memtableFlushPeriod(oldCFMD.memtableFlushPeriod);
}
/**
@@ -539,6 +543,11 @@ public final class CFMetaData
return caching;
}
+ public int getMemtableFlushPeriod()
+ {
+ return memtableFlushPeriod;
+ }
+
public boolean equals(Object obj)
{
if (obj == this)
@@ -575,6 +584,7 @@ public final class CFMetaData
.append(compactionStrategyOptions, rhs.compactionStrategyOptions)
.append(compressionParameters, rhs.compressionParameters)
.append(bloomFilterFpChance, rhs.bloomFilterFpChance)
+ .append(memtableFlushPeriod, rhs.memtableFlushPeriod)
.append(caching, rhs.caching)
.isEquals();
}
@@ -605,6 +615,7 @@ public final class CFMetaData
.append(compactionStrategyOptions)
.append(compressionParameters)
.append(bloomFilterFpChance)
+ .append(memtableFlushPeriod)
.append(caching)
.toHashCode();
}
@@ -677,6 +688,8 @@ public final class CFMetaData
newCFMD.compactionStrategyOptions(new HashMap<String, String>(cf_def.compaction_strategy_options));
if (cf_def.isSetBloom_filter_fp_chance())
newCFMD.bloomFilterFpChance(cf_def.bloom_filter_fp_chance);
+ if (cf_def.isSetMemtable_flush_period_in_ms())
+ newCFMD.memtableFlushPeriod(cf_def.memtable_flush_period_in_ms);
if (cf_def.isSetCaching())
newCFMD.caching(Caching.fromString(cf_def.caching));
if (cf_def.isSetRead_repair_chance())
@@ -786,6 +799,7 @@ public final class CFMetaData
valueAlias = cfm.valueAlias;
bloomFilterFpChance = cfm.bloomFilterFpChance;
+ memtableFlushPeriod = cfm.memtableFlushPeriod;
caching = cfm.caching;
MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(column_metadata, cfm.column_metadata);
@@ -879,6 +893,7 @@ public final class CFMetaData
def.setCompression_options(compressionParameters.asThriftOptions());
if (bloomFilterFpChance != null)
def.setBloom_filter_fp_chance(bloomFilterFpChance);
+ def.setMemtable_flush_period_in_ms(memtableFlushPeriod);
def.setCaching(caching.toString());
return def;
}
@@ -1215,6 +1230,7 @@ public final class CFMetaData
cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "key_validator"));
cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "min_compaction_threshold"));
cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "max_compaction_threshold"));
+ cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "memtable_flush_period_in_ms"));
cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "key_alias"));
cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "key_aliases"));
cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "bloom_filter_fp_chance"));
@@ -1268,6 +1284,7 @@ public final class CFMetaData
cf.addColumn(Column.create(json(aliasesAsStrings(keyAliases)), timestamp, cfName, "key_aliases"));
cf.addColumn(bloomFilterFpChance == null ? DeletedColumn.create(ldt, timestamp, cfName, "bloomFilterFpChance")
: Column.create(bloomFilterFpChance, timestamp, cfName, "bloom_filter_fp_chance"));
+ cf.addColumn(Column.create(memtableFlushPeriod, timestamp, cfName, "memtable_flush_period_in_ms"));
cf.addColumn(Column.create(caching.toString(), timestamp, cfName, "caching"));
cf.addColumn(Column.create(compactionStrategyClass.getName(), timestamp, cfName, "compaction_strategy_class"));
cf.addColumn(Column.create(json(compressionParameters.asThriftOptions()), timestamp, cfName, "compression_parameters"));
@@ -1312,6 +1329,8 @@ public final class CFMetaData
}
if (result.has("bloom_filter_fp_chance"))
cfm.bloomFilterFpChance(result.getDouble("bloom_filter_fp_chance"));
+ if (result.has("memtable_flush_period_in_ms"))
+ cfm.memtableFlushPeriod(result.getInt("memtable_flush_period_in_ms"));
cfm.caching(Caching.valueOf(result.getString("caching")));
cfm.compactionStrategyClass(createCompactionStrategy(result.getString("compaction_strategy_class")));
cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters"))));
@@ -1481,6 +1500,7 @@ public final class CFMetaData
.append("compactionStrategyOptions", compactionStrategyOptions)
.append("compressionOptions", compressionParameters.asThriftOptions())
.append("bloomFilterFpChance", bloomFilterFpChance)
+ .append("memtable_flush_period_in_ms", memtableFlushPeriod)
.append("caching", caching)
.toString();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index 5210f25..7680a6b 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -186,6 +186,7 @@ public class AlterTableStatement
cfm.maxCompactionThreshold(cfProps.getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, cfm.getMaxCompactionThreshold()));
cfm.caching(CFMetaData.Caching.fromString(cfProps.getPropertyString(CFPropDefs.KW_CACHING, cfm.getCaching().toString())));
cfm.bloomFilterFpChance(cfProps.getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance()));
+ cfm.memtableFlushPeriod(cfProps.getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
if (!cfProps.compactionStrategyOptions.isEmpty())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/cql/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CFPropDefs.java b/src/java/org/apache/cassandra/cql/CFPropDefs.java
index d50f2d0..091ee6a 100644
--- a/src/java/org/apache/cassandra/cql/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql/CFPropDefs.java
@@ -50,6 +50,7 @@ public class CFPropDefs {
public static final String KW_COMPACTION_STRATEGY_CLASS = "compaction_strategy_class";
public static final String KW_CACHING = "caching";
public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
+ public static final String KW_MEMTABLE_FLUSH_PERIOD = "memtable_flush_period_in_ms";
// Maps CQL short names to the respective Cassandra comparator/validator class names
public static final Map<String, String> comparators = new HashMap<String, String>();
@@ -91,6 +92,7 @@ public class CFPropDefs {
keywords.add(KW_COMPACTION_STRATEGY_CLASS);
keywords.add(KW_CACHING);
keywords.add(KW_BF_FP_CHANCE);
+ keywords.add(KW_MEMTABLE_FLUSH_PERIOD);
obsoleteKeywords.add("row_cache_size");
obsoleteKeywords.add("key_cache_size");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
index fed856f..992166e 100644
--- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
@@ -192,7 +192,8 @@ public class CreateColumnFamilyStatement
.compactionStrategyOptions(cfProps.compactionStrategyOptions)
.compressionParameters(CompressionParameters.create(cfProps.compressionParameters))
.caching(CFMetaData.Caching.fromString(getPropertyString(CFPropDefs.KW_CACHING, CFMetaData.DEFAULT_CACHING_STRATEGY.toString())))
- .bloomFilterFpChance(getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, null));
+ .bloomFilterFpChance(getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, null))
+ .memtableFlushPeriod(getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, 0));
// CQL2 can have null keyAliases
if (keyAlias != null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/cql3/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
index 0b563cc..61cefff 100644
--- a/src/java/org/apache/cassandra/cql3/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
@@ -17,20 +17,16 @@
*/
package org.apache.cassandra.cql3;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.io.compress.CompressionParameters;
public class CFPropDefs extends PropertyDefinitions
@@ -46,6 +42,7 @@ public class CFPropDefs extends PropertyDefinitions
public static final String KW_REPLICATEONWRITE = "replicate_on_write";
public static final String KW_CACHING = "caching";
public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
+ public static final String KW_MEMTABLE_FLUSH_PERIOD = "memtable_flush_period_in_ms";
public static final String KW_COMPACTION = "compaction";
public static final String KW_COMPRESSION = "compression";
@@ -66,6 +63,7 @@ public class CFPropDefs extends PropertyDefinitions
keywords.add(KW_BF_FP_CHANCE);
keywords.add(KW_COMPACTION);
keywords.add(KW_COMPRESSION);
+ keywords.add(KW_MEMTABLE_FLUSH_PERIOD);
obsoleteKeywords.add("compaction_strategy_class");
obsoleteKeywords.add("compaction_strategy_options");
@@ -128,6 +126,7 @@ public class CFPropDefs extends PropertyDefinitions
cfm.maxCompactionThreshold(toInt(KW_MAXCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MAXCOMPACTIONTHRESHOLD), cfm.getMaxCompactionThreshold()));
cfm.caching(CFMetaData.Caching.fromString(getString(KW_CACHING, cfm.getCaching().toString())));
cfm.bloomFilterFpChance(getDouble(KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance()));
+ cfm.memtableFlushPeriod(getInt(KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
if (compactionStrategyClass != null)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 439ef5f..434a5c4 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,10 +33,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
-
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +52,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.ExtendedFilter;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.index.SecondaryIndex;
@@ -154,6 +151,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
maybeReloadCompactionStrategy();
+ scheduleFlush();
+
indexManager.reload();
// If the CF comparator has changed, we need to change the memtable,
@@ -206,6 +205,30 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
}
+ void scheduleFlush()
+ {
+ int period = metadata.getMemtableFlushPeriod();
+ if (period > 0)
+ {
+ logger.debug("scheduling flush in {} ms", period);
+ WrappedRunnable runnable = new WrappedRunnable()
+ {
+ protected void runMayThrow() throws Exception
+ {
+ if (getMemtableThreadSafe().isExpired())
+ {
+ Future<?> future = forceFlush();
+ // if memtable is already expired but didn't flush because it's empty,
+ // then schedule another flush.
+ if (future == null)
+ scheduleFlush();
+ }
+ }
+ };
+ StorageService.scheduledTasks.schedule(runnable, period, TimeUnit.MILLISECONDS);
+ }
+ }
+
public void setCompactionStrategyClass(String compactionStrategyClass) throws ConfigurationException
{
metadata.compactionStrategyClass = CFMetaData.createCompactionStrategy(compactionStrategyClass);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 82d22ca..bbbe272 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -112,6 +112,7 @@ public class Memtable
this.cfs = cfs;
this.creationTime = System.currentTimeMillis();
this.initialComparator = cfs.metadata.comparator;
+ this.cfs.scheduleFlush();
Callable<Set<Object>> provider = new Callable<Set<Object>>()
{
@@ -313,6 +314,15 @@ public class Memtable
}
/**
+ * @return true if this memtable is expired. Expiration time is determined by CF's memtable_flush_period_in_ms.
+ */
+ public boolean isExpired()
+ {
+ int period = cfs.metadata.getMemtableFlushPeriod();
+ return period > 0 && (System.currentTimeMillis() >= creationTime + period);
+ }
+
+ /**
* obtain an iterator of columns in this memtable in the specified order starting from a given column.
*/
public static OnDiskAtomIterator getSliceIterator(final DecoratedKey key, final ColumnFamily cf, SliceQueryFilter filter)