You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/02/06 08:50:44 UTC
[1/2] git commit: Correctly handle null in conditions with TTL
Updated Branches:
refs/heads/trunk fc01759f4 -> 7ce5e062e
Correctly handle null in conditions with TTL
patch by slebresne; reviewed by iamaleksey for CASSANDRA-6623
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e59ef16b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e59ef16b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e59ef16b
Branch: refs/heads/trunk
Commit: e59ef16bfcb3bd019202fc12bedeb04302066540
Parents: 58e9481
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Feb 6 08:36:12 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Feb 6 08:36:12 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/ModificationStatement.java | 109 +++++++++++++++----
.../apache/cassandra/service/CASConditions.java | 38 +++++++
.../apache/cassandra/service/StorageProxy.java | 75 ++-----------
.../cassandra/thrift/CassandraServer.java | 64 ++++++++++-
5 files changed, 197 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bba5f20..7ba8044 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.6
+ * Correctly handle null with IF conditions and TTL (CASSANDRA-6623)
Merged from 1.2:
* Fix partition and range deletes not triggering flush (CASSANDRA-6655)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index c0bf428..2567043 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -27,19 +27,20 @@ import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.BooleanType;
import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.service.CASConditions;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.thrift.ThriftValidation;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.ByteBufferUtil;
/*
* Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE.
@@ -415,16 +416,17 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
UpdateParameters updParams = new UpdateParameters(cfm, variables, queryState.getTimestamp(), getTimeToLive(variables), null);
ColumnFamily updates = updateForKey(key, clusteringPrefix, updParams);
- // When building the conditions, we should not use the TTL. It's not useful, and if a very low ttl (1 seconds) is used, it's possible
- // for it to expire before actually build the conditions which would break since we would then test for the presence of tombstones.
- UpdateParameters condParams = new UpdateParameters(cfm, variables, queryState.getTimestamp(), 0, null);
- ColumnFamily expected = buildConditions(key, clusteringPrefix, condParams);
+ // It's cleaner to use the query timestamp below, but it's in seconds while the conditions expect microseconds, so just
+ // put it back in millis (we don't really lose precision because the ultimate consumer, Column.isLive, re-divides it).
+ long now = queryState.getTimestamp() * 1000;
+ CASConditions conditions = ifNotExists
+ ? new NotExistCondition(clusteringPrefix, now)
+ : new ColumnsConditions(clusteringPrefix, cfm, key, columnConditions, variables, now);
ColumnFamily result = StorageProxy.cas(keyspace(),
columnFamily(),
key,
- clusteringPrefix,
- expected,
+ conditions,
updates,
options.getSerialConsistency(),
options.getConsistency());
@@ -542,28 +544,91 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
return isCounter() ? new CounterMutation(rm, cl) : rm;
}
- private ColumnFamily buildConditions(ByteBuffer key, ColumnNameBuilder clusteringPrefix, UpdateParameters params)
- throws InvalidRequestException
+ private static abstract class CQL3CasConditions implements CASConditions
{
- if (ifNotExists)
- return null;
+ protected final ColumnNameBuilder rowPrefix;
+ protected final long now;
- ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
+ protected CQL3CasConditions(ColumnNameBuilder rowPrefix, long now)
+ {
+ this.rowPrefix = rowPrefix;
+ this.now = now;
+ }
- // CQL row marker
- CFDefinition cfDef = cfm.getCfDef();
- if (cfDef.isComposite && !cfDef.isCompact && !cfm.isSuper())
+ public IDiskAtomFilter readFilter()
{
- ByteBuffer name = clusteringPrefix.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build();
- cf.addColumn(params.makeColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER));
+ // We always read the row entirely as on CAS failure we want to be able to distinguish between "row exists
+ // but all values on which there were conditions are null" and "row doesn't exist", and we can't rely on the
+ // row marker for that (see #6623)
+ return new SliceQueryFilter(rowPrefix.build(), rowPrefix.buildAsEndOfRange(), false, 1, rowPrefix.componentCount());
}
+ }
- // Conditions
- for (Operation condition : columnConditions)
- condition.execute(key, cf, clusteringPrefix.copy(), params);
+ private static class NotExistCondition extends CQL3CasConditions
+ {
+ private NotExistCondition(ColumnNameBuilder rowPrefix, long now)
+ {
+ super(rowPrefix, now);
+ }
+
+ public boolean appliesTo(ColumnFamily current)
+ {
+ return current == null || current.hasOnlyTombstones(now);
+ }
+ }
+
+ private static class ColumnsConditions extends CQL3CasConditions
+ {
+ private final ColumnFamily expected;
+
+ private ColumnsConditions(ColumnNameBuilder rowPrefix,
+ CFMetaData cfm,
+ ByteBuffer key,
+ Collection<Operation> conditions,
+ List<ByteBuffer> variables,
+ long now) throws InvalidRequestException
+ {
+ super(rowPrefix, now);
+ this.expected = TreeMapBackedSortedColumns.factory.create(cfm);
- assert !cf.isEmpty();
- return cf;
+ // When building the conditions, we should not use a TTL. It's not useful, and if a very low ttl (1 second) is used, it's possible
+ // for it to expire before the actual build of the conditions which would break since we would then be testing for the presence of tombstones.
+ UpdateParameters params = new UpdateParameters(cfm, variables, now, 0, null);
+
+ // Conditions
+ for (Operation condition : conditions)
+ condition.execute(key, expected, rowPrefix.copy(), params);
+ }
+
+ public boolean appliesTo(ColumnFamily current)
+ {
+ if (current == null)
+ return false;
+
+ for (Column e : expected)
+ {
+ Column c = current.getColumn(e.name());
+ if (e.isLive(now))
+ {
+ if (c == null || !c.isLive(now) || !c.value().equals(e.value()))
+ return false;
+ }
+ else
+ {
+ // If we have a tombstone in expected, it means the condition tests that the column is
+ // null, so check that we have no value
+ if (c != null && c.isLive(now))
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return expected.toString();
+ }
}
public static abstract class Parsed extends CFStatement
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/service/CASConditions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CASConditions.java b/src/java/org/apache/cassandra/service/CASConditions.java
new file mode 100644
index 0000000..d4b3e19
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/CASConditions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+
+/**
+ * Abstract the conditions to be fulfilled by a CAS operation.
+ */
+public interface CASConditions
+{
+ /**
+ * The filter to use to fetch the value to compare for the CAS.
+ */
+ public IDiskAtomFilter readFilter();
+
+ /**
+ * Returns whether the provided CF, that represents the values fetched using the
+ * readFilter(), match the CAS conditions this object stands for.
+ */
+ public boolean appliesTo(ColumnFamily current);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5671655..8d1f913 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -41,11 +41,8 @@ import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
@@ -157,7 +154,7 @@ public class StorageProxy implements StorageProxyMBean
/**
* Apply @param updates if and only if the current values in the row for @param key
- * match the ones given by @param expected. The algorithm is "raw" Paxos: that is, Paxos
+ * match the provided @param conditions. The algorithm is "raw" Paxos: that is, Paxos
* minus leader election -- any node in the cluster may propose changes for any row,
* which (that is, the row) is the unit of values being proposed, not single columns.
*
@@ -189,23 +186,18 @@ public class StorageProxy implements StorageProxyMBean
* @param keyspaceName the keyspace for the CAS
* @param cfName the column family for the CAS
* @param key the row key for the row to CAS
- * @param prefix a column name prefix that selects the CQL3 row to check if {@code expected} is null. If {@code expected}
- * is not null, this is ignored. If {@code expected} is null and this is null, the full row existing is checked (by querying
- * the first live column of the row).
- * @param expected the expected column values. This can be null to check for existence (see {@code prefix}).
- * @param updates the value to insert if {@code expected matches the current values}.
+ * @param conditions the conditions for the CAS to apply.
+ * @param updates the value to insert if {@code conditions} matches the current values.
* @param consistencyForPaxos the consistency for the paxos prepare and propose round. This can only be either SERIAL or LOCAL_SERIAL.
* @param consistencyForCommit the consistency for write done during the commit phase. This can be anything, except SERIAL or LOCAL_SERIAL.
*
- * @return null if the operation succeeds in updating the row, or the current values for the columns contained in
- * expected (since, if the CAS doesn't succeed, it means the current value do not match the one in expected). If
- * expected == null and the CAS is unsuccessfull, the first live column of the CF is returned.
+ * @return null if the operation succeeds in updating the row, or the current values corresponding to conditions
+ * (since, if the CAS doesn't succeed, it means the current values do not match the conditions).
*/
public static ColumnFamily cas(String keyspaceName,
String cfName,
ByteBuffer key,
- ColumnNameBuilder prefix,
- ColumnFamily expected,
+ CASConditions conditions,
ColumnFamily updates,
ConsistencyLevel consistencyForPaxos,
ConsistencyLevel consistencyForCommit)
@@ -227,27 +219,15 @@ public class StorageProxy implements StorageProxyMBean
UUID ballot = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos);
- // read the current value and compare with expected
+ // read the current values and check they validate the conditions
Tracing.trace("Reading existing values for CAS precondition");
long timestamp = System.currentTimeMillis();
- ReadCommand readCommand;
- if (expected == null || expected.isEmpty())
- {
- SliceQueryFilter filter = prefix == null
- ? new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1)
- : new SliceQueryFilter(prefix.build(), prefix.buildAsEndOfRange(), false, 1, prefix.componentCount());
- readCommand = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter);
- }
- else
- {
- assert !expected.isEmpty();
- readCommand = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, new NamesQueryFilter(ImmutableSortedSet.copyOf(metadata.comparator, expected.getColumnNames())));
- }
+ ReadCommand readCommand = ReadCommand.create(keyspaceName, key, cfName, timestamp, conditions.readFilter());
List<Row> rows = read(Arrays.asList(readCommand), consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM);
ColumnFamily current = rows.get(0).cf;
- if (!casApplies(expected, current))
+ if (!conditions.appliesTo(current))
{
- Tracing.trace("CAS precondition {} does not match current values {}", expected, current);
+ Tracing.trace("CAS precondition {} does not match current values {}", conditions, current);
// We should not return null as this means success
return current == null ? EmptyColumns.factory.create(metadata) : current;
}
@@ -274,41 +254,6 @@ public class StorageProxy implements StorageProxyMBean
throw new WriteTimeoutException(WriteType.CAS, consistencyForPaxos, 0, consistencyForPaxos.blockFor(Keyspace.open(keyspaceName)));
}
- private static boolean hasLiveColumns(ColumnFamily cf, long now)
- {
- return cf != null && !cf.hasOnlyTombstones(now);
- }
-
- private static boolean casApplies(ColumnFamily expected, ColumnFamily current)
- {
- long now = System.currentTimeMillis();
-
- if (!hasLiveColumns(expected, now))
- return !hasLiveColumns(current, now);
- else if (!hasLiveColumns(current, now))
- return false;
-
- // current has been built from expected, so we know that it can't have columns
- // that excepted don't have. So we just check that for each columns in expected:
- // - if it is a tombstone, whether current has no column or a tombstone;
- // - otherwise, that current has a live column with the same value.
- for (Column e : expected)
- {
- Column c = current.getColumn(e.name());
- if (e.isLive(now))
- {
- if (!(c != null && c.isLive(now) && c.value().equals(e.value())))
- return false;
- }
- else
- {
- if (c != null && c.isLive(now))
- return false;
- }
- }
- return true;
- }
-
private static Predicate<InetAddress> sameDCPredicateFor(final String dc)
{
final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e59ef16b/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index beaae78..ef5eeb8 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -30,6 +30,7 @@ import java.util.zip.Inflater;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -59,6 +60,7 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.scheduler.IRequestScheduler;
import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.service.CASConditions;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.StorageProxy;
@@ -768,8 +770,7 @@ public class CassandraServer implements Cassandra.Iface
ColumnFamily result = StorageProxy.cas(cState.getKeyspace(),
column_family,
key,
- null,
- cfExpected,
+ new ThriftCASConditions(cfExpected),
cfUpdates,
ThriftConversion.fromThrift(serial_consistency_level),
ThriftConversion.fromThrift(commit_consistency_level));
@@ -2158,5 +2159,62 @@ public class CassandraServer implements Cassandra.Iface
}
});
}
- // main method moved to CassandraDaemon
+
+ private static class ThriftCASConditions implements CASConditions
+ {
+ private final ColumnFamily expected;
+
+ private ThriftCASConditions(ColumnFamily expected)
+ {
+ this.expected = expected;
+ }
+
+ public IDiskAtomFilter readFilter()
+ {
+ return expected == null || expected.isEmpty()
+ ? new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1)
+ : new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getComparator(), expected.getColumnNames()));
+ }
+
+ public boolean appliesTo(ColumnFamily current)
+ {
+ long now = System.currentTimeMillis();
+
+ if (!hasLiveColumns(expected, now))
+ return !hasLiveColumns(current, now);
+ else if (!hasLiveColumns(current, now))
+ return false;
+
+ // current has been built from expected, so we know that it can't have columns
+ // that expected doesn't have. So we just check that for each column in expected:
+ // - if it is a tombstone, whether current has no column or a tombstone;
+ // - otherwise, that current has a live column with the same value.
+ for (org.apache.cassandra.db.Column e : expected)
+ {
+ org.apache.cassandra.db.Column c = current.getColumn(e.name());
+ if (e.isLive(now))
+ {
+ if (c == null || !c.isLive(now) || !c.value().equals(e.value()))
+ return false;
+ }
+ else
+ {
+ if (c != null && c.isLive(now))
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static boolean hasLiveColumns(ColumnFamily cf, long now)
+ {
+ return cf != null && !cf.hasOnlyTombstones(now);
+ }
+
+ @Override
+ public String toString()
+ {
+ return expected.toString();
+ }
+ }
}
[2/2] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by sl...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Conflicts:
src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
src/java/org/apache/cassandra/service/StorageProxy.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7ce5e062
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7ce5e062
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7ce5e062
Branch: refs/heads/trunk
Commit: 7ce5e062ed602dd1c9593a03b554c11ff3cc52d5
Parents: fc01759 e59ef16
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Thu Feb 6 08:50:34 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Feb 6 08:50:34 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/ModificationStatement.java | 109 +++++++++++++++----
.../apache/cassandra/service/CASConditions.java | 38 +++++++
.../apache/cassandra/service/StorageProxy.java | 76 ++-----------
.../cassandra/thrift/CassandraServer.java | 65 ++++++++++-
5 files changed, 200 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ce5e062/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a139fdc,7ba8044..657bf9e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,34 -1,5 +1,35 @@@
+2.1
+ * add listsnapshots command to nodetool (CASSANDRA-5742)
+ * Introduce AtomicBTreeColumns (CASSANDRA-6271)
+ * Multithreaded commitlog (CASSANDRA-3578)
+ * allocate fixed index summary memory pool and resample cold index summaries
+ to use less memory (CASSANDRA-5519)
+ * Removed multithreaded compaction (CASSANDRA-6142)
+ * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
+ * change logging from log4j to logback (CASSANDRA-5883)
+ * switch to LZ4 compression for internode communication (CASSANDRA-5887)
+ * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
+ * Remove 1.2 network compatibility code (CASSANDRA-5960)
+ * Remove leveled json manifest migration code (CASSANDRA-5996)
+ * Remove CFDefinition (CASSANDRA-6253)
+ * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
+ * User-defined types for CQL3 (CASSANDRA-5590)
+ * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
+ * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
+ * Secondary index support for collections (CASSANDRA-4511, 6383)
+ * SSTable metadata(Stats.db) format change (CASSANDRA-6356)
+ * Push composites support in the storage engine
+ (CASSANDRA-5417, CASSANDRA-6520)
+ * Add snapshot space used to cfstats (CASSANDRA-6231)
+ * Add cardinality estimator for key count estimation (CASSANDRA-5906)
+ * CF id is changed to be non-deterministic. Data dir/key cache are created
+ uniquely for CF id (CASSANDRA-5202)
+ * New counters implementation (CASSANDRA-6504)
+ * Replace UnsortedColumns usage with ArrayBackedSortedColumns (CASSANDRA-6630)
+
+
2.0.6
+ * Correctly handle null with IF conditions and TTL (CASSANDRA-6623)
Merged from 1.2:
* Fix partition and range deletes not triggering flush (CASSANDRA-6655)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ce5e062/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index e551187,2567043..f35690b
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@@ -24,13 -24,13 +24,14 @@@ import org.github.jamm.MemoryMeter
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.CBuilder;
+import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.filter.ColumnSlice;
+ import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.BooleanType;
import org.apache.cassandra.exceptions.*;
@@@ -533,29 -539,96 +535,96 @@@ public abstract class ModificationState
}
else
{
- rm = new RowMutation(cfm.ksName, key, cf);
+ mutation = new Mutation(cfm.ksName, key, cf);
}
- return isCounter() ? new CounterMutation(rm, cl) : rm;
+ return isCounter() ? new CounterMutation(mutation, cl) : mutation;
}
- private ColumnFamily buildConditions(ByteBuffer key, Composite clusteringPrefix, UpdateParameters params)
- throws InvalidRequestException
+ private static abstract class CQL3CasConditions implements CASConditions
{
- if (ifNotExists)
- return null;
- protected final ColumnNameBuilder rowPrefix;
++ protected final Composite rowPrefix;
+ protected final long now;
- ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
- protected CQL3CasConditions(ColumnNameBuilder rowPrefix, long now)
++ protected CQL3CasConditions(Composite rowPrefix, long now)
+ {
+ this.rowPrefix = rowPrefix;
+ this.now = now;
+ }
- // CQL row marker
- if (cfm.isCQL3Table())
- cf.addColumn(params.makeColumn(cfm.comparator.rowMarker(clusteringPrefix), ByteBufferUtil.EMPTY_BYTE_BUFFER));
+ public IDiskAtomFilter readFilter()
+ {
+ // We always read the row entirely as on CAS failure we want to be able to distinguish between "row exists
+ // but all values on which there were conditions are null" and "row doesn't exist", and we can't rely on the
+ // row marker for that (see #6623)
- return new SliceQueryFilter(rowPrefix.build(), rowPrefix.buildAsEndOfRange(), false, 1, rowPrefix.componentCount());
++ return new SliceQueryFilter(rowPrefix.slice(), false, 1, rowPrefix.size());
+ }
+ }
- // Conditions
- for (Operation condition : columnConditions)
- condition.execute(key, cf, clusteringPrefix, params);
+ private static class NotExistCondition extends CQL3CasConditions
+ {
- private NotExistCondition(ColumnNameBuilder rowPrefix, long now)
++ private NotExistCondition(Composite rowPrefix, long now)
+ {
+ super(rowPrefix, now);
+ }
+
+ public boolean appliesTo(ColumnFamily current)
+ {
+ return current == null || current.hasOnlyTombstones(now);
+ }
+ }
+
+ private static class ColumnsConditions extends CQL3CasConditions
+ {
+ private final ColumnFamily expected;
- assert !cf.isEmpty();
- return cf;
- private ColumnsConditions(ColumnNameBuilder rowPrefix,
++ private ColumnsConditions(Composite rowPrefix,
+ CFMetaData cfm,
+ ByteBuffer key,
+ Collection<Operation> conditions,
+ List<ByteBuffer> variables,
+ long now) throws InvalidRequestException
+ {
+ super(rowPrefix, now);
+ this.expected = TreeMapBackedSortedColumns.factory.create(cfm);
+
+ // When building the conditions, we should not use a TTL. It's not useful, and if a very low ttl (1 second) is used, it's possible
+ // for it to expire before the actual build of the conditions which would break since we would then be testing for the presence of tombstones.
+ UpdateParameters params = new UpdateParameters(cfm, variables, now, 0, null);
+
+ // Conditions
+ for (Operation condition : conditions)
- condition.execute(key, expected, rowPrefix.copy(), params);
++ condition.execute(key, expected, rowPrefix, params);
+ }
+
+ public boolean appliesTo(ColumnFamily current)
+ {
+ if (current == null)
+ return false;
+
- for (Column e : expected)
++ for (Cell e : expected)
+ {
- Column c = current.getColumn(e.name());
++ Cell c = current.getColumn(e.name());
+ if (e.isLive(now))
+ {
+ if (c == null || !c.isLive(now) || !c.value().equals(e.value()))
+ return false;
+ }
+ else
+ {
+ // If we have a tombstone in expected, it means the condition tests that the column is
+ // null, so check that we have no value
+ if (c != null && c.isLive(now))
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return expected.toString();
+ }
}
public static abstract class Parsed extends CFStatement
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ce5e062/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 6e8d231,8d1f913..69d7123
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -43,12 -43,6 +43,8 @@@ import org.apache.cassandra.config.Data
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.Keyspace;
- import org.apache.cassandra.db.composites.Composite;
- import org.apache.cassandra.db.filter.ColumnSlice;
- import org.apache.cassandra.db.filter.NamesQueryFilter;
- import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.db.index.SecondaryIndexSearcher;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ce5e062/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/CassandraServer.java
index 44fb22e,ef5eeb8..650c74e
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@@ -47,8 -47,7 +48,9 @@@ import org.apache.cassandra.cql.CQLStat
import org.apache.cassandra.cql.QueryProcessor;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.context.CounterContext;
++import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.NamesQueryFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
@@@ -2171,5 -2159,62 +2173,62 @@@ public class CassandraServer implement
}
});
}
- // main method moved to CassandraDaemon
+
+ private static class ThriftCASConditions implements CASConditions
+ {
+ private final ColumnFamily expected;
+
+ private ThriftCASConditions(ColumnFamily expected)
+ {
+ this.expected = expected;
+ }
+
+ public IDiskAtomFilter readFilter()
+ {
+ return expected == null || expected.isEmpty()
- ? new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 1)
++ ? new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, 1)
+ : new NamesQueryFilter(ImmutableSortedSet.copyOf(expected.getComparator(), expected.getColumnNames()));
+ }
+
+ public boolean appliesTo(ColumnFamily current)
+ {
+ long now = System.currentTimeMillis();
+
- if (!hasLiveColumns(expected, now))
- return !hasLiveColumns(current, now);
- else if (!hasLiveColumns(current, now))
++ if (!hasLiveCells(expected, now))
++ return !hasLiveCells(current, now);
++ else if (!hasLiveCells(current, now))
+ return false;
+
+ // current has been built from expected, so we know that it can't have columns
+ // that expected doesn't have. So we just check that for each column in expected:
+ // - if it is a tombstone, whether current has no column or a tombstone;
+ // - otherwise, that current has a live column with the same value.
- for (org.apache.cassandra.db.Column e : expected)
++ for (Cell e : expected)
+ {
- org.apache.cassandra.db.Column c = current.getColumn(e.name());
++ Cell c = current.getColumn(e.name());
+ if (e.isLive(now))
+ {
+ if (c == null || !c.isLive(now) || !c.value().equals(e.value()))
+ return false;
+ }
+ else
+ {
+ if (c != null && c.isLive(now))
+ return false;
+ }
+ }
+ return true;
+ }
+
- private static boolean hasLiveColumns(ColumnFamily cf, long now)
++ private static boolean hasLiveCells(ColumnFamily cf, long now)
+ {
+ return cf != null && !cf.hasOnlyTombstones(now);
+ }
+
+ @Override
+ public String toString()
+ {
+ return expected.toString();
+ }
+ }
}