You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pa...@apache.org on 2017/12/06 21:25:25 UTC
[2/6] cassandra git commit: Fix updating base table rows with TTL not
removing materialized view entries
Fix updating base table rows with TTL not removing materialized view entries
Patch by Zhao Yang; Reviewed by Paulo Motta for CASSANDRA-14071
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/461af5b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/461af5b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/461af5b9
Branch: refs/heads/cassandra-3.11
Commit: 461af5b9a6f58b6ed3db78a879840816b906cac8
Parents: 10ca7e4
Author: Zhao Yang <zh...@gmail.com>
Authored: Tue Nov 28 12:03:25 2017 +0800
Committer: Paulo Motta <pa...@apache.org>
Committed: Thu Dec 7 08:17:06 2017 +1100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/cql3/Attributes.java | 6 +
.../org/apache/cassandra/db/LivenessInfo.java | 62 ++++-
.../cassandra/db/view/ViewUpdateGenerator.java | 11 +-
.../apache/cassandra/schema/TableParams.java | 4 +
.../apache/cassandra/tools/JsonTransformer.java | 2 +-
.../org/apache/cassandra/cql3/ViewLongTest.java | 228 +++++++++++++++++++
.../cql3/validation/operations/TTLTest.java | 104 +++++++++
.../apache/cassandra/db/LivenessInfoTest.java | 112 +++++++++
9 files changed, 521 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cf8883a..54a8538 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.16
+ * Fix updating base table rows with TTL not removing materialized view entries (CASSANDRA-14071)
* Reduce garbage created by DynamicSnitch (CASSANDRA-14091)
* More frequent commitlog chained markers (CASSANDRA-13987)
* Fix serialized size of DataLimits (CASSANDRA-14057)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/src/java/org/apache/cassandra/cql3/Attributes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/Attributes.java b/src/java/org/apache/cassandra/cql3/Attributes.java
index e1d2522..4ed0f83 100644
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@ -36,6 +36,12 @@ import org.apache.cassandra.utils.ByteBufferUtil;
*/
public class Attributes
{
+ /**
+ * If this limit is ever raised, make sure @{@link Integer#MAX_VALUE} is not allowed,
+ * as this is used as a flag to represent expired liveness.
+ *
+ * See {@link org.apache.cassandra.db.LivenessInfo#EXPIRED_LIVENESS_TTL}
+ */
public static final int MAX_TTL = 20 * 365 * 24 * 60 * 60; // 20 years in seconds
private final Term timestamp;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/src/java/org/apache/cassandra/db/LivenessInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java
index ab61a23..89e0578 100644
--- a/src/java/org/apache/cassandra/db/LivenessInfo.java
+++ b/src/java/org/apache/cassandra/db/LivenessInfo.java
@@ -41,6 +41,13 @@ public class LivenessInfo
{
public static final long NO_TIMESTAMP = Long.MIN_VALUE;
public static final int NO_TTL = 0;
+ /**
+ * Used as flag for representing an expired liveness.
+ *
+ * TTL per request is at most 20 yrs, so this shouldn't conflict
+ * (See {@link org.apache.cassandra.cql3.Attributes#MAX_TTL})
+ */
+ public static final int EXPIRED_LIVENESS_TTL = Integer.MAX_VALUE;
public static final int NO_EXPIRATION_TIME = Integer.MAX_VALUE;
public static final LivenessInfo EMPTY = new LivenessInfo(NO_TIMESTAMP);
@@ -63,6 +70,7 @@ public class LivenessInfo
public static LivenessInfo expiring(long timestamp, int ttl, int nowInSec)
{
+ assert ttl != EXPIRED_LIVENESS_TTL;
return new ExpiringLivenessInfo(timestamp, ttl, nowInSec + ttl);
}
@@ -77,6 +85,8 @@ public class LivenessInfo
// Use when you know that's what you want.
public static LivenessInfo create(long timestamp, int ttl, int localExpirationTime)
{
+ if (ttl == EXPIRED_LIVENESS_TTL)
+ return new ExpiredLivenessInfo(timestamp, ttl, localExpirationTime);
return ttl == NO_TTL ? new LivenessInfo(timestamp) : new ExpiringLivenessInfo(timestamp, ttl, localExpirationTime);
}
@@ -178,11 +188,15 @@ public class LivenessInfo
*
* </br>
*
- * If timestamps are the same, livenessInfo with greater TTL supersedes another.
+ * If timestamps are the same and none of them are expired livenessInfo,
+ * livenessInfo with greater TTL supersedes another. It also means, if timestamps are the same,
+ * ttl superseders no-ttl. This is the same rule as {@link Conflicts#resolveRegular}
*
- * It also means, if timestamps are the same, ttl superseders no-ttl.
+ * If timestamps are the same and one of them is expired livenessInfo. Expired livenessInfo
+ * supersedes, ie. tombstone supersedes.
*
- * This is the same rule as {@link Conflicts#resolveRegular}
+ * If timestamps are the same and both of them are expired livenessInfo(Ideally it shouldn't happen),
+ * greater localDeletionTime wins.
*
* @param other
* the {@code LivenessInfo} to compare this info to.
@@ -193,11 +207,18 @@ public class LivenessInfo
{
if (timestamp != other.timestamp)
return timestamp > other.timestamp;
+ if (isExpired() ^ other.isExpired())
+ return isExpired();
if (isExpiring() == other.isExpiring())
return localExpirationTime() > other.localExpirationTime();
return isExpiring();
}
+ protected boolean isExpired()
+ {
+ return false;
+ }
+
/**
* Returns a copy of this liveness info updated with the provided timestamp.
*
@@ -235,6 +256,41 @@ public class LivenessInfo
return Objects.hash(timestamp(), ttl(), localExpirationTime());
}
+ /**
+ * Effectively acts as a PK tombstone. This is used for Materialized Views to shadow
+ * updated entries while co-existing with row tombstones.
+ *
+ * See {@link org.apache.cassandra.db.view.ViewUpdateGenerator#deleteOldEntryInternal}.
+ */
+ private static class ExpiredLivenessInfo extends ExpiringLivenessInfo
+ {
+ private ExpiredLivenessInfo(long timestamp, int ttl, int localExpirationTime)
+ {
+ super(timestamp, ttl, localExpirationTime);
+ assert ttl == EXPIRED_LIVENESS_TTL;
+ assert timestamp != NO_TIMESTAMP;
+ }
+
+ @Override
+ public boolean isExpired()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean isLive(int nowInSec)
+ {
+ // used as tombstone to shadow entire PK
+ return false;
+ }
+
+ @Override
+ public LivenessInfo withUpdatedTimestamp(long newTimestamp)
+ {
+ return new ExpiredLivenessInfo(newTimestamp, ttl(), localExpirationTime());
+ }
+ }
+
private static class ExpiringLivenessInfo extends LivenessInfo
{
private final int ttl;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
index 341c511..74d3e52 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
@@ -25,7 +25,6 @@ import com.google.common.collect.PeekingIterator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.ViewDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
@@ -404,11 +403,13 @@ public class ViewUpdateGenerator
if (timestamp > rowDeletion)
{
/**
- * TODO: This is a hack and overload of LivenessInfo and we should probably modify
- * the storage engine to properly support this, but on the meantime this
- * should be fine because it only happens in some specific scenarios explained above.
+ * We use an expired liveness instead of a row tombstone to allow a shadowed MV
+ * entry to co-exist with a row tombstone, see ViewComplexTest#testCommutativeRowDeletion.
+ *
+ * TODO This is a dirty overload of LivenessInfo and we should modify
+ * the storage engine to properly support this on CASSANDRA-13826.
*/
- LivenessInfo info = LivenessInfo.create(timestamp, Integer.MAX_VALUE, nowInSec);
+ LivenessInfo info = LivenessInfo.create(timestamp, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSec);
currentViewEntryBuilder.addPrimaryKeyLivenessInfo(info);
}
currentViewEntryBuilder.addRowDeletion(mergedBaseRow.deletion());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/src/java/org/apache/cassandra/schema/TableParams.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/TableParams.java b/src/java/org/apache/cassandra/schema/TableParams.java
index 29d3e29..dfa8603 100644
--- a/src/java/org/apache/cassandra/schema/TableParams.java
+++ b/src/java/org/apache/cassandra/schema/TableParams.java
@@ -24,6 +24,7 @@ import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
+import org.apache.cassandra.cql3.Attributes;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.utils.BloomCalculations;
import static java.lang.String.format;
@@ -166,6 +167,9 @@ public final class TableParams
if (defaultTimeToLive < 0)
fail("%s must be greater than or equal to 0 (got %s)", Option.DEFAULT_TIME_TO_LIVE, defaultTimeToLive);
+ if (defaultTimeToLive > Attributes.MAX_TTL)
+ fail("%s must be less than or equal to %d (got %s)", Option.DEFAULT_TIME_TO_LIVE, Attributes.MAX_TTL, defaultTimeToLive);
+
if (gcGraceSeconds < 0)
fail("%s must be greater than or equal to 0 (got %s)", Option.GC_GRACE_SECONDS, gcGraceSeconds);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/src/java/org/apache/cassandra/tools/JsonTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/JsonTransformer.java b/src/java/org/apache/cassandra/tools/JsonTransformer.java
index 5c32035..c679fc3 100644
--- a/src/java/org/apache/cassandra/tools/JsonTransformer.java
+++ b/src/java/org/apache/cassandra/tools/JsonTransformer.java
@@ -347,7 +347,7 @@ public final class JsonTransformer
}
else
{
- json.writeString(column.cellValueType().getString(clustering.get(i)));
+ json.writeRawValue(column.cellValueType().toJSONString(clustering.get(i), Server.CURRENT_VERSION));
}
}
json.writeEndArray();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/test/long/org/apache/cassandra/cql3/ViewLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/ViewLongTest.java b/test/long/org/apache/cassandra/cql3/ViewLongTest.java
index 3808b73..68931e2 100644
--- a/test/long/org/apache/cassandra/cql3/ViewLongTest.java
+++ b/test/long/org/apache/cassandra/cql3/ViewLongTest.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.cql3;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,7 +36,9 @@ import com.datastax.driver.core.exceptions.WriteTimeoutException;
import org.apache.cassandra.concurrent.SEPExecutor;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
public class ViewLongTest extends CQLTester
@@ -185,4 +188,229 @@ public class ViewLongTest extends CQLTester
throw new AssertionError(String.format("Single row had c = %d, expected %d", rows.get(0).getInt("c"), value));
}
}
+
+ @Test
+ public void testExpiredLivenessInfoWithDefaultTTLWithFlush() throws Throwable
+ {
+ testExpiredLivenessInfoWithDefaultTTL(true);
+ }
+
+ @Test
+ public void testExpiredLivenessInfoWithDefaultTTLWithoutFlush() throws Throwable
+ {
+ testExpiredLivenessInfoWithDefaultTTL(false);
+ }
+
+ private void testExpiredLivenessInfoWithDefaultTTL(boolean flush) throws Throwable
+ {
+ createTable("CREATE TABLE %s (field1 int,field2 int,date int,PRIMARY KEY ((field1), field2)) WITH default_time_to_live = 5;");
+
+ execute("USE " + keyspace());
+ executeNet(protocolVersion, "USE " + keyspace());
+
+ createView("mv",
+ "CREATE MATERIALIZED VIEW mv AS SELECT * FROM %%s WHERE field1 IS NOT NULL AND field2 IS NOT NULL AND date IS NOT NULL PRIMARY KEY ((field1), date, field2) WITH CLUSTERING ORDER BY (date desc, field2 asc);");
+
+ updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 111);", flush);
+ assertRows(execute("select * from %s"), row(1, 2, 111));
+ assertRows(execute("select * from mv"), row(1, 111, 2));
+
+ updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 222);", flush);
+ assertRows(execute("select * from %s"), row(1, 2, 222));
+ assertRows(execute("select * from mv"), row(1, 222, 2));
+
+ updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 333);", flush);
+
+ assertRows(execute("select * from %s"), row(1, 2, 333));
+ assertRows(execute("select * from mv"), row(1, 333, 2));
+
+ if (flush)
+ {
+ Keyspace.open(keyspace()).getColumnFamilyStore("mv").forceMajorCompaction();
+ assertRows(execute("select * from %s"), row(1, 2, 333));
+ assertRows(execute("select * from mv"), row(1, 333, 2));
+ }
+
+ // wait for ttl, data should be removed
+ updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 444);", flush);
+ assertRows(execute("select * from %s"), row(1, 2, 444));
+ assertRows(execute("select * from mv"), row(1, 444, 2));
+
+ Thread.sleep(5000);
+ assertRows(execute("select * from %s"));
+ assertRows(execute("select * from mv"));
+
+ // shadow mv with date=555 and then update it back to live, wait for ttl
+ updateView("update %s set date=555 where field1=1 and field2=2;");
+ updateView("update %s set date=666 where field1=1 and field2=2;");
+ updateViewWithFlush("update %s set date=555 where field1=1 and field2=2;", flush);
+ assertRows(execute("select * from %s"), row(1, 2, 555));
+ assertRows(execute("select * from mv"), row(1, 555, 2));
+
+ Thread.sleep(5000);
+ assertRows(execute("select * from %s"));
+ assertRows(execute("select * from mv"));
+
+ // test user-provided ttl for table with/without default-ttl
+ for (boolean withDefaultTTL : Arrays.asList(true, false))
+ {
+ execute("TRUNCATE %s");
+ if (withDefaultTTL)
+ execute("ALTER TABLE %s with default_time_to_live=" + (withDefaultTTL ? 10 : 0));
+ updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 666) USING TTL 1000;", flush);
+
+ assertRows(execute("select * from %s"), row(1, 2, 666));
+ assertRows(execute("select * from mv"), row(1, 666, 2));
+
+ updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 777) USING TTL 1100;", flush);
+ assertRows(execute("select * from %s"), row(1, 2, 777));
+ assertRows(execute("select * from mv"), row(1, 777, 2));
+
+ updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 888) USING TTL 800;", flush);
+
+ assertRows(execute("select * from %s"), row(1, 2, 888));
+ assertRows(execute("select * from mv"), row(1, 888, 2));
+
+ if (flush)
+ {
+ Keyspace.open(keyspace()).getColumnFamilyStore("mv").forceMajorCompaction();
+ assertRows(execute("select * from %s"), row(1, 2, 888));
+ assertRows(execute("select * from mv"), row(1, 888, 2));
+ }
+
+ // wait for ttl, data should be removed
+ updateViewWithFlush("insert into %s (field1, field2, date) values (1, 2, 999) USING TTL 5;", flush);
+ assertRows(execute("select * from %s"), row(1, 2, 999));
+ assertRows(execute("select * from mv"), row(1, 999, 2));
+
+ Thread.sleep(5000);
+ assertRows(execute("select * from %s"));
+ assertRows(execute("select * from mv"));
+
+ // shadow mv with date=555 and then update it back to live with ttl=5, wait for ttl to expire
+ updateViewWithFlush("update %s USING TTL 800 set date=555 where field1=1 and field2=2;", flush);
+ assertRows(execute("select * from %s"), row(1, 2, 555));
+ assertRows(execute("select * from mv"), row(1, 555, 2));
+
+ updateViewWithFlush("update %s set date=666 where field1=1 and field2=2;", flush);
+ assertRows(execute("select * from %s"), row(1, 2, 666));
+ assertRows(execute("select * from mv"), row(1, 666, 2));
+
+ updateViewWithFlush("update %s USING TTL 5 set date=555 where field1=1 and field2=2;", flush);
+ assertRows(execute("select * from %s"), row(1, 2, 555));
+ assertRows(execute("select * from mv"), row(1, 555, 2));
+
+ Thread.sleep(5000);
+ assertRows(execute("select * from %s"));
+ assertRows(execute("select * from mv"));
+ }
+ }
+
+ @Test
+ public void testExpiredLivenessInfoWithUnselectedColumnAndDefaultTTLWithFlush() throws Throwable
+ {
+ testExpiredLivenessInfoWithUnselectedColumnAndDefaultTTL(true);
+ }
+
+ @Test
+ public void testExpiredLivenessInfoWithUnselectedColumnAndDefaultTTLWithoutFlush() throws Throwable
+ {
+ testExpiredLivenessInfoWithUnselectedColumnAndDefaultTTL(false);
+ }
+
+ private void testExpiredLivenessInfoWithUnselectedColumnAndDefaultTTL(boolean flush) throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int,c int,a int, b int, PRIMARY KEY ((k), c)) WITH default_time_to_live = 1000;");
+
+ execute("USE " + keyspace());
+ executeNet(protocolVersion, "USE " + keyspace());
+
+ createView("mv",
+ "CREATE MATERIALIZED VIEW mv AS SELECT k,c,a FROM %%s WHERE k IS NOT NULL AND c IS NOT NULL "
+ + "PRIMARY KEY (c, k)");
+
+ // table default ttl
+ updateViewWithFlush("UPDATE %s SET b = 111 WHERE k = 1 AND c = 2", flush);
+ assertRows(execute("select k,c,a,b from %s"), row(1, 2, null, 111));
+ assertRows(execute("select k,c,a from mv"), row(1, 2, null));
+
+ updateViewWithFlush("UPDATE %s SET b = null WHERE k = 1 AND c = 2", flush);
+ assertRows(execute("select k,c,a,b from %s"));
+ assertRows(execute("select k,c,a from mv"));
+
+ updateViewWithFlush("UPDATE %s SET b = 222 WHERE k = 1 AND c = 2", flush);
+ assertRows(execute("select k,c,a,b from %s"), row(1, 2, null, 222));
+ assertRows(execute("select k,c,a from mv"), row(1, 2, null));
+
+ updateViewWithFlush("DELETE b FROM %s WHERE k = 1 AND c = 2", flush);
+ assertRows(execute("select k,c,a,b from %s"));
+ assertRows(execute("select k,c,a from mv"));
+
+ if (flush)
+ {
+ Keyspace.open(keyspace()).getColumnFamilyStore("mv").forceMajorCompaction();
+ assertRows(execute("select k,c,a,b from %s"));
+ assertRows(execute("select k,c,a from mv"));
+ }
+
+ // test user-provided ttl for table with/without default-ttl
+ for (boolean withDefaultTTL : Arrays.asList(true, false))
+ {
+ execute("TRUNCATE %s");
+ if (withDefaultTTL)
+ execute("ALTER TABLE %s with default_time_to_live=" + (withDefaultTTL ? 10 : 0));
+
+ updateViewWithFlush("UPDATE %s USING TTL 100 SET b = 666 WHERE k = 1 AND c = 2", flush);
+ assertRows(execute("select k,c,a,b from %s"), row(1, 2, null, 666));
+ assertRows(execute("select k,c,a from mv"), row(1, 2, null));
+
+ updateViewWithFlush("UPDATE %s USING TTL 90 SET b = null WHERE k = 1 AND c = 2", flush);
+ if (flush)
+ FBUtilities.waitOnFutures(Keyspace.open(keyspace()).flush());
+ assertRows(execute("select k,c,a,b from %s"));
+ assertRows(execute("select k,c,a from mv"));
+
+ updateViewWithFlush("UPDATE %s USING TTL 80 SET b = 777 WHERE k = 1 AND c = 2", flush);
+ assertRows(execute("select k,c,a,b from %s"), row(1, 2, null, 777));
+ assertRows(execute("select k,c,a from mv"), row(1, 2, null));
+
+ updateViewWithFlush("DELETE b FROM %s WHERE k = 1 AND c = 2", flush);
+ assertRows(execute("select k,c,a,b from %s"));
+ assertRows(execute("select k,c,a from mv"));
+
+ updateViewWithFlush("UPDATE %s USING TTL 110 SET b = 888 WHERE k = 1 AND c = 2", flush);
+ assertRows(execute("select k,c,a,b from %s"), row(1, 2, null, 888));
+ assertRows(execute("select k,c,a from mv"), row(1, 2, null));
+
+ updateViewWithFlush("UPDATE %s USING TTL 5 SET b = 999 WHERE k = 1 AND c = 2", flush);
+ assertRows(execute("select k,c,a,b from %s"), row(1, 2, null, 999));
+ assertRows(execute("select k,c,a from mv"), row(1, 2, null));
+
+ Thread.sleep(5000); // wait for ttl expired
+
+ if (flush)
+ {
+ Keyspace.open(keyspace()).getColumnFamilyStore("mv").forceMajorCompaction();
+ assertRows(execute("select k,c,a,b from %s"));
+ assertRows(execute("select k,c,a from mv"));
+ }
+ }
+ }
+
+ private void updateView(String query, Object... params) throws Throwable
+ {
+ updateViewWithFlush(query, false, params);
+ }
+
+ private void updateViewWithFlush(String query, boolean flush, Object... params) throws Throwable
+ {
+ executeNet(protocolVersion, query, params);
+ while (!(((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getPendingTasks() == 0
+ && ((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getActiveCount() == 0))
+ {
+ Thread.sleep(1);
+ }
+ if (flush)
+ Keyspace.open(keyspace()).flush();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
new file mode 100644
index 0000000..9f375d4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
@@ -0,0 +1,104 @@
+package org.apache.cassandra.cql3.validation.operations;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.cassandra.cql3.Attributes;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.junit.Test;
+
+public class TTLTest extends CQLTester
+{
+
+ @Test
+ public void testTTLPerRequestLimit() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, i int)");
+ // insert
+ execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL ?", Attributes.MAX_TTL); // max ttl
+ int ttl = execute("SELECT ttl(i) FROM %s").one().getInt("ttl(i)");
+ assertTrue(ttl > Attributes.MAX_TTL - 10);
+
+ try
+ {
+ execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL ?", Attributes.MAX_TTL + 1);
+ fail("Expect InvalidRequestException");
+ }
+ catch (InvalidRequestException e)
+ {
+ assertTrue(e.getMessage().contains("ttl is too large."));
+ }
+
+ try
+ {
+ execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL ?", -1);
+ fail("Expect InvalidRequestException");
+ }
+ catch (InvalidRequestException e)
+ {
+ assertTrue(e.getMessage().contains("A TTL must be greater or equal to 0, but was -1"));
+ }
+ execute("TRUNCATE %s");
+
+ // update
+ execute("UPDATE %s USING TTL ? SET i = 1 WHERE k = 2", Attributes.MAX_TTL); // max ttl
+ ttl = execute("SELECT ttl(i) FROM %s").one().getInt("ttl(i)");
+ assertTrue(ttl > Attributes.MAX_TTL - 10);
+
+ try
+ {
+ execute("UPDATE %s USING TTL ? SET i = 1 WHERE k = 2", Attributes.MAX_TTL + 1);
+ fail("Expect InvalidRequestException");
+ }
+ catch (InvalidRequestException e)
+ {
+ assertTrue(e.getMessage().contains("ttl is too large."));
+ }
+
+ try
+ {
+ execute("UPDATE %s USING TTL ? SET i = 1 WHERE k = 2", -1);
+ fail("Expect InvalidRequestException");
+ }
+ catch (InvalidRequestException e)
+ {
+ assertTrue(e.getMessage().contains("A TTL must be greater or equal to 0, but was -1"));
+ }
+ }
+
+ @Test
+ public void testTTLDefaultLimit() throws Throwable
+ {
+ try
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=-1");
+ fail("Expect Invalid schema");
+ }
+ catch (RuntimeException e)
+ {
+ assertTrue(e.getCause()
+ .getMessage()
+ .contains("default_time_to_live must be greater than or equal to 0 (got -1)"));
+ }
+ try
+ {
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live="
+ + (Attributes.MAX_TTL + 1));
+ fail("Expect Invalid schema");
+ }
+ catch (RuntimeException e)
+ {
+ assertTrue(e.getCause()
+ .getMessage()
+ .contains("default_time_to_live must be less than or equal to " + Attributes.MAX_TTL + " (got "
+ + (Attributes.MAX_TTL + 1) + ")"));
+ }
+
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=" + Attributes.MAX_TTL);
+ execute("INSERT INTO %s (k, i) VALUES (1, 1)");
+ int ttl = execute("SELECT ttl(i) FROM %s").one().getInt("ttl(i)");
+ assertTrue(ttl > 10000 - 10); // within 10 second
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/461af5b9/test/unit/org/apache/cassandra/db/LivenessInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/LivenessInfoTest.java b/test/unit/org/apache/cassandra/db/LivenessInfoTest.java
new file mode 100644
index 0000000..b08023c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/LivenessInfoTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import static org.junit.Assert.*;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+import org.junit.Test;
+
+public class LivenessInfoTest
+{
+ @Test
+ public void testSupersedes()
+ {
+ LivenessInfo first;
+ LivenessInfo second;
+ int nowInSeconds = FBUtilities.nowInSeconds();
+
+ // timestamp supersedes for normal liveness info
+ first = LivenessInfo.create(100, 0, nowInSeconds);
+ second = LivenessInfo.create(101, 0, nowInSeconds);
+ assertSupersedes(second, first);
+
+ // timestamp supersedes for ttl
+ first = LivenessInfo.create(100, 0, nowInSeconds);
+ second = LivenessInfo.expiring(99, 1, nowInSeconds);
+ assertSupersedes(first, second);
+
+ // timestamp supersedes for mv expired liveness
+ first = LivenessInfo.create(100, 0, nowInSeconds);
+ second = LivenessInfo.create(99, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds);
+ assertSupersedes(first, second);
+
+ // timestamp ties, ttl supersedes non-ttl
+ first = LivenessInfo.expiring(100, 1, nowInSeconds);
+ second = LivenessInfo.create(100, 0, nowInSeconds);
+ assertSupersedes(first, second);
+
+ // timestamp ties, greater localDeletionTime supersedes
+ first = LivenessInfo.expiring(100, 2, nowInSeconds);
+ second = LivenessInfo.expiring(100, 1, nowInSeconds);
+ assertSupersedes(first, second);
+
+ first = LivenessInfo.expiring(100, 5, nowInSeconds - 4);
+ second = LivenessInfo.expiring(100, 2, nowInSeconds);
+ assertSupersedes(second, first);
+
+ // timestamp ties, mv expired liveness supersedes normal ttl
+ first = LivenessInfo.create(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds);
+ second = LivenessInfo.expiring(100, 1000, nowInSeconds);
+ assertSupersedes(first, second);
+
+ // timestamp ties, mv expired liveness supersedes non-ttl
+ first = LivenessInfo.create(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds);
+ second = LivenessInfo.create(100, 0, nowInSeconds);
+ assertSupersedes(first, second);
+
+ // timestamp ties, both are mv expired liveness, local deletion time win
+ first = LivenessInfo.create(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds + 1);
+ second = LivenessInfo.create(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds);
+ assertSupersedes(first, second);
+ }
+
+ @Test
+ public void testIsLive()
+ {
+ int nowInSeconds = FBUtilities.nowInSeconds();
+
+ assertIsLive(LivenessInfo.create(100, 0, nowInSeconds), nowInSeconds - 3, true);
+ assertIsLive(LivenessInfo.create(100, 0, nowInSeconds), nowInSeconds, true);
+ assertIsLive(LivenessInfo.create(100, 0, nowInSeconds), nowInSeconds + 3, true);
+
+ assertIsLive(LivenessInfo.expiring(100, 2, nowInSeconds), nowInSeconds - 3, true);
+ assertIsLive(LivenessInfo.expiring(100, 2, nowInSeconds), nowInSeconds, true);
+ assertIsLive(LivenessInfo.expiring(100, 2, nowInSeconds), nowInSeconds + 3, false);
+
+ assertIsLive(LivenessInfo.create(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds), nowInSeconds - 3, false);
+ assertIsLive(LivenessInfo.create(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds), nowInSeconds, false);
+ assertIsLive(LivenessInfo.create(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds), nowInSeconds + 3, false);
+ }
+
+ /**
+ * left supersedes right, right doesn't supersede left.
+ */
+ private static void assertSupersedes(LivenessInfo left, LivenessInfo right)
+ {
+ assertTrue(left.supersedes(right));
+ assertFalse(right.supersedes(left));
+ }
+
+ private static void assertIsLive(LivenessInfo info, int nowInSec, boolean alive)
+ {
+ assertEquals(info.isLive(nowInSec), alive);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org