You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/02/12 00:05:24 UTC
[1/5] git commit: Fix CleanupTest
Updated Branches:
refs/heads/trunk f007a3535 -> aac421aaa
Fix CleanupTest
patch by marcuse, reviewed by jbellis for CASSANDRA-6679
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1b8b7b54
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1b8b7b54
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1b8b7b54
Branch: refs/heads/trunk
Commit: 1b8b7b54087dd2dc2aeb2b9a0b06fbb4f13030be
Parents: c562700
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Feb 10 18:32:06 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Feb 10 18:36:28 2014 +0100
----------------------------------------------------------------------
test/unit/org/apache/cassandra/db/CleanupTest.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b8b7b54/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 5872407..d5c2b07 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -64,8 +64,7 @@ public class CleanupTest extends SchemaLoader
@Test
public void testCleanup() throws IOException, ExecutionException, InterruptedException, ConfigurationException
{
- StorageService.instance.initServer(0);
-
+ StorageService.instance.getTokenMetadata().clearUnsafe();
Table table = Table.open(TABLE1);
ColumnFamilyStore cfs = table.getColumnFamilyStore(CF2);
[4/5] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Posted by al...@apache.org.
Merge branch 'cassandra-1.2' into cassandra-2.0
Conflicts:
src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
test/unit/org/apache/cassandra/db/CleanupTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b0e4f00c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b0e4f00c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b0e4f00c
Branch: refs/heads/trunk
Commit: b0e4f00c8c3986c0702f2b08b0d2cd4dd18b1dbf
Parents: 662f546 7787dea
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Feb 12 01:51:09 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Feb 12 01:51:09 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/ModificationStatement.java | 28 +++++++++++++++++++-
2 files changed, 28 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0e4f00c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 16cbd0a,56059a1..aec6f5e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -16,33 -3,24 +16,34 @@@ Merged from 1.2
* Fix partition and range deletes not triggering flush (CASSANDRA-6655)
* Fix mean cells and mean row size per sstable calculations (CASSANDRA-6667)
* Compact hints after partial replay to clean out tombstones (CASSANDRA-6666)
+ * Log USING TTL/TIMESTAMP in a counter update warning (CASSANDRA-6649)
-
-1.2.15
- * Move handling of migration event source to solve bootstrap race (CASSANDRA-6648)
- * Make sure compaction throughput value doesn't overflow with int math (CASSANDRA-6647)
-
-
-1.2.14
- * Reverted code to limit CQL prepared statement cache by size (CASSANDRA-6592)
- * add cassandra.default_messaging_version property to allow easier
- upgrading from 1.1 (CASSANDRA-6619)
- * Allow executing CREATE statements multiple times (CASSANDRA-6471)
- * Don't send confusing info with timeouts (CASSANDRA-6491)
- * Don't resubmit counter mutation runnables internally (CASSANDRA-6427)
- * Don't drop local mutations without a hint (CASSANDRA-6510)
- * Don't allow null max_hint_window_in_ms (CASSANDRA-6419)
- * Validate SliceRange start and finish lengths (CASSANDRA-6521)
+2.0.5
+ * Reduce garbage generated by bloom filter lookups (CASSANDRA-6609)
+ * Add ks.cf names to tombstone logging (CASSANDRA-6597)
+ * Use LOCAL_QUORUM for LWT operations at LOCAL_SERIAL (CASSANDRA-6495)
+ * Wait for gossip to settle before accepting client connections (CASSANDRA-4288)
+ * Delete unfinished compaction incrementally (CASSANDRA-6086)
+ * Allow specifying custom secondary index options in CQL3 (CASSANDRA-6480)
+ * Improve replica pinning for cache efficiency in DES (CASSANDRA-6485)
+ * Fix LOCAL_SERIAL from thrift (CASSANDRA-6584)
+ * Don't special case received counts in CAS timeout exceptions (CASSANDRA-6595)
+ * Add support for 2.1 global counter shards (CASSANDRA-6505)
+ * Fix NPE when streaming connection is not yet established (CASSANDRA-6210)
+ * Avoid rare duplicate read repair triggering (CASSANDRA-6606)
+ * Fix paging discardFirst (CASSANDRA-6555)
+ * Fix ArrayIndexOutOfBoundsException in 2ndary index query (CASSANDRA-6470)
+ * Release sstables upon rebuilding 2i (CASSANDRA-6635)
+ * Add AbstractCompactionStrategy.startup() method (CASSANDRA-6637)
+ * SSTableScanner may skip rows during cleanup (CASSANDRA-6638)
+ * sstables from stalled repair sessions can resurrect deleted data (CASSANDRA-6503)
+ * Switch stress to use ITransportFactory (CASSANDRA-6641)
+ * Fix IllegalArgumentException during prepare (CASSANDRA-6592)
+ * Fix possible loss of 2ndary index entries during compaction (CASSANDRA-6517)
+ * Fix direct Memory on architectures that do not support unaligned long access
+ (CASSANDRA-6628)
+ * Let scrub optionally skip broken counter partitions (CASSANDRA-5930)
+Merged from 1.2:
* fsync compression metadata (CASSANDRA-6531)
* Validate CF existence on execution for prepared statement (CASSANDRA-6535)
* Add ability to throttle batchlog replay (CASSANDRA-6550)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0e4f00c/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 2567043,1b4dc37..676286c
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@@ -20,10 -20,10 +20,12 @@@ package org.apache.cassandra.cql3.state
import java.nio.ByteBuffer;
import java.util.*;
+import org.github.jamm.MemoryMeter;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnSlice;
@@@ -38,78 -36,38 +40,83 @@@ import org.apache.cassandra.service.CAS
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.thrift.ThriftValidation;
import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.Pair;
-/**
- * Abstract class for statements that apply on a given column family.
+/*
+ * Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE.
*/
-public abstract class ModificationStatement extends CFStatement implements CQLStatement, MeasurableForPreparedCache
+public abstract class ModificationStatement implements CQLStatement, MeasurableForPreparedCache
{
- public static enum Type
- {
- LOGGED, UNLOGGED, COUNTER
- }
+ private static final ColumnIdentifier CAS_RESULT_COLUMN = new ColumnIdentifier("[applied]", false);
+ private static final Logger logger = LoggerFactory.getLogger(ModificationStatement.class);
+
+ private static boolean loggedCounterTTL = false;
+ private static boolean loggedCounterTimestamp = false;
+
- protected Type type;
+ public final CFMetaData cfm;
+ public final Attributes attrs;
+
+ private final Map<ColumnIdentifier, Restriction> processedKeys = new HashMap<ColumnIdentifier, Restriction>();
+ private final List<Operation> columnOperations = new ArrayList<Operation>();
+
+ private int boundTerms;
+ private List<Operation> columnConditions;
+ private boolean ifNotExists;
+
+ public ModificationStatement(CFMetaData cfm, Attributes attrs)
+ {
+ this.cfm = cfm;
+ this.attrs = attrs;
+ }
+
+ public long measureForPreparedCache(MemoryMeter meter)
+ {
+ return meter.measure(this)
+ + meter.measureDeep(attrs)
+ + meter.measureDeep(processedKeys)
+ + meter.measureDeep(columnOperations)
+ + (columnConditions == null ? 0 : meter.measureDeep(columnConditions));
+ }
+
+ public abstract boolean requireFullClusteringKey();
+ public abstract ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) throws InvalidRequestException;
+
+ public int getBoundTerms()
+ {
+ return boundTerms;
+ }
+
+ public String keyspace()
+ {
+ return cfm.ksName;
+ }
+
+ public String columnFamily()
+ {
+ return cfm.cfName;
+ }
+
+ public boolean isCounter()
+ {
+ return cfm.getDefaultValidator().isCommutative();
+ }
- private Long timestamp;
- private final int timeToLive;
+ public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException
+ {
+ return attrs.getTimestamp(now, variables);
+ }
- public ModificationStatement(CFName name, Attributes attrs)
+ public boolean isTimestampSet()
{
- this(name, attrs.timestamp, attrs.timeToLive);
+ return attrs.isTimestampSet();
}
- public ModificationStatement(CFName name, Long timestamp, int timeToLive)
+ public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException
{
- super(name);
- this.timestamp = timestamp;
- this.timeToLive = timeToLive;
+ return attrs.getTimeToLive(variables);
}
public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
@@@ -123,196 -77,95 +130,215 @@@
public void validate(ClientState state) throws InvalidRequestException
{
- if (timeToLive < 0)
- throw new InvalidRequestException("A TTL must be greater or equal to 0");
+ if (hasConditions() && attrs.isTimestampSet())
+ throw new InvalidRequestException("Custom timestamps are not allowed when conditions are used");
+
- if (timeToLive > ExpiringColumn.MAX_TTL)
- throw new InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", timeToLive, ExpiringColumn.MAX_TTL));
-
- if (type == Type.COUNTER)
++ if (isCounter())
+ {
- if (timestamp != null && !loggedCounterTimestamp)
++ if (attrs.isTimestampSet() && !loggedCounterTimestamp)
+ {
+ logger.warn("Detected use of 'USING TIMESTAMP' in a counter UPDATE. This is invalid " +
+ "because counters do not use timestamps, and the timestamp has been ignored. " +
+ "Such queries will be rejected in Cassandra 2.1+ - please fix your queries before then.");
+ loggedCounterTimestamp = true;
+ }
+
- if (timeToLive != 0 && !loggedCounterTTL)
++ if (attrs.isTimeToLiveSet() && !loggedCounterTTL)
+ {
+ logger.warn("Detected use of 'USING TTL' in a counter UPDATE. This is invalid " +
+ "because counter tables do not support TTL, and the TTL value has been ignored. " +
+ "Such queries will be rejected in Cassandra 2.1+ - please fix your queries before then.");
+ loggedCounterTTL = true;
+ }
+ }
}
- protected abstract void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException;
+ public void addOperation(Operation op)
+ {
+ columnOperations.add(op);
+ }
- public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables) throws RequestExecutionException, RequestValidationException
+ public List<Operation> getOperations()
{
- if (cl == null)
- throw new InvalidRequestException("Invalid empty consistency level");
+ return columnOperations;
+ }
- validateConsistency(cl);
+ public void addCondition(Operation op)
+ {
+ if (columnConditions == null)
+ columnConditions = new ArrayList<Operation>();
- // The type should have been set by now or we have a bug
- assert type != null;
+ columnConditions.add(op);
+ }
- Collection<? extends IMutation> mutations = getMutations(variables, false, cl, queryState.getTimestamp());
- if (mutations.isEmpty())
- return null;
+ public void setIfNotExistCondition()
+ {
+ ifNotExists = true;
+ }
- switch (type)
- {
- case LOGGED:
- if (mutations.size() > 1)
- StorageProxy.mutateAtomically((Collection<RowMutation>) mutations, cl);
- else
- StorageProxy.mutate(mutations, cl);
- break;
- case UNLOGGED:
- case COUNTER:
- StorageProxy.mutate(mutations, cl);
- break;
- default:
- throw new AssertionError();
- }
+ private void addKeyValues(ColumnIdentifier name, Restriction values) throws InvalidRequestException
+ {
+ if (processedKeys.put(name, values) != null)
+ throw new InvalidRequestException(String.format("Multiple definitions found for PRIMARY KEY part %s", name));
+ }
- return null;
+ public void addKeyValue(ColumnIdentifier name, Term value) throws InvalidRequestException
+ {
+ addKeyValues(name, new Restriction.EQ(value, false));
}
- public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException, RequestExecutionException
+ public void processWhereClause(List<Relation> whereClause, VariableSpecifications names) throws InvalidRequestException
{
- for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(), true, null, queryState.getTimestamp()))
- mutation.apply();
- return null;
+ CFDefinition cfDef = cfm.getCfDef();
+ for (Relation rel : whereClause)
+ {
+ CFDefinition.Name name = cfDef.get(rel.getEntity());
+ if (name == null)
+ throw new InvalidRequestException(String.format("Unknown key identifier %s", rel.getEntity()));
+
+ switch (name.kind)
+ {
+ case KEY_ALIAS:
+ case COLUMN_ALIAS:
+ Restriction restriction;
+
+ if (rel.operator() == Relation.Type.EQ)
+ {
+ Term t = rel.getValue().prepare(name);
+ t.collectMarkerSpecification(names);
+ restriction = new Restriction.EQ(t, false);
+ }
+ else if (name.kind == CFDefinition.Name.Kind.KEY_ALIAS && rel.operator() == Relation.Type.IN)
+ {
+ if (rel.getValue() != null)
+ {
+ Term t = rel.getValue().prepare(name);
+ t.collectMarkerSpecification(names);
+ restriction = Restriction.IN.create(t);
+ }
+ else
+ {
+ List<Term> values = new ArrayList<Term>(rel.getInValues().size());
+ for (Term.Raw raw : rel.getInValues())
+ {
+ Term t = raw.prepare(name);
+ t.collectMarkerSpecification(names);
+ values.add(t);
+ }
+ restriction = Restriction.IN.create(values);
+ }
+ }
+ else
+ {
+ throw new InvalidRequestException(String.format("Invalid operator %s for PRIMARY KEY part %s", rel.operator(), name));
+ }
+
+ addKeyValues(name.name, restriction);
+ break;
+ case VALUE_ALIAS:
+ case COLUMN_METADATA:
+ throw new InvalidRequestException(String.format("Non PRIMARY KEY %s found in where clause", name));
+ }
+ }
}
- public long getTimestamp(long now)
+ public List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
+ throws InvalidRequestException
{
- return timestamp == null ? now : timestamp;
+ CFDefinition cfDef = cfm.getCfDef();
+ ColumnNameBuilder keyBuilder = cfDef.getKeyNameBuilder();
+ List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
+ for (CFDefinition.Name name : cfDef.keys.values())
+ {
+ Restriction r = processedKeys.get(name.name);
+ if (r == null)
+ throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name));
+
+ List<ByteBuffer> values = r.values(variables);
+
+ if (keyBuilder.remainingCount() == 1)
+ {
+ for (ByteBuffer val : values)
+ {
+ if (val == null)
+ throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
+ keys.add(keyBuilder.copy().add(val).build());
+ }
+ }
+ else
+ {
+ if (values.size() != 1)
+ throw new InvalidRequestException("IN is only supported on the last column of the partition key");
+ ByteBuffer val = values.get(0);
+ if (val == null)
+ throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
+ keyBuilder.add(val);
+ }
+ }
+ return keys;
}
- public void setTimestamp(long timestamp)
+ public ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables)
+ throws InvalidRequestException
{
- this.timestamp = timestamp;
+ CFDefinition cfDef = cfm.getCfDef();
+ ColumnNameBuilder builder = cfDef.getColumnNameBuilder();
+ CFDefinition.Name firstEmptyKey = null;
+ for (CFDefinition.Name name : cfDef.columns.values())
+ {
+ Restriction r = processedKeys.get(name.name);
+ if (r == null)
+ {
+ firstEmptyKey = name;
+ if (requireFullClusteringKey() && cfDef.isComposite && !cfDef.isCompact)
+ throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name));
+ }
+ else if (firstEmptyKey != null)
+ {
+ throw new InvalidRequestException(String.format("Missing PRIMARY KEY part %s since %s is set", firstEmptyKey.name, name.name));
+ }
+ else
+ {
+ List<ByteBuffer> values = r.values(variables);
+ assert values.size() == 1; // We only allow IN for row keys so far
+ ByteBuffer val = values.get(0);
+ if (val == null)
+ throw new InvalidRequestException(String.format("Invalid null value for clustering key part %s", name));
+ builder.add(val);
+ }
+ }
+ return builder;
}
- public boolean isSetTimestamp()
+ protected CFDefinition.Name getFirstEmptyKey()
{
- return timestamp != null;
+ for (CFDefinition.Name name : cfm.getCfDef().columns.values())
+ {
+ if (processedKeys.get(name.name) == null)
+ return name;
+ }
+ return null;
}
- public int getTimeToLive()
+ protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
+ throws RequestExecutionException, RequestValidationException
{
- return timeToLive;
+ // Lists SET operation incurs a read.
+ Set<ByteBuffer> toRead = null;
+ for (Operation op : columnOperations)
+ {
+ if (op.requiresRead())
+ {
+ if (toRead == null)
+ toRead = new TreeSet<ByteBuffer>(UTF8Type.instance);
+ toRead.add(op.columnName.key);
+ }
+ }
+
+ return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead, (CompositeType)cfm.comparator, local, cl);
}
- protected Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> keys, ColumnNameBuilder builder, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
+ private Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> partitionKeys, ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite, boolean local, ConsistencyLevel cl)
throws RequestExecutionException, RequestValidationException
{
try
@@@ -508,219 -226,8 +534,219 @@@
* @return list of the mutations
* @throws InvalidRequestException on invalid requests
*/
- protected abstract Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
- throws RequestExecutionException, RequestValidationException;
+ public Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch)
+ throws RequestExecutionException, RequestValidationException
+ {
+ List<ByteBuffer> keys = buildPartitionKeyNames(variables);
+ ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
+
+ // Some list operations require reading
+ Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, clusteringPrefix, local, cl);
+ UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
+
+ Collection<IMutation> mutations = new ArrayList<IMutation>();
+ for (ByteBuffer key: keys)
+ {
+ ThriftValidation.validateKey(cfm, key);
+ ColumnFamily cf = updateForKey(key, clusteringPrefix, params);
+ mutations.add(makeMutation(key, cf, cl, isBatch));
+ }
+ return mutations;
+ }
+
+ private IMutation makeMutation(ByteBuffer key, ColumnFamily cf, ConsistencyLevel cl, boolean isBatch)
+ {
+ RowMutation rm;
+ if (isBatch)
+ {
+ // we might group other mutations together with this one later, so make it mutable
+ rm = new RowMutation(cfm.ksName, key);
+ rm.add(cf);
+ }
+ else
+ {
+ rm = new RowMutation(cfm.ksName, key, cf);
+ }
+ return isCounter() ? new CounterMutation(rm, cl) : rm;
+ }
+
+ private static abstract class CQL3CasConditions implements CASConditions
+ {
+ protected final ColumnNameBuilder rowPrefix;
+ protected final long now;
+
+ protected CQL3CasConditions(ColumnNameBuilder rowPrefix, long now)
+ {
+ this.rowPrefix = rowPrefix;
+ this.now = now;
+ }
- public abstract ParsedStatement.Prepared prepare(ColumnSpecification[] boundNames) throws InvalidRequestException;
+ public IDiskAtomFilter readFilter()
+ {
+ // We always read the row entirely as on CAS failure we want to be able to distinguish between "row exists
+ // but all values for which there were conditions are null" and "row doesn't exist", and we can't rely on the
+ // row marker for that (see #6623)
+ return new SliceQueryFilter(rowPrefix.build(), rowPrefix.buildAsEndOfRange(), false, 1, rowPrefix.componentCount());
+ }
+ }
+
+ private static class NotExistCondition extends CQL3CasConditions
+ {
+ private NotExistCondition(ColumnNameBuilder rowPrefix, long now)
+ {
+ super(rowPrefix, now);
+ }
+
+ public boolean appliesTo(ColumnFamily current)
+ {
+ return current == null || current.hasOnlyTombstones(now);
+ }
+ }
+
+ private static class ColumnsConditions extends CQL3CasConditions
+ {
+ private final ColumnFamily expected;
+
+ private ColumnsConditions(ColumnNameBuilder rowPrefix,
+ CFMetaData cfm,
+ ByteBuffer key,
+ Collection<Operation> conditions,
+ List<ByteBuffer> variables,
+ long now) throws InvalidRequestException
+ {
+ super(rowPrefix, now);
+ this.expected = TreeMapBackedSortedColumns.factory.create(cfm);
+
+ // When building the conditions, we should not use a TTL. It's not useful, and if a very low ttl (1 second) is used, it's possible
+ // for it to expire before the actual build of the conditions which would break since we would then be testing for the presence of tombstones.
+ UpdateParameters params = new UpdateParameters(cfm, variables, now, 0, null);
+
+ // Conditions
+ for (Operation condition : conditions)
+ condition.execute(key, expected, rowPrefix.copy(), params);
+ }
+
+ public boolean appliesTo(ColumnFamily current)
+ {
+ if (current == null)
+ return false;
+
+ for (Column e : expected)
+ {
+ Column c = current.getColumn(e.name());
+ if (e.isLive(now))
+ {
+ if (c == null || !c.isLive(now) || !c.value().equals(e.value()))
+ return false;
+ }
+ else
+ {
+ // If we have a tombstone in expected, it means the condition tests that the column is
+ // null, so check that we have no value
+ if (c != null && c.isLive(now))
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String toString()
+ {
+ return expected.toString();
+ }
+ }
+
+ public static abstract class Parsed extends CFStatement
+ {
+ protected final Attributes.Raw attrs;
+ private final List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions;
+ private final boolean ifNotExists;
+
+ protected Parsed(CFName name, Attributes.Raw attrs, List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions, boolean ifNotExists)
+ {
+ super(name);
+ this.attrs = attrs;
+ this.conditions = conditions == null ? Collections.<Pair<ColumnIdentifier, Operation.RawUpdate>>emptyList() : conditions;
+ this.ifNotExists = ifNotExists;
+ }
+
+ public ParsedStatement.Prepared prepare() throws InvalidRequestException
+ {
+ VariableSpecifications boundNames = getBoundVariables();
+ ModificationStatement statement = prepare(boundNames);
+ return new ParsedStatement.Prepared(statement, boundNames);
+ }
+
+ public ModificationStatement prepare(VariableSpecifications boundNames) throws InvalidRequestException
+ {
+ CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
+ CFDefinition cfDef = metadata.getCfDef();
+
+ // The collected count in the beginning of preparation.
+ // Will start at non-zero for statements nested inside a BatchStatement (the second and the further ones).
+ int collected = boundNames.getCollectedCount();
+
+ Attributes preparedAttributes = attrs.prepare(keyspace(), columnFamily());
+ preparedAttributes.collectMarkerSpecification(boundNames);
+
+ ModificationStatement stmt = prepareInternal(cfDef, boundNames, preparedAttributes);
+
- if (ifNotExists || (conditions != null && !conditions.isEmpty()))
++ if (ifNotExists || !conditions.isEmpty())
+ {
+ if (stmt.isCounter())
+ throw new InvalidRequestException("Conditional updates are not supported on counter tables");
+
+ if (attrs.timestamp != null)
+ throw new InvalidRequestException("Cannot provide custom timestamp for conditional update");
+
+ if (ifNotExists)
+ {
+ // To have both 'IF NOT EXISTS' and some other conditions doesn't make sense.
+ // So far this is enforced by the parser, but let's assert it for sanity if the parser ever changes.
+ assert conditions.isEmpty();
+ stmt.setIfNotExistCondition();
+ }
+ else
+ {
+ for (Pair<ColumnIdentifier, Operation.RawUpdate> entry : conditions)
+ {
+ CFDefinition.Name name = cfDef.get(entry.left);
+ if (name == null)
+ throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left));
+
+ /*
+ * List column names are based on a server-side generated timeuuid, so we can't allow list
+ * operations or they would yield unexpected results (an update that should apply wouldn't). So for
+ * now, we just refuse lists, which also saves us from having to bother about the read that some
+ * list operations involve.
+ */
+ if (name.type instanceof ListType)
+ throw new InvalidRequestException(String.format("List operation (%s) are not allowed in conditional updates", name));
+
+ Operation condition = entry.right.prepare(name);
+ assert !condition.requiresRead();
+
+ condition.collectMarkerSpecification(boundNames);
+
+ switch (name.kind)
+ {
+ case KEY_ALIAS:
+ case COLUMN_ALIAS:
+ throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left));
+ case VALUE_ALIAS:
+ case COLUMN_METADATA:
+ stmt.addCondition(condition);
+ break;
+ }
+ }
+ }
+ }
+
+ stmt.boundTerms = boundNames.getCollectedCount() - collected;
+ return stmt;
+ }
+
+ protected abstract ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException;
+ }
}
[2/5] git commit: who forgets to add files? I do.
Posted by al...@apache.org.
who forgets to add files? I do.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/662f546a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/662f546a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/662f546a
Branch: refs/heads/trunk
Commit: 662f546aa76b7f6473b8da8b9ea675a9fd9757e1
Parents: b9aece4
Author: Brandon Williams <br...@apache.org>
Authored: Tue Feb 11 11:30:02 2014 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Feb 11 11:30:02 2014 -0600
----------------------------------------------------------------------
.../cassandra/hadoop/ReporterWrapper.java | 57 ++++++++++++++++++++
1 file changed, 57 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/662f546a/src/java/org/apache/cassandra/hadoop/ReporterWrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ReporterWrapper.java b/src/java/org/apache/cassandra/hadoop/ReporterWrapper.java
new file mode 100644
index 0000000..9940ba4
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/ReporterWrapper.java
@@ -0,0 +1,57 @@
+package org.apache.cassandra.hadoop;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+
+/**
+ * A reporter that works with both mapred and mapreduce APIs.
+ */
+public class ReporterWrapper extends StatusReporter implements Reporter {
+ private Reporter wrappedReporter;
+
+ public ReporterWrapper(Reporter reporter) {
+ wrappedReporter = reporter;
+ }
+
+ @Override
+ public Counters.Counter getCounter(Enum<?> anEnum) {
+ return wrappedReporter.getCounter(anEnum);
+ }
+
+ @Override
+ public Counters.Counter getCounter(String s, String s1) {
+ return wrappedReporter.getCounter(s, s1);
+ }
+
+ @Override
+ public void incrCounter(Enum<?> anEnum, long l) {
+ wrappedReporter.incrCounter(anEnum, l);
+ }
+
+ @Override
+ public void incrCounter(String s, String s1, long l) {
+ wrappedReporter.incrCounter(s, s1, l);
+ }
+
+ @Override
+ public InputSplit getInputSplit() throws UnsupportedOperationException {
+ return wrappedReporter.getInputSplit();
+ }
+
+ @Override
+ public void progress() {
+ wrappedReporter.progress();
+ }
+
+ // @Override
+ public float getProgress() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setStatus(String s) {
+ wrappedReporter.setStatus(s);
+ }
+}
\ No newline at end of file
[3/5] git commit: Log USING TTL/TIMESTAMP in a counter update warning
Posted by al...@apache.org.
Log USING TTL/TIMESTAMP in a counter update warning
patch by Aleksey Yeschenko; reviewed by Sylvain Lebresne for
CASSANDRA-6649
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7787deab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7787deab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7787deab
Branch: refs/heads/trunk
Commit: 7787deab72fe5772170c0cf6b6712ed2211158ed
Parents: 1b8b7b5
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Feb 12 01:39:40 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Feb 12 01:39:40 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/ModificationStatement.java | 27 ++++++++++++++++++++
2 files changed, 28 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7787deab/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 82783f8..56059a1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
* Fix partition and range deletes not triggering flush (CASSANDRA-6655)
* Fix mean cells and mean row size per sstable calculations (CASSANDRA-6667)
* Compact hints after partial replay to clean out tombstones (CASSANDRA-6666)
+ * Log USING TTL/TIMESTAMP in a counter update warning (CASSANDRA-6649)
1.2.15
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7787deab/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index bfbf511..1b4dc37 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.db.*;
@@ -45,6 +48,11 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
LOGGED, UNLOGGED, COUNTER
}
+ private static final Logger logger = LoggerFactory.getLogger(ModificationStatement.class);
+
+ private static boolean loggedCounterTTL = false;
+ private static boolean loggedCounterTimestamp = false;
+
protected Type type;
private Long timestamp;
@@ -74,6 +82,25 @@ public abstract class ModificationStatement extends CFStatement implements CQLSt
if (timeToLive > ExpiringColumn.MAX_TTL)
throw new InvalidRequestException(String.format("ttl is too large. requested (%d) maximum (%d)", timeToLive, ExpiringColumn.MAX_TTL));
+
+ if (type == Type.COUNTER)
+ {
+ if (timestamp != null && !loggedCounterTimestamp)
+ {
+ logger.warn("Detected use of 'USING TIMESTAMP' in a counter UPDATE. This is invalid " +
+ "because counters do not use timestamps, and the timestamp has been ignored. " +
+ "Such queries will be rejected in Cassandra 2.1+ - please fix your queries before then.");
+ loggedCounterTimestamp = true;
+ }
+
+ if (timeToLive != 0 && !loggedCounterTTL)
+ {
+ logger.warn("Detected use of 'USING TTL' in a counter UPDATE. This is invalid " +
+ "because counter tables do not support TTL, and the TTL value has been ignored. " +
+ "Such queries will be rejected in Cassandra 2.1+ - please fix your queries before then.");
+ loggedCounterTTL = true;
+ }
+ }
}
protected abstract void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException;
[5/5] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Conflicts:
src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aac421aa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aac421aa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aac421aa
Branch: refs/heads/trunk
Commit: aac421aaa1d2cf214839eedd55abf35958ae4487
Parents: f007a35 b0e4f00
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Feb 12 02:05:16 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Feb 12 02:05:16 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 3 +++
.../cql3/statements/ModificationStatement.java | 12 +++++++++---
2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aac421aa/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index eec6296,aec6f5e..8e7e249
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,35 -1,3 +1,37 @@@
+2.1
+ * add listsnapshots command to nodetool (CASSANDRA-5742)
+ * Introduce AtomicBTreeColumns (CASSANDRA-6271)
+ * Multithreaded commitlog (CASSANDRA-3578)
+ * allocate fixed index summary memory pool and resample cold index summaries
+ to use less memory (CASSANDRA-5519)
+ * Removed multithreaded compaction (CASSANDRA-6142)
+ * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
+ * change logging from log4j to logback (CASSANDRA-5883)
+ * switch to LZ4 compression for internode communication (CASSANDRA-5887)
+ * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
+ * Remove 1.2 network compatibility code (CASSANDRA-5960)
+ * Remove leveled json manifest migration code (CASSANDRA-5996)
+ * Remove CFDefinition (CASSANDRA-6253)
+ * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
+ * User-defined types for CQL3 (CASSANDRA-5590)
+ * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
+ * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
+ * Secondary index support for collections (CASSANDRA-4511, 6383)
+ * SSTable metadata(Stats.db) format change (CASSANDRA-6356)
+ * Push composites support in the storage engine
+ (CASSANDRA-5417, CASSANDRA-6520)
+ * Add snapshot space used to cfstats (CASSANDRA-6231)
+ * Add cardinality estimator for key count estimation (CASSANDRA-5906)
+ * CF id is changed to be non-deterministic. Data dir/key cache are created
+ uniquely for CF id (CASSANDRA-5202)
+ * New counters implementation (CASSANDRA-6504)
+ * Replace UnsortedColumns and TreeMapBackedSortedColumns with rewritten
+ ArrayBackedSortedColumns (CASSANDRA-6630, CASSANDRA-6662)
+ * Add option to use row cache with a given amount of rows (CASSANDRA-5357)
+ * Avoid repairing already repaired data (CASSANDRA-5351)
++ * Reject counter updates with USING TTL/TIMESTAMP (CASSANDRA-6649)
++
+
2.0.6
* Add compatibility for Hadoop 0.2.x (CASSANDRA-5201)
* Fix EstimatedHistogram races (CASSANDRA-6682)
@@@ -45,8 -16,8 +47,9 @@@ Merged from 1.2
* Fix partition and range deletes not triggering flush (CASSANDRA-6655)
* Fix mean cells and mean row size per sstable calculations (CASSANDRA-6667)
* Compact hints after partial replay to clean out tombstones (CASSANDRA-6666)
+ * Log USING TTL/TIMESTAMP in a counter update warning (CASSANDRA-6649)
+
2.0.5
* Reduce garbage generated by bloom filter lookups (CASSANDRA-6609)
* Add ks.cf names to tombstone logging (CASSANDRA-6597)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aac421aa/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 3775bde,676286c..6d18f1b
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@@ -126,7 -131,26 +126,13 @@@ public abstract class ModificationState
public void validate(ClientState state) throws InvalidRequestException
{
if (hasConditions() && attrs.isTimestampSet())
-- throw new InvalidRequestException("Custom timestamps are not allowed when conditions are used");
++ throw new InvalidRequestException("Cannot provide custom timestamp for conditional updates");
+
- if (isCounter())
- {
- if (attrs.isTimestampSet() && !loggedCounterTimestamp)
- {
- logger.warn("Detected use of 'USING TIMESTAMP' in a counter UPDATE. This is invalid " +
- "because counters do not use timestamps, and the timestamp has been ignored. " +
- "Such queries will be rejected in Cassandra 2.1+ - please fix your queries before then.");
- loggedCounterTimestamp = true;
- }
++ if (isCounter() && attrs.isTimestampSet())
++ throw new InvalidRequestException("Cannot provide custom timestamp for counter updates");
+
- if (attrs.isTimeToLiveSet() && !loggedCounterTTL)
- {
- logger.warn("Detected use of 'USING TTL' in a counter UPDATE. This is invalid " +
- "because counter tables do not support TTL, and the TTL value has been ignored. " +
- "Such queries will be rejected in Cassandra 2.1+ - please fix your queries before then.");
- loggedCounterTTL = true;
- }
- }
++ if (isCounter() && attrs.isTimeToLiveSet())
++ throw new InvalidRequestException("Cannot provide custom TTL for counter updates");
}
public void addOperation(Operation op)
@@@ -655,15 -690,15 +661,15 @@@
Attributes preparedAttributes = attrs.prepare(keyspace(), columnFamily());
preparedAttributes.collectMarkerSpecification(boundNames);
- ModificationStatement stmt = prepareInternal(cfDef, boundNames, preparedAttributes);
+ ModificationStatement stmt = prepareInternal(metadata, boundNames, preparedAttributes);
- if (ifNotExists || (conditions != null && !conditions.isEmpty()))
+ if (ifNotExists || !conditions.isEmpty())
{
if (stmt.isCounter())
throw new InvalidRequestException("Conditional updates are not supported on counter tables");
if (attrs.timestamp != null)
-- throw new InvalidRequestException("Cannot provide custom timestamp for conditional update");
++ throw new InvalidRequestException("Cannot provide custom timestamp for conditional updates");
if (ifNotExists)
{