You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/02/06 08:39:54 UTC
git commit: Correctly handle null in conditions with TTL
Updated Branches:
refs/heads/cassandra-2.0 58e948185 -> e59ef16bf
Correctly handle null in conditions with TTL
patch by slebresne; reviewed by iamaleksey for CASSANDRA-6623
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e59ef16b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e59ef16b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e59ef16b
Branch: refs/heads/cassandra-2.0
Commit: e59ef16bfcb3bd019202fc12bedeb04302066540
Parents: 58e9481
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Feb 6 08:36:12 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Feb 6 08:36:12 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/ModificationStatement.java | 109 +++++++++++++++----
.../apache/cassandra/service/CASConditions.java | 38 +++++++
.../apache/cassandra/service/StorageProxy.java | 75 ++-----------
.../cassandra/thrift/CassandraServer.java | 64 ++++++++++-
5 files changed, 197 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bba5f20..7ba8044 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.6
+ * Correctly handle null with IF conditions and TTL (CASSANDRA-6623)
Merged from 1.2:
* Fix partition and range deletes not triggering flush (CASSANDRA-6655)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index c0bf428..2567043 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -27,19 +27,20 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.BooleanType;
import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.CASConditions;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.thrift.ThriftValidation;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.ByteBufferUtil;
/*
* Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE.
@@ -415,16 +416,17 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
UpdateParameters updParams = new UpdateParameters(cfm, variables, queryState.getTimestamp(), getTimeToLive(variables), null);
ColumnFamily updates = updateForKey(key, clusteringPrefix, updParams);
- // When building the conditions, we should not use the TTL. It's not useful, and if a very low ttl (1 seconds) is used, it's possible
- // for it to expire before actually build the conditions which would break since we would then test for the presence of tombstones.
- UpdateParameters condParams = new UpdateParameters(cfm, variables, queryState.getTimestamp(), 0, null);
- ColumnFamily expected = buildConditions(key, clusteringPrefix, condParams);
+ // It's cleaner to use the query timestamp below, but it's in seconds while the conditions expect microseconds, so just
+ // put it back in millis (we don't really lose precision because the ultimate consumer, Column.isLive, re-divides it).
+ long now = queryState.getTimestamp() * 1000;
+ CASConditions conditions = ifNotExists
+ ? new NotExistCondition(clusteringPrefix, now)
+ : new ColumnsConditions(clusteringPrefix, cfm, key, columnConditions, variables, now);
ColumnFamily result = StorageProxy.cas(keyspace(),
columnFamily(),
key,
- clusteringPrefix,
- expected,
+ conditions,
updates,
options.getSerialConsistency(),
options.getConsistency());
@@ -542,28 +544,91 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return isCounter() ? new CounterMutation(rm, cl) : rm;
}
- private ColumnFamily buildConditions(ByteBuffer key, ColumnNameBuilder clusteringPrefix, UpdateParameters params)
- throws InvalidRequestException
+ private static abstract class CQL3CasConditions implements CASConditions
{
- if (ifNotExists)
- return null;
+ protected final ColumnNameBuilder rowPrefix;
+ protected final long now;
- ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
+ protected CQL3CasConditions(ColumnNameBuilder rowPrefix, long now)
+ {
+ this.rowPrefix = rowPrefix;
+ this.now = now;
+ }
- // CQL row marker
- CFDefinition cfDef = cfm.getCfDef();
- if (cfDef.isComposite && !cfDef.isCompact && !cfm.isSuper())
+ public IDiskAtomFilter readFilter()
{
- ByteBuffer name = clusteringPrefix.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build();
- cf.addColumn(params.makeColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER));
+ // We always read the row entirely since, on CAS failure, we want to be able to distinguish between "row exists
+ // but all values on which there were conditions are null" and "row doesn't exist", and we can't rely on the
+ // row marker for that (see #6623)
+ return new SliceQueryFilter(rowPrefix.build(), rowPrefix.buildAsEndOfRange(), false, 1, rowPrefix.componentCount());
}
+ }
- // Conditions
- for (Operation condition : columnConditions)
- condition.execute(key, cf, clusteringPrefix.copy(), params);
+ private static class NotExistCondition extends CQL3CasConditions // condition used when ifNotExists is set
+ {
+ private NotExistCondition(ColumnNameBuilder rowPrefix, long now)
+ {
+ super(rowPrefix, now);
+ }
+
+ public boolean appliesTo(ColumnFamily current)
+ {
+ return current == null || current.hasOnlyTombstones(now); // applies when nothing was read or only tombstones remain as of 'now'
+ }
+ }
+
+ private static class ColumnsConditions extends CQL3CasConditions
+ {
+ private final ColumnFamily expected; // columns built by executing the condition operations (see constructor)
+
+ private ColumnsConditions(ColumnNameBuilder rowPrefix,
+ CFMetaData cfm,
+ ByteBuffer key,
+ Collection<Operation> conditions,
+ List<ByteBuffer> variables,
+ long now) throws InvalidRequestException
+ {
+ super(rowPrefix, now);
+ this.expected = TreeMapBackedSortedColumns.factory.create(cfm);
- assert !cf.isEmpty();
- return cf;
+ // When building the conditions, we should not use a TTL. It's not useful, and if a very low ttl (1 second) is used, it's possible
+ // for it to expire before the conditions are actually built, which would break since we would then be testing for the presence of tombstones.
+ UpdateParameters params = new UpdateParameters(cfm, variables, now, 0, null);
+
+ // Conditions
+ for (Operation condition : conditions)
+ condition.execute(key, expected, rowPrefix.copy(), params);
+ }
+
+ public boolean appliesTo(ColumnFamily current)
+ {
+ if (current == null) // nothing was read, so no column condition can hold
+ return false;
+
+ for (Column e : expected)
+ {
+ Column c = current.getColumn(e.name());
+ if (e.isLive(now))
+ {
+ if (c == null || !c.isLive(now) || !c.value().equals(e.value())) // a live expectation needs a live column with an equal value
+ return false;
+ }
+ else
+ {
+ // If we have a tombstone in expected, it means the condition tests that the column is
+ // null, so check that we have no value
+ if (c != null && c.isLive(now))
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return expected.toString();
+ }
}
public static abstract class Parsed extends CFStatement
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/service/CASConditions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CASConditions.java b/src/java/org/apache/cassandra/service/CASConditions.java
new file mode 100644
index 0000000..d4b3e19
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/CASConditions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+
+/**
+ * Abstracts the conditions that must be fulfilled for a CAS operation to apply.
+ */
+public interface CASConditions
+{
+ /**
+ * The filter to use to fetch the values to compare for the CAS.
+ */
+ public IDiskAtomFilter readFilter();
+
+ /**
+ * Returns whether the provided CF, which holds the values fetched using
+ * readFilter(), matches the CAS conditions this object stands for.
+ */
+ public boolean appliesTo(ColumnFamily current);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5671655..8d1f913 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -41,11 +41,8 @@ import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
@@ -157,7 +154,7 @@ public class StorageProxy implements StorageProxyMBean
/**
* Apply @param updates if and only if the current values in the row for @param key
- * match the ones given by @param expected. The algorithm is "raw" Paxos: that is, Paxos
+ * match the provided @param conditions. The algorithm is "raw" Paxos: that is, Paxos
* minus leader election -- any node in the cluster may propose changes for any row,
* which (that is, the row) is the unit of values being proposed, not single columns.
*
@@ -189,23 +186,18 @@ public class StorageProxy implements StorageProxyMBean
* @param keyspaceName the keyspace for the CAS
* @param cfName the column family for the CAS
* @param key the row key for the row to CAS
- * @param prefix a column name prefix that selects the CQL3 row to check if {@code expected} is null. If {@code expected}
- * is not null, this is ignored. If {@code expected} is null and this is null, the full row existing is checked (by querying
- * the first live column of the row).
- * @param expected the expected column values. This can be null to check for existence (see {@code prefix}).
- * @param updates the value to insert if {@code expected matches the current values}.
+ * @param conditions the conditions for the CAS to apply.
+ * @param updates the value to insert if {@code conditions} match the current values.
* @param consistencyForPaxos the consistency for the paxos prepare and propose round. This can only be either SERIAL or LOCAL_SERIAL.
* @param consistencyForCommit the consistency for write done during the commit phase. This can be anything, except SERIAL or LOCAL_SERIAL.
*
- * @return null if the operation succeeds in updating the row, or the current values for the columns contained in
- * expected (since, if the CAS doesn't succeed, it means the current value do not match the one in expected). If
- * expected == null and the CAS is unsuccessfull, the first live column of the CF is returned.
+ * @return null if the operation succeeds in updating the row, or the current values corresponding to conditions
+ * (since, if the CAS doesn't succeed, it means the current values do not match the conditions).
*/
public static ColumnFamily cas(String keyspaceName,
String cfName,
ByteBuffer key,
- ColumnNameBuilder prefix,
- ColumnFamily expected,
+ CASConditions conditions,
ColumnFamily updates,
ConsistencyLevel consistencyForPaxos,
ConsistencyLevel consistencyForCommit)
@@ -227,27 +219,15 @@ public class StorageProxy implements StorageProxyMBean
UUID ballot = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos);
- // read the current value and compare with expected
+ // read the current values and check they validate the conditions
Tracing.trace("Reading existing values for CAS precondition");
long timestamp = System.currentTimeMillis();
- ReadCommand readCommand;
- if (expected == null || expected.isEmpty())
- {
- SliceQueryFilter filter = prefix == null
- ? new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1)
- : new SliceQueryFilter(prefix.build(), prefix.buildAsEndOfRange(), false, 1, prefix.componentCount());
- readCommand = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter);
- }
- else
- {
- assert !expected.isEmpty();
- readCommand = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, new NamesQueryFilter(ImmutableSortedSet.copyOf(metadata.comparator, expected.getColumnNames())));
- }
+ ReadCommand readCommand = ReadCommand.create(keyspaceName, key, cfName, timestamp, conditions.readFilter());
List<Row> rows = read(Arrays.asList(readCommand), consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM);
ColumnFamily current = rows.get(0).cf;
- if (!casApplies(expected, current))
+ if (!conditions.appliesTo(current))
{
- Tracing.trace("CAS precondition {} does not match current values {}", expected, current);
+ Tracing.trace("CAS precondition {} does not match current values {}", conditions, current);
// We should not return null as this means success
return current == null ? EmptyColumns.factory.create(metadata) : current;
}
@@ -274,41 +254,6 @@ public class StorageProxy implements StorageProxyMBean
throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(keyspaceName)));
}
- private static boolean hasLiveColumns(ColumnFamily cf, long now)
- {
- return cf != null && !cf.hasOnlyTombstones(now);
- }
-
- private static boolean casApplies(ColumnFamily expected, ColumnFamily current)
- {
- long now = System.currentTimeMillis();
-
- if (!hasLiveColumns(expected, now))
- return !hasLiveColumns(current, now);
- else if (!hasLiveColumns(current, now))
- return false;
-
- // current has been built from expected, so we know that it can't have columns
- // that excepted don't have. So we just check that for each columns in expected:
- // - if it is a tombstone, whether current has no column or a tombstone;
- // - otherwise, that current has a live column with the same value.
- for (Column e : expected)
- {
- Column c = current.getColumn(e.name());
- if (e.isLive(now))
- {
- if (!(c != null && c.isLive(now) && c.value().equals(e.value())))
- return false;
- }
- else
- {
- if (c != null && c.isLive(now))
- return false;
- }
- }
- return true;
- }
-
private static Predicate<InetAddress> sameDCPredicateFor(final String dc)
{
final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index beaae78..ef5eeb8 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -30,6 +30,7 @@ import java.util.zip.Inflater;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -59,6 +60,7 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.scheduler.IRequestScheduler;
import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.service.CASConditions;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.StorageProxy;
@@ -768,8 +770,7 @@ public class CassandraServer implements Cassandra.Iface
ColumnFamily result = StorageProxy.cas(cState.getKeyspace(),
column_family,
key,
- null,
- cfExpected,
+ new ThriftCASConditions(cfExpected),
cfUpdates,
ThriftConversion.fromThrift(serial_consistency_level),
ThriftConversion.fromThrift(commit_consistency_level));
@@ -2158,5 +2159,62 @@ public class CassandraServer implements Cassandra.Iface
}
});
}
- // main method moved to CassandraDaemon
+
+ private static class ThriftCASConditions implements CASConditions // CAS conditions backed by an expected ColumnFamily, which may be null
+ {
+ private final ColumnFamily expected;
+
+ private ThriftCASConditions(ColumnFamily expected)
+ {
+ this.expected = expected;
+ }
+
+ public IDiskAtomFilter readFilter()
+ {
+ return expected == null || expected.isEmpty()
+ ? new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1)
+ : new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getComparator(), expected.getColumnNames()));
+ }
+
+ public boolean appliesTo(ColumnFamily current)
+ {
+ long now = System.currentTimeMillis();
+
+ if (!hasLiveColumns(expected, now))
+ return !hasLiveColumns(current, now);
+ else if (!hasLiveColumns(current, now))
+ return false;
+
+ // current has been built from expected, so we know that it can't have columns
+ // that expected doesn't have. So we just check, for each column in expected:
+ // - if it is a tombstone, that current has no column or a tombstone;
+ // - otherwise, that current has a live column with the same value.
+ for (org.apache.cassandra.db.Column e : expected)
+ {
+ org.apache.cassandra.db.Column c = current.getColumn(e.name());
+ if (e.isLive(now))
+ {
+ if (c == null || !c.isLive(now) || !c.value().equals(e.value()))
+ return false;
+ }
+ else
+ {
+ if (c != null && c.isLive(now))
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static boolean hasLiveColumns(ColumnFamily cf, long now)
+ {
+ return cf != null && !cf.hasOnlyTombstones(now);
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.valueOf(expected); // null-safe: expected may be null (readFilter and appliesTo both allow it)
+ }
+ }
}