You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2017/12/06 21:25:25 UTC

[2/6] cassandra git commit: Fix updating base table rows with TTL not removing materialized view entries

Fix updating base table rows with TTL not removing materialized view entries

Patch by Zhao Yang; Reviewed by Paulo Motta for CASSANDRA-14071


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/461af5b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/461af5b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/461af5b9

Branch: refs/heads/cassandra-3.11
Commit: 461af5b9a6f58b6ed3db78a879840816b906cac8
Parents: 10ca7e4
Author: Zhao Yang <zh...@gmail.com>
Authored: Tue Nov 28 12:03:25 2017 +0800
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Dec 7 08:17:06 2017 +1100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/cql3/Attributes.java   |   6 +
 .../org/apache/cassandra/db/LivenessInfo.java   |  62 ++++-
 .../cassandra/db/view/ViewUpdateGenerator.java  |  11 +-
 .../apache/cassandra/schema/TableParams.java    |   4 +
 .../apache/cassandra/tools/JsonTransformer.java |   2 +-
 .../org/apache/cassandra/cql3/ViewLongTest.java | 228 +++++++++++++++++++
 .../cql3/validation/operations/TTLTest.java     | 104 +++++++++
 .../apache/cassandra/db/LivenessInfoTest.java   | 112 +++++++++
 9 files changed, 521 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cf8883a..54a8538 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.16
+ * Fix updating base table rows with TTL not removing materialized view entries (CASSANDRA-14071)
  * Reduce garbage created by DynamicSnitch (CASSANDRA-14091)
  * More frequent commitlog chained markers (CASSANDRA-13987)
  * Fix serialized size of DataLimits (CASSANDRA-14057)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/src/java/org/apache/cassandra/cql3/Attributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Attributes.java b/src/java/org/apache/cassandra/cql3/Attributes.java
index e1d2522..4ed0f83 100644
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@ -36,6 +36,12 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  */
 public class Attributes
 {
+    /**
+     * If this limit is ever raised, make sure {@link Integer#MAX_VALUE} is not allowed,
+     * as this is used as a flag to represent expired liveness.
+     *
+     * See {@link org.apache.cassandra.db.LivenessInfo#EXPIRED_LIVENESS_TTL}
+     */
     public static final int MAX_TTL = 20 * 365 * 24 * 60 * 60; // 20 years in seconds
 
     private final Term timestamp;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/src/java/org/apache/cassandra/db/LivenessInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java
index ab61a23..89e0578 100644
--- a/src/java/org/apache/cassandra/db/LivenessInfo.java
+++ b/src/java/org/apache/cassandra/db/LivenessInfo.java
@@ -41,6 +41,13 @@ public class LivenessInfo
 {
     public static final long NO_TIMESTAMP = Long.MIN_VALUE;
     public static final int NO_TTL = 0;
+    /**
+     * Used as flag for representing an expired liveness.
+     *
+     * TTL per request is at most 20 yrs, so this shouldn't conflict
+     * (See {@link org.apache.cassandra.cql3.Attributes#MAX_TTL})
+     */
+    public static final int EXPIRED_LIVENESS_TTL = Integer.MAX_VALUE;
     public static final int NO_EXPIRATION_TIME = Integer.MAX_VALUE;
 
     public static final LivenessInfo EMPTY = new LivenessInfo(NO_TIMESTAMP);
@@ -63,6 +70,7 @@ public class LivenessInfo
 
     public static LivenessInfo expiring(long timestamp, int ttl, int nowInSec)
     {
+        assert ttl != EXPIRED_LIVENESS_TTL;
         return new ExpiringLivenessInfo(timestamp, ttl, nowInSec + ttl);
     }
 
@@ -77,6 +85,8 @@ public class LivenessInfo
     // Use when you know that's what you want.
     public static LivenessInfo create(long timestamp, int ttl, int localExpirationTime)
     {
+        if (ttl == EXPIRED_LIVENESS_TTL)
+            return new ExpiredLivenessInfo(timestamp, ttl, localExpirationTime);
         return ttl == NO_TTL ? new LivenessInfo(timestamp) : new ExpiringLivenessInfo(timestamp, ttl, localExpirationTime);
     }
 
@@ -178,11 +188,15 @@ public class LivenessInfo
      *
      * </br>
      *
-     * If timestamps are the same, livenessInfo with greater TTL supersedes another.
+     * If timestamps are the same and none of them are expired livenessInfo,
+     * livenessInfo with greater TTL supersedes another. It also means, if timestamps are the same,
+     * ttl supersedes no-ttl. This is the same rule as {@link Conflicts#resolveRegular}
      *
-     * It also means, if timestamps are the same, ttl superseders no-ttl.
+     * If timestamps are the same and exactly one of them is an expired livenessInfo, the expired
+     * livenessInfo supersedes, i.e. the tombstone wins.
      *
-     * This is the same rule as {@link Conflicts#resolveRegular}
+     * If timestamps are the same and both of them are expired livenessInfo (ideally this shouldn't happen),
+     * the one with the greater localExpirationTime wins.
      *
      * @param other
      *            the {@code LivenessInfo} to compare this info to.
@@ -193,11 +207,18 @@ public class LivenessInfo
     {
         if (timestamp != other.timestamp)
             return timestamp > other.timestamp;
+        if (isExpired() ^ other.isExpired())
+            return isExpired();
         if (isExpiring() == other.isExpiring())
             return localExpirationTime() > other.localExpirationTime();
         return isExpiring();
     }
 
+    protected boolean isExpired()
+    {
+        return false;
+    }
+
     /**
      * Returns a copy of this liveness info updated with the provided timestamp.
      *
@@ -235,6 +256,41 @@ public class LivenessInfo
         return Objects.hash(timestamp(), ttl(), localExpirationTime());
     }
 
+    /**
+     * Effectively acts as a PK tombstone. This is used for Materialized Views to shadow
+     * updated entries while co-existing with row tombstones.
+     *
+     * See {@link org.apache.cassandra.db.view.ViewUpdateGenerator#deleteOldEntryInternal}.
+     */
+    private static class ExpiredLivenessInfo extends ExpiringLivenessInfo
+    {
+        private ExpiredLivenessInfo(long timestamp, int ttl, int localExpirationTime)
+        {
+            super(timestamp, ttl, localExpirationTime);
+            assert ttl == EXPIRED_LIVENESS_TTL;
+            assert timestamp != NO_TIMESTAMP;
+        }
+
+        @Override
+        public boolean isExpired()
+        {
+            return true;
+        }
+
+        @Override
+        public boolean isLive(int nowInSec)
+        {
+            // used as tombstone to shadow entire PK
+            return false;
+        }
+
+        @Override
+        public LivenessInfo withUpdatedTimestamp(long newTimestamp)
+        {
+            return new ExpiredLivenessInfo(newTimestamp, ttl(), localExpirationTime());
+        }
+    }
+
     private static class ExpiringLivenessInfo extends LivenessInfo
     {
         private final int ttl;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
index 341c511..74d3e52 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
@@ -25,7 +25,6 @@ import com.google.common.collect.PeekingIterator;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.ViewDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
@@ -404,11 +403,13 @@ public class ViewUpdateGenerator
         if (timestamp > rowDeletion)
         {
             /**
-              * TODO: This is a hack and overload of LivenessInfo and we should probably modify
-              * the storage engine to properly support this, but on the meantime this
-              * should be fine because it only happens in some specific scenarios explained above.
+              * We use an expired liveness instead of a row tombstone to allow a shadowed MV
+              * entry to co-exist with a row tombstone, see ViewComplexTest#testCommutativeRowDeletion.
+              *
+              * TODO This is a dirty overload of LivenessInfo and we should modify
+              * the storage engine to properly support this on CASSANDRA-13826.
               */
-            LivenessInfo info = LivenessInfo.create(timestamp, Integer.MAX_VALUE, nowInSec);
+            LivenessInfo info = LivenessInfo.create(timestamp, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSec);
             currentViewEntryBuilder.addPrimaryKeyLivenessInfo(info);
         }
         currentViewEntryBuilder.addRowDeletion(mergedBaseRow.deletion());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/src/java/org/apache/cassandra/schema/TableParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java
index 29d3e29..dfa8603 100644
--- a/src/java/org/apache/cassandra/schema/TableParams.java
+++ b/src/java/org/apache/cassandra/schema/TableParams.java
@@ -24,6 +24,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
 
+import org.apache.cassandra.cql3.Attributes;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.BloomCalculations;
 import static java.lang.String.format;
@@ -166,6 +167,9 @@ public final class TableParams
         if (defaultTimeToLive < 0)
             fail("%s must be greater than or equal to 0 (got %s)", Option.DEFAULT_TIME_TO_LIVE, defaultTimeToLive);
 
+        if (defaultTimeToLive > Attributes.MAX_TTL)
+            fail("%s must be less than or equal to %d (got %s)", Option.DEFAULT_TIME_TO_LIVE, Attributes.MAX_TTL, defaultTimeToLive);
+
         if (gcGraceSeconds < 0)
             fail("%s must be greater than or equal to 0 (got %s)", Option.GC_GRACE_SECONDS, gcGraceSeconds);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/src/java/org/apache/cassandra/tools/JsonTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/JsonTransformer.java b/src/java/org/apache/cassandra/tools/JsonTransformer.java
index 5c32035..c679fc3 100644
--- a/src/java/org/apache/cassandra/tools/JsonTransformer.java
+++ b/src/java/org/apache/cassandra/tools/JsonTransformer.java
@@ -347,7 +347,7 @@ public final class JsonTransformer
                 }
                 else
                 {
-                    json.writeString(column.cellValueType().getString(clustering.get(i)));
+                    json.writeRawValue(column.cellValueType().toJSONString(clustering.get(i), Server.CURRENT_VERSION));
                 }
             }
             json.writeEndArray();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/test/long/org/apache/cassandra/cql3/ViewLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/ViewLongTest.java b/test/long/org/apache/cassandra/cql3/ViewLongTest.java
index 3808b73..68931e2 100644
--- a/test/long/org/apache/cassandra/cql3/ViewLongTest.java
+++ b/test/long/org/apache/cassandra/cql3/ViewLongTest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.cql3;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -35,7 +36,9 @@ import com.datastax.driver.core.exceptions.WriteTimeoutException;
 import org.apache.cassandra.concurrent.SEPExecutor;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 public class ViewLongTest extends CQLTester
@@ -185,4 +188,229 @@ public class ViewLongTest extends CQLTester
             throw new AssertionError(String.format("Single row had c = %d, expected %d", rows.get(0).getInt("c"), value));
         }
     }
+
+    @Test
+    public void testExpiredLivenessInfoWithDefaultTTLWithFlush() throws Throwable
+    {
+        testExpiredLivenessInfoWithDefaultTTL(true);
+    }
+
+    @Test
+    public void testExpiredLivenessInfoWithDefaultTTLWithoutFlush() throws Throwable
+    {
+        testExpiredLivenessInfoWithDefaultTTL(false);
+    }
+
+    private void testExpiredLivenessInfoWithDefaultTTL(boolean flush) throws Throwable
+    {
+        createTable("CREATE TABLE %s (field1 int,field2 int,date int,PRIMARY KEY ((field1), field2)) WITH default_time_to_live = 5;");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        createView("mv",
+                   "CREATE MATERIALIZED VIEW mv AS SELECT * FROM %%s WHERE field1 IS NOT NULL AND field2 IS NOT NULL AND date IS NOT NULL PRIMARY KEY ((field1), date, field2) WITH CLUSTERING ORDER BY (date desc, field2 asc);");
+
+        updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 111);", flush);
+        assertRows(execute("select * from %s"), row(1, 2, 111));
+        assertRows(execute("select * from mv"), row(1, 111, 2));
+
+        updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 222);", flush);
+        assertRows(execute("select * from %s"), row(1, 2, 222));
+        assertRows(execute("select * from mv"), row(1, 222, 2));
+
+        updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 333);", flush);
+
+        assertRows(execute("select * from %s"), row(1, 2, 333));
+        assertRows(execute("select * from mv"), row(1, 333, 2));
+
+        if (flush)
+        {
+            Keyspace.open(keyspace()).getColumnFamilyStore("mv").forceMajorCompaction();
+            assertRows(execute("select * from %s"), row(1, 2, 333));
+            assertRows(execute("select * from mv"), row(1, 333, 2));
+        }
+
+        // wait for ttl, data should be removed
+        updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 444);", flush);
+        assertRows(execute("select * from %s"), row(1, 2, 444));
+        assertRows(execute("select * from mv"), row(1, 444, 2));
+
+        Thread.sleep(5000);
+        assertRows(execute("select * from %s"));
+        assertRows(execute("select * from mv"));
+
+        // shadow mv with date=555 and then update it back to live, wait for ttl
+        updateView("update %s set date=555 where field1=1 and field2=2;");
+        updateView("update %s set date=666 where field1=1 and field2=2;");
+        updateViewWithFlush("update %s set date=555 where field1=1 and field2=2;", flush);
+        assertRows(execute("select * from %s"), row(1, 2, 555));
+        assertRows(execute("select * from mv"), row(1, 555, 2));
+
+        Thread.sleep(5000);
+        assertRows(execute("select * from %s"));
+        assertRows(execute("select * from mv"));
+
+        // test user-provided ttl for table with/without default-ttl
+        for (boolean withDefaultTTL : Arrays.asList(true, false))
+        {
+            execute("TRUNCATE %s");
+            if (withDefaultTTL)
+                execute("ALTER TABLE %s with default_time_to_live=" + (withDefaultTTL ? 10 : 0));
+            updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 666) USING TTL 1000;", flush);
+
+            assertRows(execute("select * from %s"), row(1, 2, 666));
+            assertRows(execute("select * from mv"), row(1, 666, 2));
+
+            updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 777) USING TTL 1100;", flush);
+            assertRows(execute("select * from %s"), row(1, 2, 777));
+            assertRows(execute("select * from mv"), row(1, 777, 2));
+
+            updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 888) USING TTL 800;", flush);
+
+            assertRows(execute("select * from %s"), row(1, 2, 888));
+            assertRows(execute("select * from mv"), row(1, 888, 2));
+
+            if (flush)
+            {
+                Keyspace.open(keyspace()).getColumnFamilyStore("mv").forceMajorCompaction();
+                assertRows(execute("select * from %s"), row(1, 2, 888));
+                assertRows(execute("select * from mv"), row(1, 888, 2));
+            }
+
+            // wait for ttl, data should be removed
+            updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 999) USING TTL 5;", flush);
+            assertRows(execute("select * from %s"), row(1, 2, 999));
+            assertRows(execute("select * from mv"), row(1, 999, 2));
+
+            Thread.sleep(5000);
+            assertRows(execute("select * from %s"));
+            assertRows(execute("select * from mv"));
+
+            // shadow mv with date=555 and then update it back to live with ttl=5, wait for ttl to expire
+            updateViewWithFlush("update %s  USING TTL 800 set date=555 where field1=1 and field2=2;", flush);
+            assertRows(execute("select * from %s"), row(1, 2, 555));
+            assertRows(execute("select * from mv"), row(1, 555, 2));
+
+            updateViewWithFlush("update %s set date=666 where field1=1 and field2=2;", flush);
+            assertRows(execute("select * from %s"), row(1, 2, 666));
+            assertRows(execute("select * from mv"), row(1, 666, 2));
+
+            updateViewWithFlush("update %s USING TTL 5 set date=555 where field1=1 and field2=2;", flush);
+            assertRows(execute("select * from %s"), row(1, 2, 555));
+            assertRows(execute("select * from mv"), row(1, 555, 2));
+
+            Thread.sleep(5000);
+            assertRows(execute("select * from %s"));
+            assertRows(execute("select * from mv"));
+        }
+    }
+
+    @Test
+    public void testExpiredLivenessInfoWithUnselectedColumnAndDefaultTTLWithFlush() throws Throwable
+    {
+        testExpiredLivenessInfoWithUnselectedColumnAndDefaultTTL(true);
+    }
+
+    @Test
+    public void testExpiredLivenessInfoWithUnselectedColumnAndDefaultTTLWithoutFlush() throws Throwable
+    {
+        testExpiredLivenessInfoWithUnselectedColumnAndDefaultTTL(false);
+    }
+
+    private void testExpiredLivenessInfoWithUnselectedColumnAndDefaultTTL(boolean flush) throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int,c int,a int, b int, PRIMARY KEY ((k), c)) WITH default_time_to_live = 1000;");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        createView("mv",
+                   "CREATE MATERIALIZED VIEW mv AS SELECT k,c,a FROM %%s WHERE k IS NOT NULL AND c IS NOT NULL "
+                           + "PRIMARY KEY (c, k)");
+
+        // table default ttl
+        updateViewWithFlush("UPDATE %s SET b = 111 WHERE k = 1 AND c = 2", flush);
+        assertRows(execute("select k,c,a,b from %s"), row(1, 2, null, 111));
+        assertRows(execute("select k,c,a from mv"), row(1, 2, null));
+
+        updateViewWithFlush("UPDATE %s SET b = null WHERE k = 1 AND c = 2", flush);
+        assertRows(execute("select k,c,a,b from %s"));
+        assertRows(execute("select k,c,a from mv"));
+
+        updateViewWithFlush("UPDATE %s SET b = 222 WHERE k = 1 AND c = 2", flush);
+        assertRows(execute("select k,c,a,b from %s"), row(1, 2, null, 222));
+        assertRows(execute("select k,c,a from mv"), row(1, 2, null));
+
+        updateViewWithFlush("DELETE b FROM %s WHERE k = 1 AND c = 2", flush);
+        assertRows(execute("select k,c,a,b from %s"));
+        assertRows(execute("select k,c,a from mv"));
+
+        if (flush)
+        {
+            Keyspace.open(keyspace()).getColumnFamilyStore("mv").forceMajorCompaction();
+            assertRows(execute("select k,c,a,b from %s"));
+            assertRows(execute("select k,c,a from mv"));
+        }
+
+        // test user-provided ttl for table with/without default-ttl
+        for (boolean withDefaultTTL : Arrays.asList(true, false))
+        {
+            execute("TRUNCATE %s");
+            if (withDefaultTTL)
+                execute("ALTER TABLE %s with default_time_to_live=" + (withDefaultTTL ? 10 : 0));
+
+            updateViewWithFlush("UPDATE %s USING TTL 100 SET b = 666 WHERE k = 1 AND c = 2", flush);
+            assertRows(execute("select k,c,a,b from %s"), row(1, 2, null, 666));
+            assertRows(execute("select k,c,a from mv"), row(1, 2, null));
+
+            updateViewWithFlush("UPDATE %s USING TTL 90  SET b = null WHERE k = 1 AND c = 2", flush);
+            if (flush)
+                FBUtilities.waitOnFutures(Keyspace.open(keyspace()).flush());
+            assertRows(execute("select k,c,a,b from %s"));
+            assertRows(execute("select k,c,a from mv"));
+
+            updateViewWithFlush("UPDATE %s USING TTL 80  SET b = 777 WHERE k = 1 AND c = 2", flush);
+            assertRows(execute("select k,c,a,b from %s"), row(1, 2, null, 777));
+            assertRows(execute("select k,c,a from mv"), row(1, 2, null));
+
+            updateViewWithFlush("DELETE b FROM %s WHERE k = 1 AND c = 2", flush);
+            assertRows(execute("select k,c,a,b from %s"));
+            assertRows(execute("select k,c,a from mv"));
+
+            updateViewWithFlush("UPDATE %s USING TTL 110  SET b = 888 WHERE k = 1 AND c = 2", flush);
+            assertRows(execute("select k,c,a,b from %s"), row(1, 2, null, 888));
+            assertRows(execute("select k,c,a from mv"), row(1, 2, null));
+
+            updateViewWithFlush("UPDATE %s USING TTL 5  SET b = 999 WHERE k = 1 AND c = 2", flush);
+            assertRows(execute("select k,c,a,b from %s"), row(1, 2, null, 999));
+            assertRows(execute("select k,c,a from mv"), row(1, 2, null));
+
+            Thread.sleep(5000); // wait for ttl expired
+
+            if (flush)
+            {
+                Keyspace.open(keyspace()).getColumnFamilyStore("mv").forceMajorCompaction();
+                assertRows(execute("select k,c,a,b from %s"));
+                assertRows(execute("select k,c,a from mv"));
+            }
+        }
+    }
+
+    private void updateView(String query, Object... params) throws Throwable
+    {
+        updateViewWithFlush(query, false, params);
+    }
+
+    private void updateViewWithFlush(String query, boolean flush, Object... params) throws Throwable
+    {
+        executeNet(protocolVersion, query, params);
+        while (!(((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getPendingTasks() == 0
+                && ((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getActiveCount() == 0))
+        {
+            Thread.sleep(1);
+        }
+        if (flush)
+            Keyspace.open(keyspace()).flush();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
new file mode 100644
index 0000000..9f375d4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
@@ -0,0 +1,104 @@
+package org.apache.cassandra.cql3.validation.operations;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.cassandra.cql3.Attributes;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.junit.Test;
+
+public class TTLTest extends CQLTester
+{
+
+    @Test
+    public void testTTLPerRequestLimit() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, i int)");
+        // insert
+        execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL ?", Attributes.MAX_TTL); // max ttl
+        int ttl = execute("SELECT ttl(i) FROM %s").one().getInt("ttl(i)");
+        assertTrue(ttl > Attributes.MAX_TTL - 10);
+
+        try
+        {
+            execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL ?", Attributes.MAX_TTL + 1);
+            fail("Expect InvalidRequestException");
+        }
+        catch (InvalidRequestException e)
+        {
+            assertTrue(e.getMessage().contains("ttl is too large."));
+        }
+
+        try
+        {
+            execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL ?", -1);
+            fail("Expect InvalidRequestException");
+        }
+        catch (InvalidRequestException e)
+        {
+            assertTrue(e.getMessage().contains("A TTL must be greater or equal to 0, but was -1"));
+        }
+        execute("TRUNCATE %s");
+
+        // update
+        execute("UPDATE %s USING TTL ? SET i = 1 WHERE k = 2", Attributes.MAX_TTL); // max ttl
+        ttl = execute("SELECT ttl(i) FROM %s").one().getInt("ttl(i)");
+        assertTrue(ttl > Attributes.MAX_TTL - 10);
+
+        try
+        {
+            execute("UPDATE %s USING TTL ? SET i = 1 WHERE k = 2", Attributes.MAX_TTL + 1);
+            fail("Expect InvalidRequestException");
+        }
+        catch (InvalidRequestException e)
+        {
+            assertTrue(e.getMessage().contains("ttl is too large."));
+        }
+
+        try
+        {
+            execute("UPDATE %s USING TTL ? SET i = 1 WHERE k = 2", -1);
+            fail("Expect InvalidRequestException");
+        }
+        catch (InvalidRequestException e)
+        {
+            assertTrue(e.getMessage().contains("A TTL must be greater or equal to 0, but was -1"));
+        }
+    }
+
+    @Test
+    public void testTTLDefaultLimit() throws Throwable
+    {
+        try
+        {
+            createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=-1");
+            fail("Expect Invalid schema");
+        }
+        catch (RuntimeException e)
+        {
+            assertTrue(e.getCause()
+                        .getMessage()
+                        .contains("default_time_to_live must be greater than or equal to 0 (got -1)"));
+        }
+        try
+        {
+            createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live="
+                    + (Attributes.MAX_TTL + 1));
+            fail("Expect Invalid schema");
+        }
+        catch (RuntimeException e)
+        {
+            assertTrue(e.getCause()
+                        .getMessage()
+                        .contains("default_time_to_live must be less than or equal to " + Attributes.MAX_TTL + " (got "
+                                + (Attributes.MAX_TTL + 1) + ")"));
+        }
+
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=" + Attributes.MAX_TTL);
+        execute("INSERT INTO %s (k, i) VALUES (1, 1)");
+        int ttl = execute("SELECT ttl(i) FROM %s").one().getInt("ttl(i)");
+        assertTrue(ttl > 10000 - 10); // within 10 second
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/test/unit/org/apache/cassandra/db/LivenessInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/LivenessInfoTest.java b/test/unit/org/apache/cassandra/db/LivenessInfoTest.java
new file mode 100644
index 0000000..b08023c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/LivenessInfoTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import static org.junit.Assert.*;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.junit.Test;
+
+public class LivenessInfoTest
+{
+    @Test
+    public void testSupersedes()
+    {
+        LivenessInfo first;
+        LivenessInfo second;
+        int nowInSeconds = FBUtilities.nowInSeconds();
+
+        // timestamp supersedes for normal liveness info
+        first = LivenessInfo.create(100, 0, nowInSeconds);
+        second = LivenessInfo.create(101, 0, nowInSeconds);
+        assertSupersedes(second, first);
+
+        // timestamp supersedes for ttl
+        first = LivenessInfo.create(100, 0, nowInSeconds);
+        second = LivenessInfo.expiring(99, 1, nowInSeconds);
+        assertSupersedes(first, second);
+
+        // timestamp supersedes for mv expired liveness
+        first = LivenessInfo.create(100, 0, nowInSeconds);
+        second = LivenessInfo.create(99, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds);
+        assertSupersedes(first, second);
+
+        // timestamp ties, ttl supersedes non-ttl
+        first = LivenessInfo.expiring(100, 1, nowInSeconds);
+        second = LivenessInfo.create(100, 0, nowInSeconds);
+        assertSupersedes(first, second);
+
+        // timestamp ties, greater localDeletionTime supersedes
+        first = LivenessInfo.expiring(100, 2, nowInSeconds);
+        second = LivenessInfo.expiring(100, 1, nowInSeconds);
+        assertSupersedes(first, second);
+
+        first = LivenessInfo.expiring(100, 5, nowInSeconds - 4);
+        second = LivenessInfo.expiring(100, 2, nowInSeconds);
+        assertSupersedes(second, first);
+
+        // timestamp ties, mv expired liveness supersedes normal ttl
+        first = LivenessInfo.create(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds);
+        second = LivenessInfo.expiring(100, 1000, nowInSeconds);
+        assertSupersedes(first, second);
+
+        // timestamp ties, mv expired liveness supersedes non-ttl
+        first = LivenessInfo.create(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds);
+        second = LivenessInfo.create(100, 0, nowInSeconds);
+        assertSupersedes(first, second);
+
+        // timestamp ties, both are mv expired liveness, local deletion time win
+        first = LivenessInfo.create(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds + 1);
+        second = LivenessInfo.create(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds);
+        assertSupersedes(first, second);
+    }
+
+    @Test
+    public void testIsLive()
+    {
+        int nowInSeconds = FBUtilities.nowInSeconds();
+
+        assertIsLive(LivenessInfo.create(100, 0, nowInSeconds), nowInSeconds - 3, true);
+        assertIsLive(LivenessInfo.create(100, 0, nowInSeconds), nowInSeconds, true);
+        assertIsLive(LivenessInfo.create(100, 0, nowInSeconds), nowInSeconds + 3, true);
+
+        assertIsLive(LivenessInfo.expiring(100, 2, nowInSeconds), nowInSeconds - 3, true);
+        assertIsLive(LivenessInfo.expiring(100, 2, nowInSeconds), nowInSeconds, true);
+        assertIsLive(LivenessInfo.expiring(100, 2, nowInSeconds), nowInSeconds + 3, false);
+
+        assertIsLive(LivenessInfo.create(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds), nowInSeconds - 3, false);
+        assertIsLive(LivenessInfo.create(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds), nowInSeconds, false);
+        assertIsLive(LivenessInfo.create(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds), nowInSeconds + 3, false);
+    }
+
+    /**
+     * left supersedes right, right doesn't supersede left.
+     */
+    private static void assertSupersedes(LivenessInfo left, LivenessInfo right)
+    {
+        assertTrue(left.supersedes(right));
+        assertFalse(right.supersedes(left));
+    }
+
+    private static void assertIsLive(LivenessInfo info, int nowInSec, boolean alive)
+    {
+        assertEquals(info.isLive(nowInSec), alive);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org