You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/11/24 12:51:26 UTC

cassandra git commit: Validate size of indexed column values

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 2ce1ad8e6 -> 0e3d9fc14


Validate size of indexed column values

patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for
CASSANDRA-8280


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e3d9fc1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e3d9fc1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e3d9fc1

Branch: refs/heads/cassandra-2.0
Commit: 0e3d9fc14bfcb38b9f179c0428cf586890c4a8ab
Parents: 2ce1ad8
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Mon Nov 24 14:50:14 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Mon Nov 24 14:50:14 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/cql3/CFDefinition.java |   5 +
 .../cassandra/cql3/ColumnNameBuilder.java       |   7 ++
 .../cql3/statements/ModificationStatement.java  |  20 +--
 .../cql3/statements/UpdateStatement.java        |  22 +++-
 .../db/index/SecondaryIndexManager.java         |   6 +-
 .../cassandra/db/marshal/CompositeType.java     |   9 ++
 .../cassandra/io/sstable/SSTableWriter.java     |   9 ++
 .../cql3/IndexedValuesValidationTest.java       | 124 +++++++++++++++++++
 9 files changed, 192 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6a5ac0d..412eb59 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.12:
+ * Validate size of indexed column values (CASSANDRA-8280)
  * Make LCS split compaction results over all data directories (CASSANDRA-8329)
  * Fix some failing queries that use multi-column relations
    on COMPACT STORAGE tables (CASSANDRA-8264)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/cql3/CFDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CFDefinition.java b/src/java/org/apache/cassandra/cql3/CFDefinition.java
index 23bedaf..e0bb409 100644
--- a/src/java/org/apache/cassandra/cql3/CFDefinition.java
+++ b/src/java/org/apache/cassandra/cql3/CFDefinition.java
@@ -358,5 +358,10 @@ public class CFDefinition implements Iterable<CFDefinition.Name>
 
             return columnName;
         }
+
+        public int getLength()
+        {
+            return columnName == null ? 0 : columnName.remaining();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java b/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java
index 3d5eff6..50cdc74 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java
@@ -78,4 +78,11 @@ public interface ColumnNameBuilder
      */
     public ByteBuffer getComponent(int i);
 
+    /**
+     * Returns the total length of the ByteBuffer that will
+     * be returned by build().
+     * @return the total length of the column name to be built
+     */
+    public int getLength();
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 61f65c1..db22e7d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -22,8 +22,6 @@ import java.util.*;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.github.jamm.MemoryMeter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +41,9 @@ import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.github.jamm.MemoryMeter;
 
 /*
  * Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE.
@@ -328,7 +328,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
     throws InvalidRequestException
     {
         CFDefinition cfDef = cfm.getCfDef();
-        ColumnNameBuilder keyBuilder = cfDef.getKeyNameBuilder();
+        ColumnNameBuilder keyBuilderBase = cfDef.getKeyNameBuilder();
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
         for (CFDefinition.Name name : cfDef.partitionKeys())
         {
@@ -337,14 +337,19 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                 throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name));
 
             List<ByteBuffer> values = r.values(variables);
-
-            if (keyBuilder.remainingCount() == 1)
+            if (keyBuilderBase.remainingCount() == 1)
             {
                 for (ByteBuffer val : values)
                 {
                     if (val == null)
                         throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
-                    ByteBuffer key = keyBuilder.copy().add(val).build();
+
+                    ColumnNameBuilder keyBuilder = keyBuilderBase.copy().add(val);
+                    if (keyBuilder.getLength() > FBUtilities.MAX_UNSIGNED_SHORT)
+                        throw new InvalidRequestException(String.format("Partition key size %s exceeds maximum %s",
+                                                                        keyBuilder.getLength(),
+                                                                        FBUtilities.MAX_UNSIGNED_SHORT));
+                    ByteBuffer key = keyBuilder.build();
                     ThriftValidation.validateKey(cfm, key);
                     keys.add(key);
                 }
@@ -356,7 +361,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
                 ByteBuffer val = values.get(0);
                 if (val == null)
                     throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name));
-                keyBuilder.add(val);
+                keyBuilderBase.add(val);
             }
         }
         return keys;
@@ -727,7 +732,6 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
         Collection<IMutation> mutations = new ArrayList<IMutation>();
         for (ByteBuffer key: keys)
         {
-            ThriftValidation.validateKey(cfm, key);
             ColumnFamily cf = UnsortedColumns.factory.create(cfm);
             addUpdateForKey(cf, key, clusteringPrefix, params);
             RowMutation rm = new RowMutation(cfm.ksName, key, cf);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index e2da251..9d98c84 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -23,9 +23,10 @@ import java.util.*;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -51,6 +52,11 @@ public class UpdateStatement extends ModificationStatement
     {
         CFDefinition cfDef = cfm.getCfDef();
 
+        if (builder.getLength() > FBUtilities.MAX_UNSIGNED_SHORT)
+            throw new InvalidRequestException(String.format("The sum of all clustering columns is too long (%s > %s)",
+                                                            builder.getLength(),
+                                                            FBUtilities.MAX_UNSIGNED_SHORT));
+
         // Inserting the CQL row marker (see #4361)
         // We always need to insert a marker for INSERT, because of the following situation:
         //   CREATE TABLE t ( k int PRIMARY KEY, c text );
@@ -99,6 +105,20 @@ public class UpdateStatement extends ModificationStatement
             for (Operation update : updates)
                 update.execute(key, cf, builder.copy(), params);
         }
+
+        SecondaryIndexManager indexManager = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfId).indexManager;
+        if (indexManager.hasIndexes())
+        {
+            for (Column column : cf)
+            {
+                if (!indexManager.validate(column))
+                    throw new InvalidRequestException(String.format("Can't index column value of size %d for index %s on %s.%s",
+                                                                    column.value().remaining(),
+                                                                    cfm.getColumnDefinitionFromColumnName(column.name()).getIndexName(),
+                                                                    cfm.ksName,
+                                                                    cfm.cfName));
+            }
+        }
     }
 
     public static class ParsedInsert extends ModificationStatement.Parsed

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 6d9f28a..fda79f8 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -574,8 +574,10 @@ public class SecondaryIndexManager
 
     public boolean validate(Column column)
     {
-        SecondaryIndex index = getIndexForColumn(column.name());
-        return index == null || index.validate(column);
+        for (SecondaryIndex index : indexFor(column.name()))
+            if (!index.validate(column))
+                return false;
+        return true;
     }
 
     public static interface Updater

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 946ba24..f0d9d9b 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -504,5 +504,14 @@ public class CompositeType extends AbstractCompositeType
 
             return components.get(i);
         }
+
+        public int getLength()
+        {
+            int length = 0;
+            for (ByteBuffer component : components)
+                length += component.remaining() + 3; // length + 2 bytes for length + EOC
+
+            return length;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 4619ddc..afa066d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FilterFactory;
 import org.apache.cassandra.utils.IFilter;
 import org.apache.cassandra.utils.Pair;
@@ -181,6 +182,14 @@ public class SSTableWriter extends SSTable
 
     public void append(DecoratedKey decoratedKey, ColumnFamily cf)
     {
+        if (decoratedKey.key.remaining() > FBUtilities.MAX_UNSIGNED_SHORT)
+        {
+            logger.error("Key size {} exceeds maximum of {}, skipping row",
+                         decoratedKey.key.remaining(),
+                         FBUtilities.MAX_UNSIGNED_SHORT);
+            return;
+        }
+
         long startPosition = beforeAppend(decoratedKey);
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java b/test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java
new file mode 100644
index 0000000..9c2bc0f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java
@@ -0,0 +1,124 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.cassandra.cql3;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.exceptions.RequestValidationException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.utils.MD5Digest;
+
+import static org.junit.Assert.fail;
+import static org.apache.cassandra.cql3.QueryProcessor.process;
+
+public class IndexedValuesValidationTest
+{
+    static ClientState clientState;
+    static String keyspace = "indexed_value_validation_test";
+
+    @BeforeClass
+    public static void setUpClass() throws Throwable
+    {
+        SchemaLoader.loadSchema();
+        executeSchemaChange("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
+        clientState = ClientState.forInternalCalls();
+    }
+
+    // CASSANDRA-8280/8081
+    // reject updates with indexed values where value > 64k
+    @Test
+    public void testIndexOnCompositeValueOver64k() throws Throwable
+    {
+        executeSchemaChange("CREATE TABLE %s.composite_index_table (a int, b int, c blob, PRIMARY KEY (a))");
+        executeSchemaChange("CREATE INDEX ON %s.composite_index_table(c)");
+        performInsertWithIndexedValueOver64k("INSERT INTO %s.composite_index_table (a, b, c) VALUES (0, 0, ?)");
+    }
+
+    @Test
+    public void testIndexOnClusteringValueOver64k() throws Throwable
+    {
+        executeSchemaChange("CREATE TABLE %s.ck_index_table (a int, b blob, c int, PRIMARY KEY (a, b))");
+        executeSchemaChange("CREATE INDEX ON %s.ck_index_table(b)");
+        performInsertWithIndexedValueOver64k("INSERT INTO %s.ck_index_table (a, b, c) VALUES (0, ?, 0)");
+    }
+
+    @Test
+    public void testIndexOnPartitionKeyOver64k() throws Throwable
+    {
+        executeSchemaChange("CREATE TABLE %s.pk_index_table (a blob, b int, c int, PRIMARY KEY ((a, b)))");
+        executeSchemaChange("CREATE INDEX ON %s.pk_index_table(a)");
+        performInsertWithIndexedValueOver64k("INSERT INTO %s.pk_index_table (a, b, c) VALUES (?, 0, 0)");
+    }
+
+    @Test
+    public void testCompactTableWithValueOver64k() throws Throwable
+    {
+        executeSchemaChange("CREATE TABLE %s.compact_table (a int, b blob, PRIMARY KEY (a)) WITH COMPACT STORAGE");
+        executeSchemaChange("CREATE INDEX ON %s.compact_table(b)");
+        performInsertWithIndexedValueOver64k("INSERT INTO %s.compact_table (a, b) VALUES (0, ?)");
+    }
+
+    private static void performInsertWithIndexedValueOver64k(String insertCQL) throws Exception
+    {
+        ByteBuffer buf = ByteBuffer.allocate(1024 * 65);
+        buf.clear();
+        for (int i=0; i<1024 + 1; i++)
+            buf.put((byte)0);
+
+        try
+        {
+            execute(String.format(insertCQL, keyspace), buf);
+            fail("Expected statement to fail validation");
+        }
+        catch (InvalidRequestException e)
+        {
+            // as expected
+        }
+    }
+
+    private static void execute(String query, ByteBuffer value) throws RequestValidationException, RequestExecutionException
+    {
+        MD5Digest statementId = QueryProcessor.prepare(String.format(query, keyspace), clientState, false).statementId;
+        CQLStatement statement = QueryProcessor.instance.getPrepared(statementId);
+        statement.executeInternal(QueryState.forInternalCalls(),
+                                  new QueryOptions(ConsistencyLevel.ONE, Collections.singletonList(value)));
+    }
+
+    private static void executeSchemaChange(String query) throws Throwable
+    {
+        try
+        {
+            process(String.format(query, keyspace), ConsistencyLevel.ONE);
+        }
+        catch (RuntimeException exc)
+        {
+            throw exc.getCause();
+        }
+    }
+}
+