You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/11/15 17:03:41 UTC

git commit: add memtable_flush_period_in_ms; patch by yukim; reviewed by jbellis for CASSANDRA-4237

Updated Branches:
  refs/heads/trunk c4481e207 -> 60027c4cc


add memtable_flush_period_in_ms
patch by yukim; reviewed by jbellis for CASSANDRA-4237


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/60027c4c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/60027c4c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/60027c4c

Branch: refs/heads/trunk
Commit: 60027c4ccabaab390dbf4c4bba83ac3a843b3a48
Parents: c4481e2
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Nov 15 17:03:29 2012 +0100
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Nov 15 17:03:29 2012 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    4 +
 interface/cassandra.thrift                         |    1 +
 .../org/apache/cassandra/thrift/CfDef.java         |  110 +++++++++++++--
 src/java/org/apache/cassandra/cli/CliClient.java   |    4 +
 .../org/apache/cassandra/config/CFMetaData.java    |   22 +++-
 .../apache/cassandra/cql/AlterTableStatement.java  |    1 +
 src/java/org/apache/cassandra/cql/CFPropDefs.java  |    2 +
 .../cassandra/cql/CreateColumnFamilyStatement.java |    3 +-
 src/java/org/apache/cassandra/cql3/CFPropDefs.java |   13 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   31 ++++-
 src/java/org/apache/cassandra/db/Memtable.java     |   10 ++
 11 files changed, 178 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index be34e89..4445407 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+1.3
+ * add memtable_flush_period_in_ms (CASSANDRA-4237)
+
+
 1.2.1
  * pool [Compressed]RandomAccessReader objects on the partitioned read path
    (CASSANDRA-4942)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/interface/cassandra.thrift
----------------------------------------------------------------------
diff --git a/interface/cassandra.thrift b/interface/cassandra.thrift
index c52263b..0a92a9d 100644
--- a/interface/cassandra.thrift
+++ b/interface/cassandra.thrift
@@ -442,6 +442,7 @@ struct CfDef {
     33: optional double bloom_filter_fp_chance,
     34: optional string caching="keys_only",
     37: optional double dclocal_read_repair_chance = 0.0,
+    38: optional i32 memtable_flush_period_in_ms,
 
     /* All of the following are now ignored and unsupplied. */
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
----------------------------------------------------------------------
diff --git a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
index ccf7fad..50ec681 100644
--- a/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
+++ b/interface/thrift/gen-java/org/apache/cassandra/thrift/CfDef.java
@@ -67,6 +67,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
   private static final org.apache.thrift.protocol.TField BLOOM_FILTER_FP_CHANCE_FIELD_DESC = new org.apache.thrift.protocol.TField("bloom_filter_fp_chance", org.apache.thrift.protocol.TType.DOUBLE, (short)33);
   private static final org.apache.thrift.protocol.TField CACHING_FIELD_DESC = new org.apache.thrift.protocol.TField("caching", org.apache.thrift.protocol.TType.STRING, (short)34);
   private static final org.apache.thrift.protocol.TField DCLOCAL_READ_REPAIR_CHANCE_FIELD_DESC = new org.apache.thrift.protocol.TField("dclocal_read_repair_chance", org.apache.thrift.protocol.TType.DOUBLE, (short)37);
+  private static final org.apache.thrift.protocol.TField MEMTABLE_FLUSH_PERIOD_IN_MS_FIELD_DESC = new org.apache.thrift.protocol.TField("memtable_flush_period_in_ms", org.apache.thrift.protocol.TType.I32, (short)38);
   private static final org.apache.thrift.protocol.TField ROW_CACHE_SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_size", org.apache.thrift.protocol.TType.DOUBLE, (short)9);
   private static final org.apache.thrift.protocol.TField KEY_CACHE_SIZE_FIELD_DESC = new org.apache.thrift.protocol.TField("key_cache_size", org.apache.thrift.protocol.TType.DOUBLE, (short)11);
   private static final org.apache.thrift.protocol.TField ROW_CACHE_SAVE_PERIOD_IN_SECONDS_FIELD_DESC = new org.apache.thrift.protocol.TField("row_cache_save_period_in_seconds", org.apache.thrift.protocol.TType.I32, (short)19);
@@ -100,6 +101,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
   public double bloom_filter_fp_chance; // required
   public String caching; // required
   public double dclocal_read_repair_chance; // required
+  public int memtable_flush_period_in_ms; // required
   /**
    * @deprecated
    */
@@ -165,6 +167,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
     BLOOM_FILTER_FP_CHANCE((short)33, "bloom_filter_fp_chance"),
     CACHING((short)34, "caching"),
     DCLOCAL_READ_REPAIR_CHANCE((short)37, "dclocal_read_repair_chance"),
+    MEMTABLE_FLUSH_PERIOD_IN_MS((short)38, "memtable_flush_period_in_ms"),
     /**
      * @deprecated
      */
@@ -263,6 +266,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
           return CACHING;
         case 37: // DCLOCAL_READ_REPAIR_CHANCE
           return DCLOCAL_READ_REPAIR_CHANCE;
+        case 38: // MEMTABLE_FLUSH_PERIOD_IN_MS
+          return MEMTABLE_FLUSH_PERIOD_IN_MS;
         case 9: // ROW_CACHE_SIZE
           return ROW_CACHE_SIZE;
         case 11: // KEY_CACHE_SIZE
@@ -331,16 +336,17 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
   private static final int __REPLICATE_ON_WRITE_ISSET_ID = 5;
   private static final int __BLOOM_FILTER_FP_CHANCE_ISSET_ID = 6;
   private static final int __DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID = 7;
-  private static final int __ROW_CACHE_SIZE_ISSET_ID = 8;
-  private static final int __KEY_CACHE_SIZE_ISSET_ID = 9;
-  private static final int __ROW_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 10;
-  private static final int __KEY_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 11;
-  private static final int __MEMTABLE_FLUSH_AFTER_MINS_ISSET_ID = 12;
-  private static final int __MEMTABLE_THROUGHPUT_IN_MB_ISSET_ID = 13;
-  private static final int __MEMTABLE_OPERATIONS_IN_MILLIONS_ISSET_ID = 14;
-  private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 15;
-  private static final int __ROW_CACHE_KEYS_TO_SAVE_ISSET_ID = 16;
-  private BitSet __isset_bit_vector = new BitSet(17);
+  private static final int __MEMTABLE_FLUSH_PERIOD_IN_MS_ISSET_ID = 8;
+  private static final int __ROW_CACHE_SIZE_ISSET_ID = 9;
+  private static final int __KEY_CACHE_SIZE_ISSET_ID = 10;
+  private static final int __ROW_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 11;
+  private static final int __KEY_CACHE_SAVE_PERIOD_IN_SECONDS_ISSET_ID = 12;
+  private static final int __MEMTABLE_FLUSH_AFTER_MINS_ISSET_ID = 13;
+  private static final int __MEMTABLE_THROUGHPUT_IN_MB_ISSET_ID = 14;
+  private static final int __MEMTABLE_OPERATIONS_IN_MILLIONS_ISSET_ID = 15;
+  private static final int __MERGE_SHARDS_CHANCE_ISSET_ID = 16;
+  private static final int __ROW_CACHE_KEYS_TO_SAVE_ISSET_ID = 17;
+  private BitSet __isset_bit_vector = new BitSet(18);
 
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
@@ -394,6 +400,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.DCLOCAL_READ_REPAIR_CHANCE, new org.apache.thrift.meta_data.FieldMetaData("dclocal_read_repair_chance", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
+    tmpMap.put(_Fields.MEMTABLE_FLUSH_PERIOD_IN_MS, new org.apache.thrift.meta_data.FieldMetaData("memtable_flush_period_in_ms", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
     tmpMap.put(_Fields.ROW_CACHE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("row_cache_size", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE)));
     tmpMap.put(_Fields.KEY_CACHE_SIZE, new org.apache.thrift.meta_data.FieldMetaData("key_cache_size", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
@@ -523,6 +531,7 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
       this.caching = other.caching;
     }
     this.dclocal_read_repair_chance = other.dclocal_read_repair_chance;
+    this.memtable_flush_period_in_ms = other.memtable_flush_period_in_ms;
     this.row_cache_size = other.row_cache_size;
     this.key_cache_size = other.key_cache_size;
     this.row_cache_save_period_in_seconds = other.row_cache_save_period_in_seconds;
@@ -576,6 +585,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
 
     this.dclocal_read_repair_chance = 0;
 
+    setMemtable_flush_period_in_msIsSet(false);
+    this.memtable_flush_period_in_ms = 0;
     setRow_cache_sizeIsSet(false);
     this.row_cache_size = 0.0;
     setKey_cache_sizeIsSet(false);
@@ -1164,6 +1175,29 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
     __isset_bit_vector.set(__DCLOCAL_READ_REPAIR_CHANCE_ISSET_ID, value);
   }
 
+  public int getMemtable_flush_period_in_ms() {
+    return this.memtable_flush_period_in_ms;
+  }
+
+  public CfDef setMemtable_flush_period_in_ms(int memtable_flush_period_in_ms) {
+    this.memtable_flush_period_in_ms = memtable_flush_period_in_ms;
+    setMemtable_flush_period_in_msIsSet(true);
+    return this;
+  }
+
+  public void unsetMemtable_flush_period_in_ms() {
+    __isset_bit_vector.clear(__MEMTABLE_FLUSH_PERIOD_IN_MS_ISSET_ID);
+  }
+
+  /** Returns true if field memtable_flush_period_in_ms is set (has been assigned a value) and false otherwise */
+  public boolean isSetMemtable_flush_period_in_ms() {
+    return __isset_bit_vector.get(__MEMTABLE_FLUSH_PERIOD_IN_MS_ISSET_ID);
+  }
+
+  public void setMemtable_flush_period_in_msIsSet(boolean value) {
+    __isset_bit_vector.set(__MEMTABLE_FLUSH_PERIOD_IN_MS_ISSET_ID, value);
+  }
+
   /**
    * @deprecated
    */
@@ -1633,6 +1667,14 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
       }
       break;
 
+    case MEMTABLE_FLUSH_PERIOD_IN_MS:
+      if (value == null) {
+        unsetMemtable_flush_period_in_ms();
+      } else {
+        setMemtable_flush_period_in_ms((Integer)value);
+      }
+      break;
+
     case ROW_CACHE_SIZE:
       if (value == null) {
         unsetRow_cache_size();
@@ -1784,6 +1826,9 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
     case DCLOCAL_READ_REPAIR_CHANCE:
       return Double.valueOf(getDclocal_read_repair_chance());
 
+    case MEMTABLE_FLUSH_PERIOD_IN_MS:
+      return Integer.valueOf(getMemtable_flush_period_in_ms());
+
     case ROW_CACHE_SIZE:
       return Double.valueOf(getRow_cache_size());
 
@@ -1869,6 +1914,8 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
       return isSetCaching();
     case DCLOCAL_READ_REPAIR_CHANCE:
       return isSetDclocal_read_repair_chance();
+    case MEMTABLE_FLUSH_PERIOD_IN_MS:
+      return isSetMemtable_flush_period_in_ms();
     case ROW_CACHE_SIZE:
       return isSetRow_cache_size();
     case KEY_CACHE_SIZE:
@@ -2104,6 +2151,15 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
         return false;
     }
 
+    boolean this_present_memtable_flush_period_in_ms = true && this.isSetMemtable_flush_period_in_ms();
+    boolean that_present_memtable_flush_period_in_ms = true && that.isSetMemtable_flush_period_in_ms();
+    if (this_present_memtable_flush_period_in_ms || that_present_memtable_flush_period_in_ms) {
+      if (!(this_present_memtable_flush_period_in_ms && that_present_memtable_flush_period_in_ms))
+        return false;
+      if (this.memtable_flush_period_in_ms != that.memtable_flush_period_in_ms)
+        return false;
+    }
+
     boolean this_present_row_cache_size = true && this.isSetRow_cache_size();
     boolean that_present_row_cache_size = true && that.isSetRow_cache_size();
     if (this_present_row_cache_size || that_present_row_cache_size) {
@@ -2311,6 +2367,11 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
     if (present_dclocal_read_repair_chance)
       builder.append(dclocal_read_repair_chance);
 
+    boolean present_memtable_flush_period_in_ms = true && (isSetMemtable_flush_period_in_ms());
+    builder.append(present_memtable_flush_period_in_ms);
+    if (present_memtable_flush_period_in_ms)
+      builder.append(memtable_flush_period_in_ms);
+
     boolean present_row_cache_size = true && (isSetRow_cache_size());
     builder.append(present_row_cache_size);
     if (present_row_cache_size)
@@ -2592,6 +2653,16 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetMemtable_flush_period_in_ms()).compareTo(typedOther.isSetMemtable_flush_period_in_ms());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetMemtable_flush_period_in_ms()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.memtable_flush_period_in_ms, typedOther.memtable_flush_period_in_ms);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     lastComparison = Boolean.valueOf(isSetRow_cache_size()).compareTo(typedOther.isSetRow_cache_size());
     if (lastComparison != 0) {
       return lastComparison;
@@ -2906,6 +2977,14 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
           }
           break;
+        case 38: // MEMTABLE_FLUSH_PERIOD_IN_MS
+          if (field.type == org.apache.thrift.protocol.TType.I32) {
+            this.memtable_flush_period_in_ms = iprot.readI32();
+            setMemtable_flush_period_in_msIsSet(true);
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
         case 9: // ROW_CACHE_SIZE
           if (field.type == org.apache.thrift.protocol.TType.DOUBLE) {
             this.row_cache_size = iprot.readDouble();
@@ -3209,6 +3288,11 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
       oprot.writeDouble(this.dclocal_read_repair_chance);
       oprot.writeFieldEnd();
     }
+    if (isSetMemtable_flush_period_in_ms()) {
+      oprot.writeFieldBegin(MEMTABLE_FLUSH_PERIOD_IN_MS_FIELD_DESC);
+      oprot.writeI32(this.memtable_flush_period_in_ms);
+      oprot.writeFieldEnd();
+    }
     oprot.writeFieldStop();
     oprot.writeStructEnd();
   }
@@ -3401,6 +3485,12 @@ public class CfDef implements org.apache.thrift.TBase<CfDef, CfDef._Fields>, jav
       sb.append(this.dclocal_read_repair_chance);
       first = false;
     }
+    if (isSetMemtable_flush_period_in_ms()) {
+      if (!first) sb.append(", ");
+      sb.append("memtable_flush_period_in_ms:");
+      sb.append(this.memtable_flush_period_in_ms);
+      first = false;
+    }
     if (isSetRow_cache_size()) {
       if (!first) sb.append(", ");
       sb.append("row_cache_size:");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/cli/CliClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cli/CliClient.java b/src/java/org/apache/cassandra/cli/CliClient.java
index a4b4483..197a870 100644
--- a/src/java/org/apache/cassandra/cli/CliClient.java
+++ b/src/java/org/apache/cassandra/cli/CliClient.java
@@ -137,6 +137,7 @@ public class CliClient
         COMPACTION_STRATEGY_OPTIONS,
         COMPRESSION_OPTIONS,
         BLOOM_FILTER_FP_CHANCE,
+        MEMTABLE_FLUSH_PERIOD_IN_MS,
         CACHING
     }
 
@@ -1323,6 +1324,9 @@ public class CliClient
             case BLOOM_FILTER_FP_CHANCE:
                 cfDef.setBloom_filter_fp_chance(Double.parseDouble(mValue));
                 break;
+            case MEMTABLE_FLUSH_PERIOD_IN_MS:
+                cfDef.setMemtable_flush_period_in_ms(Integer.parseInt(mValue));
+                break;
             case CACHING:
                 cfDef.setCaching(CliUtils.unescapeSQLString(mValue));
                 break;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index b50abc5..b2b3a3c 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -123,6 +123,7 @@ public final class CFMetaData
                                                                      + "key_validator text,"
                                                                      + "min_compaction_threshold int,"
                                                                      + "max_compaction_threshold int,"
+                                                                     + "memtable_flush_period_in_ms int,"
                                                                      + "key_alias text," // that one is kept for compatibility sake
                                                                      + "key_aliases text,"
                                                                      + "bloom_filter_fp_chance double,"
@@ -258,6 +259,7 @@ public final class CFMetaData
     private volatile ByteBuffer valueAlias = null;
     private volatile Double bloomFilterFpChance = null;
     private volatile Caching caching = DEFAULT_CACHING_STRATEGY;
+    private int memtableFlushPeriod = 0;
 
     volatile Map<ByteBuffer, ColumnDefinition> column_metadata = new HashMap<ByteBuffer,ColumnDefinition>();
     public volatile Class<? extends AbstractCompactionStrategy> compactionStrategyClass = DEFAULT_COMPACTION_STRATEGY_CLASS;
@@ -288,6 +290,7 @@ public final class CFMetaData
     public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;}
     public CFMetaData bloomFilterFpChance(Double prop) {bloomFilterFpChance = prop; return this;}
     public CFMetaData caching(Caching prop) {caching = prop; return this;}
+    public CFMetaData memtableFlushPeriod(int prop) {memtableFlushPeriod = prop; return this;}
 
     public CFMetaData(String keyspace, String name, ColumnFamilyType type, AbstractType<?> comp, AbstractType<?> subcc)
     {
@@ -426,7 +429,8 @@ public final class CFMetaData
                       .compactionStrategyOptions(oldCFMD.compactionStrategyOptions)
                       .compressionParameters(oldCFMD.compressionParameters)
                       .bloomFilterFpChance(oldCFMD.bloomFilterFpChance)
-                      .caching(oldCFMD.caching);
+                      .caching(oldCFMD.caching)
+                      .memtableFlushPeriod(oldCFMD.memtableFlushPeriod);
     }
 
     /**
@@ -539,6 +543,11 @@ public final class CFMetaData
         return caching;
     }
 
+    public int getMemtableFlushPeriod()
+    {
+        return memtableFlushPeriod;
+    }
+
     public boolean equals(Object obj)
     {
         if (obj == this)
@@ -575,6 +584,7 @@ public final class CFMetaData
             .append(compactionStrategyOptions, rhs.compactionStrategyOptions)
             .append(compressionParameters, rhs.compressionParameters)
             .append(bloomFilterFpChance, rhs.bloomFilterFpChance)
+            .append(memtableFlushPeriod, rhs.memtableFlushPeriod)
             .append(caching, rhs.caching)
             .isEquals();
     }
@@ -605,6 +615,7 @@ public final class CFMetaData
             .append(compactionStrategyOptions)
             .append(compressionParameters)
             .append(bloomFilterFpChance)
+            .append(memtableFlushPeriod)
             .append(caching)
             .toHashCode();
     }
@@ -677,6 +688,8 @@ public final class CFMetaData
                 newCFMD.compactionStrategyOptions(new HashMap<String, String>(cf_def.compaction_strategy_options));
             if (cf_def.isSetBloom_filter_fp_chance())
                 newCFMD.bloomFilterFpChance(cf_def.bloom_filter_fp_chance);
+            if (cf_def.isSetMemtable_flush_period_in_ms())
+                newCFMD.memtableFlushPeriod(cf_def.memtable_flush_period_in_ms);
             if (cf_def.isSetCaching())
                 newCFMD.caching(Caching.fromString(cf_def.caching));
             if (cf_def.isSetRead_repair_chance())
@@ -786,6 +799,7 @@ public final class CFMetaData
             valueAlias = cfm.valueAlias;
 
         bloomFilterFpChance = cfm.bloomFilterFpChance;
+        memtableFlushPeriod = cfm.memtableFlushPeriod;
         caching = cfm.caching;
 
         MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(column_metadata, cfm.column_metadata);
@@ -879,6 +893,7 @@ public final class CFMetaData
         def.setCompression_options(compressionParameters.asThriftOptions());
         if (bloomFilterFpChance != null)
             def.setBloom_filter_fp_chance(bloomFilterFpChance);
+        def.setMemtable_flush_period_in_ms(memtableFlushPeriod);
         def.setCaching(caching.toString());
         return def;
     }
@@ -1215,6 +1230,7 @@ public final class CFMetaData
         cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "key_validator"));
         cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "min_compaction_threshold"));
         cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "max_compaction_threshold"));
+        cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "memtable_flush_period_in_ms"));
         cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "key_alias"));
         cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "key_aliases"));
         cf.addColumn(DeletedColumn.create(ldt, timestamp, cfName, "bloom_filter_fp_chance"));
@@ -1268,6 +1284,7 @@ public final class CFMetaData
         cf.addColumn(Column.create(json(aliasesAsStrings(keyAliases)), timestamp, cfName, "key_aliases"));
         cf.addColumn(bloomFilterFpChance == null ? DeletedColumn.create(ldt, timestamp, cfName, "bloomFilterFpChance")
                                                  : Column.create(bloomFilterFpChance, timestamp, cfName, "bloom_filter_fp_chance"));
+        cf.addColumn(Column.create(memtableFlushPeriod, timestamp, cfName, "memtable_flush_period_in_ms"));
         cf.addColumn(Column.create(caching.toString(), timestamp, cfName, "caching"));
         cf.addColumn(Column.create(compactionStrategyClass.getName(), timestamp, cfName, "compaction_strategy_class"));
         cf.addColumn(Column.create(json(compressionParameters.asThriftOptions()), timestamp, cfName, "compression_parameters"));
@@ -1312,6 +1329,8 @@ public final class CFMetaData
             }
             if (result.has("bloom_filter_fp_chance"))
                 cfm.bloomFilterFpChance(result.getDouble("bloom_filter_fp_chance"));
+            if (result.has("memtable_flush_period_in_ms"))
+                cfm.memtableFlushPeriod(result.getInt("memtable_flush_period_in_ms"));
             cfm.caching(Caching.valueOf(result.getString("caching")));
             cfm.compactionStrategyClass(createCompactionStrategy(result.getString("compaction_strategy_class")));
             cfm.compressionParameters(CompressionParameters.create(fromJsonMap(result.getString("compression_parameters"))));
@@ -1481,6 +1500,7 @@ public final class CFMetaData
             .append("compactionStrategyOptions", compactionStrategyOptions)
             .append("compressionOptions", compressionParameters.asThriftOptions())
             .append("bloomFilterFpChance", bloomFilterFpChance)
+            .append("memtable_flush_period_in_ms", memtableFlushPeriod)
             .append("caching", caching)
             .toString();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/cql/AlterTableStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
index 5210f25..7680a6b 100644
--- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java
@@ -186,6 +186,7 @@ public class AlterTableStatement
         cfm.maxCompactionThreshold(cfProps.getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, cfm.getMaxCompactionThreshold()));
         cfm.caching(CFMetaData.Caching.fromString(cfProps.getPropertyString(CFPropDefs.KW_CACHING, cfm.getCaching().toString())));
         cfm.bloomFilterFpChance(cfProps.getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance()));
+        cfm.memtableFlushPeriod(cfProps.getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
 
         if (!cfProps.compactionStrategyOptions.isEmpty())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/cql/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CFPropDefs.java b/src/java/org/apache/cassandra/cql/CFPropDefs.java
index d50f2d0..091ee6a 100644
--- a/src/java/org/apache/cassandra/cql/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql/CFPropDefs.java
@@ -50,6 +50,7 @@ public class CFPropDefs {
     public static final String KW_COMPACTION_STRATEGY_CLASS = "compaction_strategy_class";
     public static final String KW_CACHING = "caching";
     public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
+    public static final String KW_MEMTABLE_FLUSH_PERIOD = "memtable_flush_period_in_ms";
 
     // Maps CQL short names to the respective Cassandra comparator/validator class names
     public static final Map<String, String> comparators = new HashMap<String, String>();
@@ -91,6 +92,7 @@ public class CFPropDefs {
         keywords.add(KW_COMPACTION_STRATEGY_CLASS);
         keywords.add(KW_CACHING);
         keywords.add(KW_BF_FP_CHANCE);
+        keywords.add(KW_MEMTABLE_FLUSH_PERIOD);
 
         obsoleteKeywords.add("row_cache_size");
         obsoleteKeywords.add("key_cache_size");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
index fed856f..992166e 100644
--- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
+++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java
@@ -192,7 +192,8 @@ public class CreateColumnFamilyStatement
                    .compactionStrategyOptions(cfProps.compactionStrategyOptions)
                    .compressionParameters(CompressionParameters.create(cfProps.compressionParameters))
                    .caching(CFMetaData.Caching.fromString(getPropertyString(CFPropDefs.KW_CACHING, CFMetaData.DEFAULT_CACHING_STRATEGY.toString())))
-                   .bloomFilterFpChance(getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, null));
+                   .bloomFilterFpChance(getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, null))
+                   .memtableFlushPeriod(getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, 0));
 
             // CQL2 can have null keyAliases
             if (keyAlias != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/cql3/CFPropDefs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
index 0b563cc..61cefff 100644
--- a/src/java/org/apache/cassandra/cql3/CFPropDefs.java
+++ b/src/java/org/apache/cassandra/cql3/CFPropDefs.java
@@ -17,20 +17,16 @@
  */
 package org.apache.cassandra.cql3;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.io.compress.CompressionParameters;
 
 public class CFPropDefs extends PropertyDefinitions
@@ -46,6 +42,7 @@ public class CFPropDefs extends PropertyDefinitions
     public static final String KW_REPLICATEONWRITE = "replicate_on_write";
     public static final String KW_CACHING = "caching";
     public static final String KW_BF_FP_CHANCE = "bloom_filter_fp_chance";
+    public static final String KW_MEMTABLE_FLUSH_PERIOD = "memtable_flush_period_in_ms";
 
     public static final String KW_COMPACTION = "compaction";
     public static final String KW_COMPRESSION = "compression";
@@ -66,6 +63,7 @@ public class CFPropDefs extends PropertyDefinitions
         keywords.add(KW_BF_FP_CHANCE);
         keywords.add(KW_COMPACTION);
         keywords.add(KW_COMPRESSION);
+        keywords.add(KW_MEMTABLE_FLUSH_PERIOD);
 
         obsoleteKeywords.add("compaction_strategy_class");
         obsoleteKeywords.add("compaction_strategy_options");
@@ -128,6 +126,7 @@ public class CFPropDefs extends PropertyDefinitions
         cfm.maxCompactionThreshold(toInt(KW_MAXCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MAXCOMPACTIONTHRESHOLD), cfm.getMaxCompactionThreshold()));
         cfm.caching(CFMetaData.Caching.fromString(getString(KW_CACHING, cfm.getCaching().toString())));
         cfm.bloomFilterFpChance(getDouble(KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance()));
+        cfm.memtableFlushPeriod(getInt(KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
 
         if (compactionStrategyClass != null)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 439ef5f..434a5c4 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,10 +33,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Futures;
-
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.tracing.TraceState;
-import org.apache.cassandra.tracing.Tracing;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +52,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ExtendedFilter;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.index.SecondaryIndex;
@@ -154,6 +151,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         maybeReloadCompactionStrategy();
 
+        scheduleFlush();
+
         indexManager.reload();
 
         // If the CF comparator has changed, we need to change the memtable,
@@ -206,6 +205,30 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
+    void scheduleFlush()
+    {
+        int period = metadata.getMemtableFlushPeriod();
+        if (period > 0)
+        {
+            logger.debug("scheduling flush in {} ms", period);
+            WrappedRunnable runnable = new WrappedRunnable()
+            {
+                protected void runMayThrow() throws Exception
+                {
+                    if (getMemtableThreadSafe().isExpired())
+                    {
+                        Future<?> future = forceFlush();
+                        // if memtable is already expired but didn't flush because it's empty,
+                        // then schedule another flush.
+                        if (future == null)
+                            scheduleFlush();
+                    }
+                }
+            };
+            StorageService.scheduledTasks.schedule(runnable, period, TimeUnit.MILLISECONDS);
+        }
+    }
+
     public void setCompactionStrategyClass(String compactionStrategyClass) throws ConfigurationException
     {
         metadata.compactionStrategyClass = CFMetaData.createCompactionStrategy(compactionStrategyClass);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60027c4c/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 82d22ca..bbbe272 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -112,6 +112,7 @@ public class Memtable
         this.cfs = cfs;
         this.creationTime = System.currentTimeMillis();
         this.initialComparator = cfs.metadata.comparator;
+        this.cfs.scheduleFlush();
 
         Callable<Set<Object>> provider = new Callable<Set<Object>>()
         {
@@ -313,6 +314,15 @@ public class Memtable
     }
 
     /**
+     * @return true if this memtable is expired. Expiration time is determined by CF's memtable_flush_period_in_ms.
+     */
+    public boolean isExpired()
+    {
+        int period = cfs.metadata.getMemtableFlushPeriod();
+        return period > 0 && (System.currentTimeMillis() >= creationTime + period);
+    }
+
+    /**
      * obtain an iterator of columns in this memtable in the specified order starting from a given column.
      */
     public static OnDiskAtomIterator getSliceIterator(final DecoratedKey key, final ColumnFamily cf, SliceQueryFilter filter)