You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2017/08/08 14:27:37 UTC

[1/8] cassandra git commit: Fix ColumnMetadata.cellValueType() return type and change sstabledump tool to use type.toJSONString()

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 918667929 -> 396026047
  refs/heads/cassandra-3.11 15abe2db9 -> 47a2839bf
  refs/heads/trunk 6dfd11c30 -> cad941653


Fix ColumnMetadata.cellValueType() return type and change sstabledump tool to use type.toJSONString()

patch by Zhao Yang, reviewed by Andres de la Peña for CASSANDRA-13573


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/39602604
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/39602604
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/39602604

Branch: refs/heads/cassandra-3.0
Commit: 3960260472fcd4e0243f62cc813992f1365197c6
Parents: 9186679
Author: Zhao Yang <zh...@gmail.com>
Authored: Wed Aug 2 11:58:38 2017 +0800
Committer: Andrés de la Peña <a....@gmail.com>
Committed: Tue Aug 8 14:31:23 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/config/ColumnDefinition.java      | 19 ++++++++---
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  2 +-
 .../apache/cassandra/db/rows/BufferCell.java    |  2 +-
 .../apache/cassandra/tools/JsonTransformer.java |  6 ++--
 .../org/apache/cassandra/cql3/ViewTest.java     | 33 ++++++++++++++++++++
 6 files changed, 55 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 905a436..1525289 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Fix ColumnDefinition.cellValueType() for non-frozen collection and change SSTabledump to use type.toJSONString() (CASSANDRA-13573)
  * Skip materialized view addition if the base table doesn't exist (CASSANDRA-13737)
  * Drop table should remove corresponding entries in dropped_columns table (CASSANDRA-13730)
  * Log warn message until legacy auth tables have been migrated (CASSANDRA-13371)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 34840e3..6a0f530 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -391,13 +391,24 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
     /**
      * The type of the cell values for cell belonging to this column.
      *
-     * This is the same than the column type, except for collections where it's the 'valueComparator'
+     * This is the same as the column type, except for non-frozen collections where it's the 'valueComparator'
      * of the collection.
+     * 
+     * This method should not be used to get the value type of a non-frozen UDT.
      */
     public AbstractType<?> cellValueType()
     {
-        return type instanceof CollectionType
-             ? ((CollectionType)type).valueComparator()
-             : type;
+        assert !(type instanceof UserType && type.isMultiCell());
+        return type instanceof CollectionType && type.isMultiCell()
+                ? ((CollectionType)type).valueComparator()
+                : type;
+    }
+
+
+    public boolean isCounterColumn()
+    {
+        if (type instanceof CollectionType) // for thrift
+            return ((CollectionType) type).valueComparator().isCounter();
+        return type.isCounter();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index fda33d6..41dad0a 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -294,7 +294,7 @@ public class BTreeRow extends AbstractRow
 
     public Row markCounterLocalToBeCleared()
     {
-        return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> cd.column().cellValueType().isCounter()
+        return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> cd.column().isCounterColumn()
                                                                             ? cd.markCounterLocalToBeCleared()
                                                                             : cd);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index e4ad7e6..82ae02c 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -89,7 +89,7 @@ public class BufferCell extends AbstractCell
 
     public boolean isCounterCell()
     {
-        return !isTombstone() && column.cellValueType().isCounter();
+        return !isTombstone() && column.isCounterColumn();
     }
 
     public boolean isLive(int nowInSec)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/src/java/org/apache/cassandra/tools/JsonTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/JsonTransformer.java b/src/java/org/apache/cassandra/tools/JsonTransformer.java
index 0a72583..5c32035 100644
--- a/src/java/org/apache/cassandra/tools/JsonTransformer.java
+++ b/src/java/org/apache/cassandra/tools/JsonTransformer.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.RangeTombstone;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.ColumnData;
 import org.apache.cassandra.db.rows.ComplexColumnData;
@@ -49,6 +50,7 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
@@ -411,7 +413,7 @@ public final class JsonTransformer
             AbstractType<?> type = cell.column().type;
             json.writeString(cell.column().name.toCQLString());
 
-            if (cell.path() != null && cell.path().size() > 0)
+            if (type.isCollection() && type.isMultiCell()) // non-frozen collection
             {
                 CollectionType ct = (CollectionType) type;
                 json.writeFieldName("path");
@@ -437,7 +439,7 @@ public final class JsonTransformer
             else
             {
                 json.writeFieldName("value");
-                json.writeString(cell.column().cellValueType().getString(cell.value()));
+                json.writeRawValue(cell.column().cellValueType().toJSONString(cell.value(), Server.CURRENT_VERSION));
             }
             if (liveInfo.isEmpty() || cell.timestamp() != liveInfo.timestamp())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index e595ebd..f8f8c9f 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -1262,4 +1262,37 @@ public class ViewTest extends CQLTester
 
         assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L));
     }
+
+    @Test
+    public void testFrozenCollectionsWithComplicatedInnerType() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int, intval int,  listval frozen<list<tuple<text,text>>>, PRIMARY KEY (k))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        createView("mv",
+                   "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND listval IS NOT NULL PRIMARY KEY (k, listval)");
+
+        updateView("INSERT INTO %s (k, intval, listval) VALUES (?, ?, fromJson(?))",
+                   0,
+                   0,
+                   "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]");
+
+        // verify input
+        assertRows(execute("SELECT k, toJson(listval) FROM %s WHERE k = ?", 0),
+                   row(0, "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]"));
+        assertRows(execute("SELECT k, toJson(listval) from mv"),
+                   row(0, "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]"));
+
+        // update listval with the same value and it will be compared in view generator
+        updateView("INSERT INTO %s (k, listval) VALUES (?, fromJson(?))",
+                   0,
+                   "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]");
+        // verify result
+        assertRows(execute("SELECT k, toJson(listval) FROM %s WHERE k = ?", 0),
+                   row(0, "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]"));
+        assertRows(execute("SELECT k, toJson(listval) from mv"),
+                   row(0, "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]"));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[6/8] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ad...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a2839b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
index a6ce08b,0000000..e9051b4
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
@@@ -1,2552 -1,0 +1,2566 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.index.sasi;
 +
 +import java.io.FileWriter;
 +import java.io.Writer;
 +import java.nio.ByteBuffer;
 +import java.nio.file.FileSystems;
 +import java.nio.file.Files;
 +import java.nio.file.Path;
 +import java.nio.file.attribute.BasicFileAttributes;
 +import java.util.*;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.ThreadLocalRandom;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.Term;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
 +import org.apache.cassandra.cql3.statements.SelectStatement;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.sasi.conf.ColumnIndex;
 +import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
 +import org.apache.cassandra.index.sasi.exceptions.TimeQuotaExceededException;
 +import org.apache.cassandra.index.sasi.memory.IndexMemtable;
 +import org.apache.cassandra.index.sasi.plan.QueryController;
 +import org.apache.cassandra.index.sasi.plan.QueryPlan;
 +import org.apache.cassandra.io.sstable.SSTable;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.schema.KeyspaceMetadata;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.schema.Tables;
 +import org.apache.cassandra.serializers.MarshalException;
 +import org.apache.cassandra.serializers.TypeSerializer;
 +import org.apache.cassandra.service.MigrationManager;
 +import org.apache.cassandra.service.QueryState;
 +import org.apache.cassandra.thrift.CqlRow;
 +import org.apache.cassandra.transport.messages.ResultMessage;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Pair;
 +
 +import com.google.common.collect.Lists;
 +import com.google.common.util.concurrent.Uninterruptibles;
 +
 +import junit.framework.Assert;
 +
 +import org.junit.*;
 +
 +public class SASIIndexTest
 +{
 +    private static final IPartitioner PARTITIONER;
 +
 +    static {
 +        System.setProperty("cassandra.config", "cassandra-murmur.yaml");
 +        PARTITIONER = Murmur3Partitioner.instance;
 +    }
 +
 +    private static final String KS_NAME = "sasi";
 +    private static final String CF_NAME = "test_cf";
 +    private static final String CLUSTERING_CF_NAME_1 = "clustering_test_cf_1";
 +    private static final String CLUSTERING_CF_NAME_2 = "clustering_test_cf_2";
 +    private static final String STATIC_CF_NAME = "static_sasi_test_cf";
 +    private static final String FTS_CF_NAME = "full_text_search_sasi_test_cf";
 +
 +    @BeforeClass
 +    public static void loadSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.loadSchema();
 +        MigrationManager.announceNewKeyspace(KeyspaceMetadata.create(KS_NAME,
 +                                                                     KeyspaceParams.simpleTransient(1),
 +                                                                     Tables.of(SchemaLoader.sasiCFMD(KS_NAME, CF_NAME),
 +                                                                               SchemaLoader.clusteringSASICFMD(KS_NAME, CLUSTERING_CF_NAME_1),
 +                                                                               SchemaLoader.clusteringSASICFMD(KS_NAME, CLUSTERING_CF_NAME_2, "location"),
 +                                                                               SchemaLoader.staticSASICFMD(KS_NAME, STATIC_CF_NAME),
 +                                                                               SchemaLoader.fullTextSearchSASICFMD(KS_NAME, FTS_CF_NAME))));
 +    }
 +
 +    @Before
 +    public void cleanUp()
 +    {
 +        Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME).truncateBlocking();
 +    }
 +
 +    @Test
 +    public void testSingleExpressionQueries() throws Exception
 +    {
 +        testSingleExpressionQueries(false);
 +        cleanupData();
 +        testSingleExpressionQueries(true);
 +    }
 +
 +    private void testSingleExpressionQueries(boolean forceFlush) throws Exception
 +    {
 +        Map<String, Pair<String, Integer>> data = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +            put("key1", Pair.create("Pavel", 14));
 +            put("key2", Pair.create("Pavel", 26));
 +            put("key3", Pair.create("Pavel", 27));
 +            put("key4", Pair.create("Jason", 27));
 +        }};
 +
 +        ColumnFamilyStore store = loadData(data, forceFlush);
 +
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        Set<String> rows;
 +
 +        rows = getIndexed(store, 10, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("av")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2", "key3" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("as")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("aw")));
 +        Assert.assertEquals(rows.toString(), 0, rows.size());
 +
 +        rows = getIndexed(store, 10, buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("avel")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2", "key3" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("n")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(age, Operator.EQ, Int32Type.instance.decompose(27)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[]{"key3", "key4"}, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(age, Operator.EQ, Int32Type.instance.decompose(26)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(age, Operator.EQ, Int32Type.instance.decompose(13)));
 +        Assert.assertEquals(rows.toString(), 0, rows.size());
 +    }
 +
 +    @Test
 +    public void testEmptyTokenizedResults() throws Exception
 +    {
 +        testEmptyTokenizedResults(false);
 +        cleanupData();
 +        testEmptyTokenizedResults(true);
 +    }
 +
 +    private void testEmptyTokenizedResults(boolean forceFlush) throws Exception
 +    {
 +        Map<String, Pair<String, Integer>> data = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key1", Pair.create("  ", 14));
 +        }};
 +
 +        ColumnFamilyStore store = loadData(data, forceFlush);
 +
 +        Set<String> rows= getIndexed(store, 10, buildExpression(UTF8Type.instance.decompose("first_name"), Operator.LIKE_MATCHES, UTF8Type.instance.decompose("doesntmatter")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[]{}, rows.toArray(new String[rows.size()])));
 +    }
 +
 +    @Test
 +    public void testMultiExpressionQueries() throws Exception
 +    {
 +        testMultiExpressionQueries(false);
 +        cleanupData();
 +        testMultiExpressionQueries(true);
 +    }
 +
 +    public void testMultiExpressionQueries(boolean forceFlush) throws Exception
 +    {
 +        Map<String, Pair<String, Integer>> data = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key1", Pair.create("Pavel", 14));
 +                put("key2", Pair.create("Pavel", 26));
 +                put("key3", Pair.create("Pavel", 27));
 +                put("key4", Pair.create("Jason", 27));
 +        }};
 +
 +        ColumnFamilyStore store = loadData(data, forceFlush);
 +
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        Set<String> rows;
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GT, Int32Type.instance.decompose(14)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(27)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[]{"key1", "key2"}, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                         buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                         buildExpression(age, Operator.GT, Int32Type.instance.decompose(14)),
 +                         buildExpression(age, Operator.LT, Int32Type.instance.decompose(27)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                         buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                         buildExpression(age, Operator.GT, Int32Type.instance.decompose(12)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[]{ "key1", "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                         buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                         buildExpression(age, Operator.GTE, Int32Type.instance.decompose(13)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                         buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                         buildExpression(age, Operator.GTE, Int32Type.instance.decompose(16)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +
 +        rows = getIndexed(store, 10,
 +                         buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                         buildExpression(age, Operator.LT, Int32Type.instance.decompose(30)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                         buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                         buildExpression(age, Operator.LTE, Int32Type.instance.decompose(29)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                         buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                         buildExpression(age, Operator.LTE, Int32Type.instance.decompose(25)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[]{ "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("avel")),
 +                                     buildExpression(age, Operator.LTE, Int32Type.instance.decompose(25)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("n")),
 +                                     buildExpression(age, Operator.LTE, Int32Type.instance.decompose(25)));
 +        Assert.assertTrue(rows.isEmpty());
 +
 +    }
 +
 +    @Test
 +    public void testCrossSSTableQueries() throws Exception
 +    {
 +        testCrossSSTableQueries(false);
 +        cleanupData();
 +        testCrossSSTableQueries(true);
 +
 +    }
 +
 +    private void testCrossSSTableQueries(boolean forceFlush) throws Exception
 +    {
 +        Map<String, Pair<String, Integer>> part1 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key0", Pair.create("Maxie", 43));
 +                put("key1", Pair.create("Chelsie", 33));
 +                put("key2", Pair.create("Josephine", 43));
 +                put("key3", Pair.create("Shanna", 27));
 +                put("key4", Pair.create("Amiya", 36));
 +            }};
 +
 +        loadData(part1, forceFlush); // first sstable
 +
 +        Map<String, Pair<String, Integer>> part2 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key5", Pair.create("Americo", 20));
 +                put("key6", Pair.create("Fiona", 39));
 +                put("key7", Pair.create("Francis", 41));
 +                put("key8", Pair.create("Charley", 21));
 +                put("key9", Pair.create("Amely", 40));
 +            }};
 +
 +        loadData(part2, forceFlush);
 +
 +        Map<String, Pair<String, Integer>> part3 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key10", Pair.create("Eddie", 42));
 +                put("key11", Pair.create("Oswaldo", 35));
 +                put("key12", Pair.create("Susana", 35));
 +                put("key13", Pair.create("Alivia", 42));
 +                put("key14", Pair.create("Demario", 28));
 +            }};
 +
 +        ColumnFamilyStore store = loadData(part3, forceFlush);
 +
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        Set<String> rows;
 +        rows = getIndexed(store, 10, buildExpression(firstName, Operator.EQ, UTF8Type.instance.decompose("Fiona")),
 +                                     buildExpression(age, Operator.LT, Int32Type.instance.decompose(40)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key6" }, rows.toArray(new String[rows.size()])));
 +
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key0", "key11", "key12", "key13", "key14",
 +                                                                        "key3", "key4", "key6", "key7", "key8" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 5,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +
 +        Assert.assertEquals(rows.toString(), 5, rows.size());
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GTE, Int32Type.instance.decompose(35)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key0", "key11", "key12", "key13", "key4", "key6", "key7" },
 +                                                         rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(32)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key14", "key3", "key8" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GT, Int32Type.instance.decompose(27)),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(32)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key14" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GT, Int32Type.instance.decompose(10)));
 +
 +        Assert.assertEquals(rows.toString(), 10, rows.size());
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.LTE, Int32Type.instance.decompose(50)));
 +
 +        Assert.assertEquals(rows.toString(), 10, rows.size());
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("ie")),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(43)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key10" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("a")));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key12", "key13", "key3", "key4", "key6" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(33)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key3" }, rows.toArray(new String[rows.size()])));
 +    }
 +
 +    /**
 +     * Runs the tokenized-query scenario with {@code forceFlush} set to {@code false} and then,
 +     * after {@code cleanupData()}, once more with {@code forceFlush} set to {@code true}.
 +     */
 +    @Test
 +    public void testQueriesThatShouldBeTokenized() throws Exception
 +    {
 +        for (boolean forceFlush : new boolean[]{ false, true })
 +        {
 +            // the second pass starts only after the first pass' data has been cleaned up
 +            if (forceFlush)
 +                cleanupData();
 +
 +            testQueriesThatShouldBeTokenized(forceFlush);
 +        }
 +    }
 +
 +    /**
 +     * Stores four sentences in {@code first_name} and checks that LIKE_CONTAINS queries made of
 +     * several words (optionally combined with an {@code age} restriction) return the expected keys.
 +     *
 +     * @param forceFlush passed through to {@code loadData}
 +     */
 +    private void testQueriesThatShouldBeTokenized(boolean forceFlush) throws Exception
 +    {
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        Map<String, Pair<String, Integer>> quotes = new HashMap<>();
 +        quotes.put("key0", Pair.create("If you can dream it, you can do it.", 43));
 +        quotes.put("key1", Pair.create("What you get by achieving your goals is not " +
 +                                       "as important as what you become by achieving your goals, do it.", 33));
 +        quotes.put("key2", Pair.create("Keep your face always toward the sunshine " +
 +                                       "- and shadows will fall behind you.", 43));
 +        quotes.put("key3", Pair.create("We can't help everyone, but everyone can " +
 +                                       "help someone.", 27));
 +
 +        ColumnFamilyStore cfs = loadData(quotes, forceFlush);
 +
 +        // multi-word phrase combined with an age restriction: only key1 qualifies
 +        Set<String> phraseAndAge = getIndexed(cfs, 10,
 +                                              buildExpression(firstName,
 +                                                              Operator.LIKE_CONTAINS,
 +                                                              UTF8Type.instance.decompose("What you get by achieving your goals")),
 +                                              buildExpression(age, Operator.GT, Int32Type.instance.decompose(32)));
 +
 +        Assert.assertEquals(phraseAndAge.toString(), Collections.singleton("key1"), phraseAndAge);
 +
 +        // a phrase shared by two of the sentences
 +        Set<String> sharedPhrase = getIndexed(cfs, 10,
 +                                              buildExpression(firstName,
 +                                                              Operator.LIKE_CONTAINS,
 +                                                              UTF8Type.instance.decompose("do it.")));
 +
 +        Assert.assertEquals(sharedPhrase.toString(), Arrays.asList("key0", "key1"), Lists.newArrayList(sharedPhrase));
 +    }
 +
 +    /**
 +     * Runs the prefix-search scenario with {@code forceFlush} set to {@code false} and then,
 +     * after {@code cleanupData()}, once more with {@code forceFlush} set to {@code true}.
 +     */
 +    @Test
 +    public void testPrefixSearchWithContainsMode() throws Exception
 +    {
 +        for (boolean forceFlush : new boolean[]{ false, true })
 +        {
 +            // the second pass starts only after the first pass' data has been cleaned up
 +            if (forceFlush)
 +                cleanupData();
 +
 +            testPrefixSearchWithContainsMode(forceFlush);
 +        }
 +    }
 +
 +    private void testPrefixSearchWithContainsMode(boolean forceFlush) throws Exception
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(FTS_CF_NAME);
 +
 +        executeCQL(FTS_CF_NAME, "INSERT INTO %s.%s (song_id, title, artist) VALUES(?, ?, ?)", UUID.fromString("1a4abbcd-b5de-4c69-a578-31231e01ff09"), "Poker Face", "Lady Gaga");
 +        executeCQL(FTS_CF_NAME, "INSERT INTO %s.%s (song_id, title, artist) VALUES(?, ?, ?)", UUID.fromString("9472a394-359b-4a06-b1d5-b6afce590598"), "Forgetting the Way Home", "Our Lady of Bells");
 +        executeCQL(FTS_CF_NAME, "INSERT INTO %s.%s (song_id, title, artist) VALUES(?, ?, ?)", UUID.fromString("4f8dc18e-54e6-4e16-b507-c5324b61523b"), "Zamki na piasku", "Lady Pank");
 +        executeCQL(FTS_CF_NAME, "INSERT INTO %s.%s (song_id, title, artist) VALUES(?, ?, ?)", UUID.fromString("eaf294fa-bad5-49d4-8f08-35ba3636a706"), "Koncertowa", "Lady Pank");
 +
 +
 +        if (forceFlush)
 +            store.forceBlockingFlush();
 +
 +        final UntypedResultSet results = executeCQL(FTS_CF_NAME, "SELECT * FROM %s.%s WHERE artist LIKE 'lady%%'");
 +        Assert.assertNotNull(results);
 +        Assert.assertEquals(3, results.size());
 +    }
 +
 +    /**
 +     * Runs the split-row scenario with {@code forceFlush} set to {@code false} and then,
 +     * after {@code cleanupData()}, once more with {@code forceFlush} set to {@code true}.
 +     */
 +    @Test
 +    public void testMultiExpressionQueriesWhereRowSplitBetweenSSTables() throws Exception
 +    {
 +        for (boolean forceFlush : new boolean[]{ false, true })
 +        {
 +            // the second pass starts only after the first pass' data has been cleaned up
 +            if (forceFlush)
 +                cleanupData();
 +
 +            testMultiExpressionQueriesWhereRowSplitBetweenSSTables(forceFlush);
 +        }
 +    }
 +
 +    /**
 +     * Loads rows in several {@code loadData} batches and checks that queries combining expressions
 +     * on {@code first_name} and {@code age} still match when the two values of a key were supplied
 +     * in different batches (see key0, key2, key12 and key14 below).
 +     *
 +     * @param forceFlush passed through to every {@code loadData} call
 +     */
 +    private void testMultiExpressionQueriesWhereRowSplitBetweenSSTables(boolean forceFlush) throws Exception
 +    {
 +        // NOTE(review): a null name / -1 age presumably tells loadData to leave that column unwritten - confirm against loadData
 +        Map<String, Pair<String, Integer>> part1 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key0", Pair.create("Maxie", -1));
 +                put("key1", Pair.create("Chelsie", 33));
 +                put("key2", Pair.create((String)null, 43));
 +                put("key3", Pair.create("Shanna", 27));
 +                put("key4", Pair.create("Amiya", 36));
 +        }};
 +
 +        loadData(part1, forceFlush); // first sstable
 +
 +        Map<String, Pair<String, Integer>> part2 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key5", Pair.create("Americo", 20));
 +                put("key6", Pair.create("Fiona", 39));
 +                put("key7", Pair.create("Francis", 41));
 +                put("key8", Pair.create("Charley", 21));
 +                put("key9", Pair.create("Amely", 40));
 +                put("key14", Pair.create((String)null, 28));
 +        }};
 +
 +        loadData(part2, forceFlush);
 +
 +        // third batch supplies the value that was null / -1 for key0, key14 and key2 in the earlier batches
 +        Map<String, Pair<String, Integer>> part3 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key0", Pair.create((String)null, 43));
 +                put("key10", Pair.create("Eddie", 42));
 +                put("key11", Pair.create("Oswaldo", 35));
 +                put("key12", Pair.create("Susana", 35));
 +                put("key13", Pair.create("Alivia", 42));
 +                put("key14", Pair.create("Demario", -1));
 +                put("key2", Pair.create("Josephine", -1));
 +        }};
 +
 +        ColumnFamilyStore store = loadData(part3, forceFlush);
 +
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        Set<String> rows = getIndexed(store, 10,
 +                                      buildExpression(firstName, Operator.EQ, UTF8Type.instance.decompose("Fiona")),
 +                                      buildExpression(age, Operator.LT, Int32Type.instance.decompose(40)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key6" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key0", "key11", "key12", "key13", "key14",
 +                                                                        "key3", "key4", "key6", "key7", "key8" }, rows.toArray(new String[rows.size()])));
 +
 +        // same query, but capped at 5 results
 +        rows = getIndexed(store, 5,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +
 +        Assert.assertEquals(rows.toString(), 5, rows.size());
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GTE, Int32Type.instance.decompose(35)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key0", "key11", "key12", "key13", "key4", "key6", "key7" },
 +                                                         rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(32)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key14", "key3", "key8" }, rows.toArray(new String[rows.size()])));
 +
 +        // two bounds on age (27 < age < 32) combined with the name expression
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GT, Int32Type.instance.decompose(27)),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(32)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key14" }, rows.toArray(new String[rows.size()])));
 +
 +        // fourth batch writes again to key12, key14 and key2
 +        Map<String, Pair<String, Integer>> part4 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key12", Pair.create((String)null, 12));
 +                put("key14", Pair.create("Demario", 42));
 +                put("key2", Pair.create("Frank", -1));
 +        }};
 +
 +        store = loadData(part4, forceFlush);
 +
 +        // key12 is expected to match on the name from part3 ("Susana") together with the age from part4 (12)
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("Susana")),
 +                          buildExpression(age, Operator.LTE, Int32Type.instance.decompose(13)),
 +                          buildExpression(age, Operator.GT, Int32Type.instance.decompose(10)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key12" }, rows.toArray(new String[rows.size()])));
 +
 +        // key14 had age 28 in part2 but 42 in part4, so nothing is expected for age <= 30
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("Demario")),
 +                          buildExpression(age, Operator.LTE, Int32Type.instance.decompose(30)));
 +        Assert.assertTrue(rows.toString(), rows.size() == 0);
 +
 +        // key2's name was "Josephine" in part3 and "Frank" in part4, so the old name must no longer match
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("Josephine")));
 +        Assert.assertTrue(rows.toString(), rows.size() == 0);
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GT, Int32Type.instance.decompose(10)));
 +
 +        Assert.assertEquals(rows.toString(), 10, rows.size());
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.LTE, Int32Type.instance.decompose(50)));
 +
 +        Assert.assertEquals(rows.toString(), 10, rows.size());
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("ie")),
 +                          buildExpression(age, Operator.LTE, Int32Type.instance.decompose(43)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key0", "key1", "key10" }, rows.toArray(new String[rows.size()])));
 +    }
 +
 +    /**
 +     * Runs the pagination scenario with {@code forceFlush} set to {@code false} and then,
 +     * after {@code cleanupData()}, once more with {@code forceFlush} set to {@code true}.
 +     */
 +    @Test
 +    public void testPagination() throws Exception
 +    {
 +        for (boolean forceFlush : new boolean[]{ false, true })
 +        {
 +            // the second pass starts only after the first pass' data has been cleaned up
 +            if (forceFlush)
 +                cleanupData();
 +
 +            testPagination(forceFlush);
 +        }
 +    }
 +
 +    /**
 +     * Loads three batches of rows and checks paged index queries (through {@code getPaged}) with
 +     * page sizes of 4 and 2, followed by CQL {@code LIKE} queries that use {@code limit} and
 +     * {@code token()} restrictions.
 +     *
 +     * @param forceFlush passed through to every {@code loadData} call
 +     */
 +    private void testPagination(boolean forceFlush) throws Exception
 +    {
 +        // split data into 3 distinct SSTables to test paging with overlapping token intervals.
 +
 +        Map<String, Pair<String, Integer>> part1 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key01", Pair.create("Ali", 33));
 +                put("key02", Pair.create("Jeremy", 41));
 +                put("key03", Pair.create("Elvera", 22));
 +                put("key04", Pair.create("Bailey", 45));
 +                put("key05", Pair.create("Emerson", 32));
 +                put("key06", Pair.create("Kadin", 38));
 +                put("key07", Pair.create("Maggie", 36));
 +                put("key08", Pair.create("Kailey", 36));
 +                put("key09", Pair.create("Armand", 21));
 +                put("key10", Pair.create("Arnold", 35));
 +        }};
 +
 +        Map<String, Pair<String, Integer>> part2 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key11", Pair.create("Ken", 38));
 +                put("key12", Pair.create("Penelope", 43));
 +                put("key13", Pair.create("Wyatt", 34));
 +                put("key14", Pair.create("Johnpaul", 34));
 +                put("key15", Pair.create("Trycia", 43));
 +                put("key16", Pair.create("Aida", 21));
 +                put("key17", Pair.create("Devon", 42));
 +        }};
 +
 +        Map<String, Pair<String, Integer>> part3 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key18", Pair.create("Christina", 20));
 +                put("key19", Pair.create("Rick", 19));
 +                put("key20", Pair.create("Fannie", 22));
 +                put("key21", Pair.create("Keegan", 29));
 +                put("key22", Pair.create("Ignatius", 36));
 +                put("key23", Pair.create("Ellis", 26));
 +                put("key24", Pair.create("Annamarie", 29));
 +                put("key25", Pair.create("Tianna", 31));
 +                put("key26", Pair.create("Dennis", 32));
 +        }};
 +
 +        ColumnFamilyStore store = loadData(part1, forceFlush);
 +
 +        loadData(part2, forceFlush);
 +        loadData(part3, forceFlush);
 +
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        Set<DecoratedKey> uniqueKeys = getPaged(store, 4,
 +                buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                buildExpression(age, Operator.GTE, Int32Type.instance.decompose(21)));
 +
 +
 +        // the expected lists below are compared as ordered lists, so the order of the keys matters
 +        // NOTE(review): the order is presumably the partitioner's token order - confirm against getPaged/convert
 +        List<String> expected = new ArrayList<String>()
 +        {{
 +                add("key25");
 +                add("key20");
 +                add("key13");
 +                add("key22");
 +                add("key09");
 +                add("key14");
 +                add("key16");
 +                add("key24");
 +                add("key03");
 +                add("key04");
 +                add("key08");
 +                add("key07");
 +                add("key15");
 +                add("key06");
 +                add("key21");
 +        }};
 +
 +        Assert.assertEquals(expected, convert(uniqueKeys));
 +
 +        // now let's test a single LIKE_CONTAINS expression without any age restriction
 +        // (key18, whose age is 20, is now part of the expected result)
 +
 +        uniqueKeys = getPaged(store, 4, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +
 +        expected = new ArrayList<String>()
 +        {{
 +                add("key25");
 +                add("key20");
 +                add("key13");
 +                add("key22");
 +                add("key09");
 +                add("key14");
 +                add("key16");
 +                add("key24");
 +                add("key03");
 +                add("key04");
 +                add("key18");
 +                add("key08");
 +                add("key07");
 +                add("key15");
 +                add("key06");
 +                add("key21");
 +        }};
 +
 +        Assert.assertEquals(expected, convert(uniqueKeys));
 +
 +        // now let's test something which is smaller than a single page
 +        uniqueKeys = getPaged(store, 4,
 +                              buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                              buildExpression(age, Operator.EQ, Int32Type.instance.decompose(36)));
 +
 +        expected = new ArrayList<String>()
 +        {{
 +                add("key22");
 +                add("key08");
 +                add("key07");
 +        }};
 +
 +        Assert.assertEquals(expected, convert(uniqueKeys));
 +
 +        // the same but with the page size of 2 to test minimal pagination windows
 +
 +        uniqueKeys = getPaged(store, 2,
 +                              buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                              buildExpression(age, Operator.EQ, Int32Type.instance.decompose(36)));
 +
 +        Assert.assertEquals(expected, convert(uniqueKeys));
 +
 +        // and last but not least, test age range query with pagination
 +        uniqueKeys = getPaged(store, 4,
 +                buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                buildExpression(age, Operator.GT, Int32Type.instance.decompose(20)),
 +                buildExpression(age, Operator.LTE, Int32Type.instance.decompose(36)));
 +
 +        expected = new ArrayList<String>()
 +        {{
 +                add("key25");
 +                add("key20");
 +                add("key13");
 +                add("key22");
 +                add("key09");
 +                add("key14");
 +                add("key16");
 +                add("key24");
 +                add("key03");
 +                add("key08");
 +                add("key07");
 +                add("key21");
 +        }};
 +
 +        Assert.assertEquals(expected, convert(uniqueKeys));
 +
 +        // the remaining checks go through CQL and combine LIKE with limit and token() restrictions
 +        Set<String> rows;
 +
 +        rows = executeCQLWithKeys(String.format("SELECT * FROM %s.%s WHERE first_name LIKE '%%a%%' limit 10 ALLOW FILTERING;", KS_NAME, CF_NAME));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key03", "key04", "key09", "key13", "key14", "key16", "key20", "key22", "key24", "key25" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = executeCQLWithKeys(String.format("SELECT * FROM %s.%s WHERE first_name LIKE '%%a%%' and token(id) >= token('key14') limit 5 ALLOW FILTERING;", KS_NAME, CF_NAME));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key03", "key04", "key14", "key16", "key24" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = executeCQLWithKeys(String.format("SELECT * FROM %s.%s WHERE first_name LIKE '%%a%%' and token(id) >= token('key14') and token(id) <= token('key24') limit 5 ALLOW FILTERING;", KS_NAME, CF_NAME));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key14", "key16", "key24" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = executeCQLWithKeys(String.format("SELECT * FROM %s.%s WHERE first_name LIKE '%%a%%' and age > 30 and token(id) >= token('key14') and token(id) <= token('key24') limit 5 ALLOW FILTERING;", KS_NAME, CF_NAME));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key14" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = executeCQLWithKeys(String.format("SELECT * FROM %s.%s WHERE first_name like '%%ie' limit 5 ALLOW FILTERING;", KS_NAME, CF_NAME));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key07", "key20", "key24" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = executeCQLWithKeys(String.format("SELECT * FROM %s.%s WHERE first_name like '%%ie' AND token(id) > token('key24') limit 5 ALLOW FILTERING;", KS_NAME, CF_NAME));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key07", "key24" }, rows.toArray(new String[rows.size()])));
 +    }
 +
 +    /**
 +     * Runs the slash-in-column-name scenario with {@code forceFlush} set to {@code false} and then,
 +     * after {@code cleanupData()}, once more with {@code forceFlush} set to {@code true}.
 +     */
 +    @Test
 +    public void testColumnNamesWithSlashes() throws Exception
 +    {
 +        for (boolean forceFlush : new boolean[]{ false, true })
 +        {
 +            // the second pass starts only after the first pass' data has been cleaned up
 +            if (forceFlush)
 +                cleanupData();
 +
 +            testColumnNamesWithSlashes(forceFlush);
 +        }
 +    }
 +
 +    /**
 +     * Writes three single-cell rows into a column named {@code /data/output/id} and queries the
 +     * index on that column. When {@code forceFlush} is {@code true} it additionally invalidates all
 +     * indexes, checks that the queries return nothing, rebuilds the index named
 +     * {@code data_output_id} and checks that the results are back.
 +     *
 +     * @param forceFlush when {@code true} the table is flushed after the writes and the
 +     *                   invalidate/rebuild part of the test is executed
 +     */
 +    private void testColumnNamesWithSlashes(boolean forceFlush) throws Exception
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        // key1 -> "jason"
 +        Mutation rm1 = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose("key1")));
 +        rm1.add(PartitionUpdate.singleRowUpdate(store.metadata,
 +                                                rm1.key(),
 +                                                buildRow(buildCell(store.metadata,
 +                                                                   UTF8Type.instance.decompose("/data/output/id"),
 +                                                                   AsciiType.instance.decompose("jason"),
 +                                                                   System.currentTimeMillis()))));
 +
 +        // key2 -> "pavel"
 +        Mutation rm2 = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose("key2")));
 +        rm2.add(PartitionUpdate.singleRowUpdate(store.metadata,
 +                                                rm2.key(),
 +                                                buildRow(buildCell(store.metadata,
 +                                                                   UTF8Type.instance.decompose("/data/output/id"),
 +                                                                   AsciiType.instance.decompose("pavel"),
 +                                                                   System.currentTimeMillis()))));
 +
 +        // key3 -> "Aleksey"
 +        Mutation rm3 = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose("key3")));
 +        rm3.add(PartitionUpdate.singleRowUpdate(store.metadata,
 +                                                rm3.key(),
 +                                                buildRow(buildCell(store.metadata,
 +                                                                   UTF8Type.instance.decompose("/data/output/id"),
 +                                                                   AsciiType.instance.decompose("Aleksey"),
 +                                                                   System.currentTimeMillis()))));
 +
 +        rm1.apply();
 +        rm2.apply();
 +        rm3.apply();
 +
 +        if (forceFlush)
 +            store.forceBlockingFlush();
 +
 +        final ByteBuffer dataOutputId = UTF8Type.instance.decompose("/data/output/id");
 +
 +        // lower-case "a" is expected to match "jason" and "pavel" only, upper-case "A" only "Aleksey"
 +        Set<String> rows = getIndexed(store, 10, buildExpression(dataOutputId, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(dataOutputId, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("A")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[]{ "key3" }, rows.toArray(new String[rows.size()])));
 +
 +        // doesn't really make sense to rebuild index for in-memory data
 +        if (!forceFlush)
 +            return;
 +
 +        store.indexManager.invalidateAllIndexesBlocking();
 +
 +        // after invalidation neither query is expected to return anything
 +        rows = getIndexed(store, 10, buildExpression(dataOutputId, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertTrue(rows.toString(), rows.isEmpty());
 +
 +        rows = getIndexed(store, 10, buildExpression(dataOutputId, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("A")));
 +        Assert.assertTrue(rows.toString(), rows.isEmpty());
 +
 +        // now let's trigger index rebuild and check if we got the data back
 +        store.indexManager.buildIndexBlocking(store.indexManager.getIndexByName("data_output_id"));
 +
 +        rows = getIndexed(store, 10, buildExpression(dataOutputId, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        // also let's try to build an index for column which has no data to make sure that doesn't fail
 +        store.indexManager.buildIndexBlocking(store.indexManager.getIndexByName("first_name"));
 +        store.indexManager.buildIndexBlocking(store.indexManager.getIndexByName("data_output_id"));
 +
 +        rows = getIndexed(store, 10, buildExpression(dataOutputId, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(dataOutputId, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("el")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2" }, rows.toArray(new String[rows.size()])));
 +    }
 +
 +    /**
 +     * Runs the index-invalidation scenario with {@code forceFlush} set to {@code false} and then,
 +     * after {@code cleanupData()}, once more with {@code forceFlush} set to {@code true}.
 +     */
 +    @Test
 +    public void testInvalidate() throws Exception
 +    {
 +        for (boolean forceFlush : new boolean[]{ false, true })
 +        {
 +            // the second pass starts only after the first pass' data has been cleaned up
 +            if (forceFlush)
 +                cleanupData();
 +
 +            testInvalidate(forceFlush);
 +        }
 +    }
 +
 +    /**
 +     * Loads a first batch, verifies that it can be queried, invalidates all indexes and verifies
 +     * that the same queries return nothing; then loads a second batch and verifies that only the
 +     * rows of that second batch are returned.
 +     *
 +     * @param forceFlush passed through to every {@code loadData} call
 +     */
 +    private void testInvalidate(boolean forceFlush) throws Exception
 +    {
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        Map<String, Pair<String, Integer>> beforeInvalidation = new HashMap<>();
 +        beforeInvalidation.put("key0", Pair.create("Maxie", -1));
 +        beforeInvalidation.put("key1", Pair.create("Chelsie", 33));
 +        beforeInvalidation.put("key2", Pair.create((String) null, 43));
 +        beforeInvalidation.put("key3", Pair.create("Shanna", 27));
 +        beforeInvalidation.put("key4", Pair.create("Amiya", 36));
 +
 +        ColumnFamilyStore cfs = loadData(beforeInvalidation, forceFlush);
 +
 +        // both indexes answer queries for the first batch
 +        Set<String> byName = getIndexed(cfs, 10, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        String[] expectedByName = { "key0", "key3", "key4" };
 +        Assert.assertTrue(byName.toString(), Arrays.equals(expectedByName, byName.toArray(new String[byName.size()])));
 +
 +        Set<String> byAge = getIndexed(cfs, 10, buildExpression(age, Operator.EQ, Int32Type.instance.decompose(33)));
 +        String[] expectedByAge = { "key1" };
 +        Assert.assertTrue(byAge.toString(), Arrays.equals(expectedByAge, byAge.toArray(new String[byAge.size()])));
 +
 +        cfs.indexManager.invalidateAllIndexesBlocking();
 +
 +        // once invalidated, the very same queries must come back empty
 +        byName = getIndexed(cfs, 10, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertTrue(byName.toString(), byName.isEmpty());
 +
 +        byAge = getIndexed(cfs, 10, buildExpression(age, Operator.EQ, Int32Type.instance.decompose(33)));
 +        Assert.assertTrue(byAge.toString(), byAge.isEmpty());
 +
 +        Map<String, Pair<String, Integer>> afterInvalidation = new HashMap<>();
 +        afterInvalidation.put("key5", Pair.create("Americo", 20));
 +        afterInvalidation.put("key6", Pair.create("Fiona", 39));
 +        afterInvalidation.put("key7", Pair.create("Francis", 41));
 +        afterInvalidation.put("key8", Pair.create("Fred", 21));
 +        afterInvalidation.put("key9", Pair.create("Amely", 40));
 +        afterInvalidation.put("key14", Pair.create("Dino", 28));
 +
 +        loadData(afterInvalidation, forceFlush);
 +
 +        // only rows written after the invalidation are expected to be found
 +        byName = getIndexed(cfs, 10, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        String[] expectedByNameAfter = { "key6", "key7" };
 +        Assert.assertTrue(byName.toString(), Arrays.equals(expectedByNameAfter, byName.toArray(new String[byName.size()])));
 +
 +        byAge = getIndexed(cfs, 10, buildExpression(age, Operator.EQ, Int32Type.instance.decompose(40)));
 +        String[] expectedByAgeAfter = { "key9" };
 +        Assert.assertTrue(byAge.toString(), Arrays.equals(expectedByAgeAfter, byAge.toArray(new String[byAge.size()])));
 +    }
 +
 +    /**
 +     * Loads three batches with the timestamps 1000, 2000 and 3000, truncates all indexes at
 +     * increasing timestamps and checks how many rows a LIKE_CONTAINS query still returns after
 +     * each truncation; finally loads a fourth batch to make sure the index is still usable.
 +     */
 +    @Test
 +    public void testTruncate()
 +    {
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +
 +        Map<String, Pair<String, Integer>> firstBatch = new HashMap<>();
 +        firstBatch.put("key01", Pair.create("Ali", 33));
 +        firstBatch.put("key02", Pair.create("Jeremy", 41));
 +        firstBatch.put("key03", Pair.create("Elvera", 22));
 +        firstBatch.put("key04", Pair.create("Bailey", 45));
 +        firstBatch.put("key05", Pair.create("Emerson", 32));
 +        firstBatch.put("key06", Pair.create("Kadin", 38));
 +        firstBatch.put("key07", Pair.create("Maggie", 36));
 +        firstBatch.put("key08", Pair.create("Kailey", 36));
 +        firstBatch.put("key09", Pair.create("Armand", 21));
 +        firstBatch.put("key10", Pair.create("Arnold", 35));
 +
 +        Map<String, Pair<String, Integer>> secondBatch = new HashMap<>();
 +        secondBatch.put("key11", Pair.create("Ken", 38));
 +        secondBatch.put("key12", Pair.create("Penelope", 43));
 +        secondBatch.put("key13", Pair.create("Wyatt", 34));
 +        secondBatch.put("key14", Pair.create("Johnpaul", 34));
 +        secondBatch.put("key15", Pair.create("Trycia", 43));
 +        secondBatch.put("key16", Pair.create("Aida", 21));
 +        secondBatch.put("key17", Pair.create("Devon", 42));
 +
 +        Map<String, Pair<String, Integer>> thirdBatch = new HashMap<>();
 +        thirdBatch.put("key18", Pair.create("Christina", 20));
 +        thirdBatch.put("key19", Pair.create("Rick", 19));
 +        thirdBatch.put("key20", Pair.create("Fannie", 22));
 +        thirdBatch.put("key21", Pair.create("Keegan", 29));
 +        thirdBatch.put("key22", Pair.create("Ignatius", 36));
 +        thirdBatch.put("key23", Pair.create("Ellis", 26));
 +        thirdBatch.put("key24", Pair.create("Annamarie", 29));
 +        thirdBatch.put("key25", Pair.create("Tianna", 31));
 +        thirdBatch.put("key26", Pair.create("Dennis", 32));
 +
 +        ColumnFamilyStore store = loadData(firstBatch, 1000, true);
 +
 +        loadData(secondBatch, 2000, true);
 +        loadData(thirdBatch, 3000, true);
 +
 +        Set<String> rows = getIndexed(store, 100, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertEquals(rows.toString(), 16, rows.size());
 +
 +        // { truncation timestamp, number of rows expected to match afterwards };
 +        // the first step (500) makes sure we don't prematurely delete anything
 +        final int[][] truncations = { { 500, 16 }, { 1500, 10 }, { 2500, 6 }, { 3500, 0 } };
 +
 +        for (int[] step : truncations)
 +        {
 +            store.indexManager.truncateAllIndexesBlocking(step[0]);
 +
 +            rows = getIndexed(store, 100, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +            Assert.assertEquals(rows.toString(), step[1], rows.size());
 +        }
 +
 +        // add back in some data just to make sure it all still works
 +        Map<String, Pair<String, Integer>> fourthBatch = new HashMap<>();
 +        fourthBatch.put("key40", Pair.create("Tianna", 31));
 +        fourthBatch.put("key41", Pair.create("Dennis", 32));
 +
 +        loadData(fourthBatch, 4000, true);
 +
 +        rows = getIndexed(store, 100, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertEquals(rows.toString(), 1, rows.size());
 +    }
 +
 +
 +    /**
 +     * Submits {@code writeCount} mutations to a thread pool and, while they are being applied,
 +     * keeps running a paged index query to check that the number of results never decreases.
 +     * Once every write task has finished, the query must return exactly {@code writeCount} rows.
 +     *
 +     * The pool is shut down in a {@code finally} block so that its worker threads (and any writes
 +     * still queued when an assertion fails) do not outlive this test.
 +     */
 +    @Test
 +    public void testConcurrentMemtableReadsAndWrites() throws Exception
 +    {
 +        final ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        ExecutorService scheduler = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 +
 +        try
 +        {
 +            final int writeCount = 10000;
 +            final AtomicInteger updates = new AtomicInteger(0);
 +
 +            for (int i = 0; i < writeCount; i++)
 +            {
 +                final String key = "key" + i;
 +                final String firstName = "first_name#" + i;
 +                final String lastName = "last_name#" + i;
 +
 +                scheduler.submit((Runnable) () -> {
 +                    try
 +                    {
 +                        newMutation(key, firstName, lastName, 26, System.currentTimeMillis()).apply();
 +                        Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS); // back up a bit to do more reads
 +                    }
 +                    finally
 +                    {
 +                        // counted even when the write fails, so the read loop below always terminates
 +                        updates.incrementAndGet();
 +                    }
 +                });
 +            }
 +
 +            final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +            final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +            int previousCount = 0;
 +
 +            do
 +            {
 +                // this loop figures out if number of search results monotonically increasing
 +                // to make sure that concurrent updates don't interfere with reads, uses first_name and age
 +                // indexes to test correctness of both Trie and SkipList ColumnIndex implementations.
 +
 +                Set<DecoratedKey> rows = getPaged(store, 100, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                                                              buildExpression(age, Operator.EQ, Int32Type.instance.decompose(26)));
 +
 +                Assert.assertTrue(previousCount <= rows.size());
 +                previousCount = rows.size();
 +            }
 +            while (updates.get() < writeCount);
 +
 +            // to make sure that after all of the writes are done we can read all "count" worth of rows
 +            Set<DecoratedKey> rows = getPaged(store, 100, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                                                          buildExpression(age, Operator.EQ, Int32Type.instance.decompose(26)));
 +
 +            Assert.assertEquals(writeCount, rows.size());
 +        }
 +        finally
 +        {
 +            // release the worker threads and drop any writes still queued, whether or not the assertions passed
 +            scheduler.shutdownNow();
 +        }
 +    }
 +
 +    /**
 +     * Loads three overlapping generations of data for the same keys (the first two with the second
 +     * {@code loadData} argument set to {@code true}, the last with {@code false}) and checks which
 +     * keys the first_name / age index queries return.
 +     */
 +    @Test
 +    public void testSameKeyInMemtableAndSSTables()
 +    {
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        Map<String, Pair<String, Integer>> oldest = new HashMap<>();
 +        oldest.put("key1", Pair.create("Pavel", 14));
 +        oldest.put("key2", Pair.create("Pavel", 26));
 +        oldest.put("key3", Pair.create("Pavel", 27));
 +        oldest.put("key4", Pair.create("Jason", 27));
 +
 +        ColumnFamilyStore store = loadData(oldest, true);
 +
 +        Map<String, Pair<String, Integer>> middle = new HashMap<>();
 +        middle.put("key1", Pair.create("Pavel", 14));
 +        middle.put("key2", Pair.create("Pavel", 27));
 +        middle.put("key4", Pair.create("Jason", 28));
 +
 +        loadData(middle, true);
 +
 +        Map<String, Pair<String, Integer>> newest = new HashMap<>();
 +        newest.put("key1", Pair.create("Pavel", 15));
 +        newest.put("key4", Pair.create("Jason", 29));
 +
 +        loadData(newest, false);
 +
 +        // every key has a first_name containing "a"
 +        Set<String> rows = getIndexed(store, 100, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        String[] allKeys = { "key1", "key2", "key3", "key4" };
 +        Assert.assertTrue(rows.toString(), Arrays.equals(allKeys, rows.toArray(new String[rows.size()])));
 +
 +        // age lookups are expected to match only the most recently written value of each key
 +        int[] ages = { 15, 29, 27 };
 +        String[][] expectedByAge = { { "key1" }, { "key4" }, { "key2", "key3" } };
 +
 +        for (int i = 0; i < ages.length; i++)
 +        {
 +            rows = getIndexed(store, 100, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                                          buildExpression(age, Operator.EQ, Int32Type.instance.decompose(ages[i])));
 +
 +            Assert.assertTrue(rows.toString(), Arrays.equals(expectedByAge[i], rows.toArray(new String[rows.size()])));
 +        }
 +    }
 +
 +    /**
 +     * Writes an age cell encoded with {@code LongType} and then queries the age column with an
 +     * {@code Int32Type}-encoded bound, expecting the query to return no rows.
 +     */
 +    @Test
 +    public void testInsertingIncorrectValuesIntoAgeIndex()
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        Mutation mutation = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose("key1")));
 +
 +        ArrayList<Cell> cells = new ArrayList<>();
 +        cells.add(buildCell(age, LongType.instance.decompose(26L), System.currentTimeMillis()));
 +        cells.add(buildCell(firstName, AsciiType.instance.decompose("pavel"), System.currentTimeMillis()));
 +
 +        update(mutation, cells);
 +        mutation.apply();
 +
 +        store.forceBlockingFlush();
 +
 +        Set<String> matches = getIndexed(store, 10, buildExpression(firstName, Operator.EQ, UTF8Type.instance.decompose("a")),
 +                                                    buildExpression(age, Operator.GTE, Int32Type.instance.decompose(26)));
 +
 +        // nothing should come back: the age value was written with the wrong type
 +        Assert.assertEquals(0, matches.size());
 +    }
 +
 +
 +    /**
 +     * Runs the unicode index checks twice: first without forcing a flush and then, after
 +     * {@code cleanupData()}, with a forced flush.
 +     */
 +    @Test
 +    public void testUnicodeSupport()
 +    {
 +        final boolean withoutFlush = false;
 +        final boolean withFlush = true;
 +
 +        testUnicodeSupport(withoutFlush);
 +        cleanupData();
 +        testUnicodeSupport(withFlush);
 +    }
 +
 +    /**
 +     * Writes five rows with non-ASCII text into the "comment" column and verifies the keys returned
 +     * by LIKE_CONTAINS, LIKE_SUFFIX and LIKE_MATCHES queries.
 +     *
 +     * @param forceFlush when {@code true}, the store is flushed before the queries are issued
 +     */
 +    private void testUnicodeSupport(boolean forceFlush)
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        final ByteBuffer comment = UTF8Type.instance.decompose("comment");
 +
 +        // { partition key, comment value }
 +        String[][] inserts = {
 +            { "key1", "ⓈⓅⒺⒸⒾⒶⓁ ⒞⒣⒜⒭⒮ and normal ones" },
 +            { "key2", "龍馭鬱" },
 +            { "key3", "インディアナ" },
 +            { "key4", "レストラン" },
 +            { "key5", "ベンジャミン ウエスト" }
 +        };
 +
 +        for (String[] insert : inserts)
 +        {
 +            Mutation mutation = new Mutation(KS_NAME, decoratedKey(insert[0]));
 +            update(mutation, comment, UTF8Type.instance.decompose(insert[1]), System.currentTimeMillis());
 +            mutation.apply();
 +        }
 +
 +        if (forceFlush)
 +            store.forceBlockingFlush();
 +
 +        // { LIKE_CONTAINS term, the single key expected to match }
 +        String[][] containsCases = {
 +            { "ⓈⓅⒺⒸⒾ", "key1" },
 +            { "normal", "key1" },
 +            { "龍", "key2" },
 +            { "鬱", "key2" },
 +            { "馭鬱", "key2" },
 +            { "龍馭鬱", "key2" },
 +            { "ベンジャミン", "key5" },
 +            { "レストラ", "key4" },
 +            { "インディ", "key3" },
 +            { "ベンジャミ", "key5" }
 +        };
 +
 +        Set<String> matches;
 +
 +        for (String[] containsCase : containsCases)
 +        {
 +            matches = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose(containsCase[0])));
 +            Assert.assertTrue(matches.toString(), Arrays.equals(new String[] { containsCase[1] }, matches.toArray(new String[matches.size()])));
 +        }
 +
 +        matches = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("ン")));
 +        String[] suffixKeys = { "key4", "key5" };
 +        Assert.assertTrue(matches.toString(), Arrays.equals(suffixKeys, matches.toArray(new String[matches.size()])));
 +
 +        matches = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("レストラン")));
 +        String[] exactKeys = { "key4" };
 +        Assert.assertTrue(matches.toString(), Arrays.equals(exactKeys, matches.toArray(new String[matches.size()])));
 +    }
 +
 +    /**
 +     * Runs the "comment_suffix_split" unicode checks twice: first without forcing a flush and then,
 +     * after {@code cleanupData()}, with a forced flush.
 +     */
 +    @Test
 +    public void testUnicodeSuffixModeNoSplits()
 +    {
 +        final boolean withoutFlush = false;
 +        final boolean withFlush = true;
 +
 +        testUnicodeSuffixModeNoSplits(withoutFlush);
 +        cleanupData();
 +        testUnicodeSuffixModeNoSplits(withFlush);
 +    }
 +
 +    /**
 +     * Writes four rows with non-ASCII text into the "comment_suffix_split" column and verifies the
 +     * keys returned by LIKE_CONTAINS, LIKE_SUFFIX and LIKE_MATCHES queries.
 +     *
 +     * @param forceFlush when {@code true}, the store is flushed before the queries are issued
 +     */
 +    private void testUnicodeSuffixModeNoSplits(boolean forceFlush)
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        final ByteBuffer comment = UTF8Type.instance.decompose("comment_suffix_split");
 +
 +        // { partition key, comment value }
 +        String[][] inserts = {
 +            { "key1", "龍馭鬱" },
 +            { "key2", "インディアナ" },
 +            { "key3", "レストラン" },
 +            { "key4", "ベンジャミン ウエスト" }
 +        };
 +
 +        for (String[] insert : inserts)
 +        {
 +            Mutation mutation = new Mutation(KS_NAME, decoratedKey(insert[0]));
 +            update(mutation, comment, UTF8Type.instance.decompose(insert[1]), System.currentTimeMillis());
 +            mutation.apply();
 +        }
 +
 +        if (forceFlush)
 +            store.forceBlockingFlush();
 +
 +        // LIKE_CONTAINS terms and, at the same position, the keys each one is expected to return
 +        String[] containsTerms = { "龍", "鬱", "馭鬱", "龍馭鬱", "ベンジャミン", "トラン", "ディア", "ジャミン", "ン" };
 +        String[][] containsKeys = {
 +            { "key1" },
 +            { "key1" },
 +            { "key1" },
 +            { "key1" },
 +            { "key4" },
 +            { "key3" },
 +            { "key2" },
 +            { "key4" },
 +            { "key2", "key3", "key4" }
 +        };
 +
 +        Set<String> matches;
 +
 +        for (int i = 0; i < containsTerms.length; i++)
 +        {
 +            matches = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose(containsTerms[i])));
 +            Assert.assertTrue(matches.toString(), Arrays.equals(containsKeys[i], matches.toArray(new String[matches.size()])));
 +        }
 +
 +        matches = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("ン")));
 +        String[] suffixKeys = { "key3" };
 +        Assert.assertTrue(matches.toString(), Arrays.equals(suffixKeys, matches.toArray(new String[matches.size()])));
 +
 +        matches = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("ベンジャミン ウエスト")));
 +        String[] exactKeys = { "key4" };
 +        Assert.assertTrue(matches.toString(), Arrays.equals(exactKeys, matches.toArray(new String[matches.size()])));
 +    }
 +
 +    /**
 +     * Ten times over, builds a value from a random byte array whose length is at least
 +     * {@code OnDiskIndexBuilder.MAX_TERM_SIZE}, writes it to the "comment_suffix_split" column and
 +     * asserts that a LIKE_MATCHES query for that same value returns no rows, both before and after
 +     * a flush.
 +     */
 +    @Test
 +    public void testThatTooBigValueIsRejected()
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        final ByteBuffer comment = UTF8Type.instance.decompose("comment_suffix_split");
 +
 +        for (int i = 0; i < 10; i++)
 +        {
 +            byte[] randomBytes = new byte[ThreadLocalRandom.current().nextInt(OnDiskIndexBuilder.MAX_TERM_SIZE, 5 * OnDiskIndexBuilder.MAX_TERM_SIZE)];
 +            ThreadLocalRandom.current().nextBytes(randomBytes);
 +
 +            // decode with an explicit charset so the generated value does not depend on the platform default
 +            final ByteBuffer bigValue = UTF8Type.instance.decompose(new String(randomBytes, java.nio.charset.StandardCharsets.UTF_8));
 +
 +            Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
 +            update(rm, comment, bigValue, System.currentTimeMillis());
 +            rm.apply();
 +
 +            Set<String> rows;
 +
 +            // queried from the memtable (nothing has been flushed yet in this iteration)
 +            rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_MATCHES, bigValue.duplicate()));
 +            Assert.assertEquals(0, rows.size());
 +
 +            store.forceBlockingFlush();
 +
 +            // queried again after the flush
 +            rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_MATCHES, bigValue.duplicate()));
 +            Assert.assertEquals(0, rows.size());
 +        }
 +    }
 +
 +    /**
 +     * Executes a {@code QueryPlan} built with {@code 0} as its third constructor argument and expects
 +     * a {@code TimeQuotaExceededException}; then executes the same command with
 +     * {@code DatabaseDescriptor.getRangeRpcTimeout()} and expects all four keys back.
 +     */
 +    @Test
 +    public void testSearchTimeouts() throws Exception
 +    {
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +
 +        Map<String, Pair<String, Integer>> data1 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key1", Pair.create("Pavel", 14));
 +                put("key2", Pair.create("Pavel", 26));
 +                put("key3", Pair.create("Pavel", 27));
 +                put("key4", Pair.create("Jason", 27));
 +        }};
 +
 +        ColumnFamilyStore store = loadData(data1, true);
 +
 +        RowFilter filter = RowFilter.create();
 +        filter.add(store.metadata.getColumnDefinition(firstName), Operator.LIKE_CONTAINS, AsciiType.instance.fromString("a"));
 +
 +        ReadCommand command = new PartitionRangeReadCommand(store.metadata,
 +                                                            FBUtilities.nowInSeconds(),
 +                                                            ColumnFilter.all(store.metadata),
 +                                                            filter,
 +                                                            DataLimits.NONE,
 +                                                            DataRange.allData(store.metadata.partitioner),
 +                                                            Optional.empty());
 +
 +        try
 +        {
 +            new QueryPlan(store, command, 0).execute(ReadExecutionController.empty());
 +            // AssertionError is not an Exception, so it is not swallowed by the catch blocks below
 +            Assert.fail("expected TimeQuotaExceededException when executing with a quota of 0");
 +        }
 +        catch (TimeQuotaExceededException e)
 +        {
 +            // correct behavior
 +        }
 +        catch (Exception e)
 +        {
 +            // keep the unexpected exception as the cause so the failure can be diagnosed
 +            throw new AssertionError("expected TimeQuotaExceededException but got " + e, e);
 +        }
 +
 +        // to make sure that query doesn't fail in normal conditions
 +
 +        try (ReadExecutionController controller = command.executionController())
 +        {
 +            Set<String> rows = getKeys(new QueryPlan(store, command, DatabaseDescriptor.getRangeRpcTimeout()).execute(controller));
 +            Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +        }
 +    }
 +
 +    /**
 +     * Runs the case-insensitive "address" prefix checks twice: first without forcing a flush and
 +     * then, after {@code cleanupData()}, with a forced flush.
 +     */
 +    @Test
 +    public void testLowerCaseAnalyzer()
 +    {
 +        final boolean withoutFlush = false;
 +        final boolean withFlush = true;
 +
 +        testLowerCaseAnalyzer(withoutFlush);
 +        cleanupData();
 +        testLowerCaseAnalyzer(withFlush);
 +    }
 +
 +    /**
 +     * Writes eight CJK full names into the "/output/full-name/" column, flushes, and verifies EQ and
 +     * LIKE_PREFIX lookups against them.
 +     */
 +    @Test
 +    public void testChinesePrefixSearch()
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        final ByteBuffer fullName = UTF8Type.instance.decompose("/output/full-name/");
 +
 +        // the value at position i is stored under partition key "key" + (i + 1)
 +        String[] names = {
 +            "美加 八田",
 +            "仁美 瀧澤",
 +            "晃宏 高須",
 +            "弘孝 大竹",
 +            "満枝 榎本",
 +            "飛鳥 上原",
 +            "大輝 鎌田",
 +            "利久 寺地"
 +        };
 +
 +        for (int i = 0; i < names.length; i++)
 +        {
 +            Mutation mutation = new Mutation(KS_NAME, decoratedKey("key" + (i + 1)));
 +            update(mutation, fullName, UTF8Type.instance.decompose(names[i]), System.currentTimeMillis());
 +            mutation.apply();
 +        }
 +
 +        store.forceBlockingFlush();
 +
 +        // operator, term and expected key for each lookup, kept at matching positions
 +        Operator[] operators = { Operator.EQ, Operator.LIKE_PREFIX, Operator.EQ, Operator.LIKE_PREFIX };
 +        String[] terms = { "美加 八田", "美加", "晃宏 高須", "大輝" };
 +        String[] expectedKeys = { "key1", "key1", "key3", "key7" };
 +
 +        for (int i = 0; i < terms.length; i++)
 +        {
 +            Set<String> matches = getIndexed(store, 10, buildExpression(fullName, operators[i], UTF8Type.instance.decompose(terms[i])));
 +            Assert.assertTrue(matches.toString(), Arrays.equals(new String[] { expectedKeys[i] }, matches.toArray(new String[matches.size()])));
 +        }
 +    }
 +
 +    /**
 +     * Writes three addresses into the "address" column and verifies that LIKE_PREFIX queries return
 +     * the owning key regardless of the letter case used in the query term.
 +     *
 +     * @param forceFlush when {@code true}, the store is flushed before the queries are issued
 +     */
 +    public void testLowerCaseAnalyzer(boolean forceFlush)
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        final ByteBuffer comment = UTF8Type.instance.decompose("address");
 +
 +        String[] keys = { "key1", "key2", "key3" };
 +        String[] addresses = {
 +            "577 Rogahn Valleys Apt. 178",
 +            "89809 Beverly Course Suite 089",
 +            "165 clydie oval apt. 399"
 +        };
 +
 +        for (int i = 0; i < keys.length; i++)
 +        {
 +            Mutation mutation = new Mutation(KS_NAME, decoratedKey(keys[i]));
 +            update(mutation, comment, UTF8Type.instance.decompose(addresses[i]), System.currentTimeMillis());
 +            mutation.apply();
 +        }
 +
 +        if (forceFlush)
 +            store.forceBlockingFlush();
 +
 +        // prefixesByKey[i] lists the query prefixes that must all resolve to keys[i] only
 +        String[][] prefixesByKey = {
 +            {
 +                "577 Rogahn Valleys",
 +                "577 ROgAhn VallEYs",
 +                "577 rogahn valleys",
 +                "577 rogahn",
 +                "57"
 +            },
 +            {
 +                "89809 Beverly Course",
 +                "89809 BEVERly COURSE",
 +                "89809 beverly course",
 +                "89809 Beverly",
 +                "8980"
 +            },
 +            {
 +                "165 ClYdie OvAl APT. 399",
 +                "165 Clydie Oval Apt. 399",
 +                "165 clydie oval apt. 399",
 +                "165 ClYdie OvA",
 +                "165 ClYdi",
 +                "165"
 +            }
 +        };
 +
 +        for (int i = 0; i < keys.length; i++)
 +        {
 +            for (String prefix : prefixesByKey[i])
 +            {
 +                Set<String> matches = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose(prefix)));
 +                Assert.assertTrue(matches.toString(), Arrays.equals(new String[] { keys[i] }, matches.toArray(new String[matches.size()])));
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Writes names into the "first_name_prefix" column in two separately flushed batches and checks
 +     * LIKE_PREFIX, LIKE_MATCHES, EQ and NEQ lookups across both of them.
 +     */
 +    @Test
 +    public void testPrefixSSTableLookup()
 +    {
 +        // This test covers a particular case in which interval lookup can return invalid results
 +        // when queried on the prefix e.g. "j".
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        final ByteBuffer name = UTF8Type.instance.decompose("first_name_prefix");
 +
 +        // each batch of { key, name } rows is written and then flushed on its own
 +        String[][][] batches = {
 +            // first flush would make interval for name - 'johnny' -> 'pavel'
 +            {
 +                { "key1", "Pavel" },
 +                { "key2", "Jordan" },
 +                { "key3", "Mikhail" },
 +                { "key4", "Michael" },
 +                { "key5", "Johnny" }
 +            },
 +            // this flush is going to produce range - 'jason' -> 'vijay'
 +            {
 +                { "key6", "Jason" },
 +                { "key7", "Vijay" },
 +                { "key8", "Jean-Claude" } // this name is going to be tokenized
 +            }
 +        };
 +
 +        for (String[][] batch : batches)
 +        {
 +            for (String[] row : batch)
 +            {
 +                Mutation mutation = new Mutation(KS_NAME, decoratedKey(row[0]));
 +                update(mutation, name, UTF8Type.instance.decompose(row[1]), System.currentTimeMillis());
 +                mutation.apply();
 +            }
 +
 +            store.forceBlockingFlush();
 +        }
 +
 +        // make sure that overlap of the prefixes is properly handled across sstables
 +        // since simple interval tree lookup is not going to cover it, prefix lookup actually required.
 +
 +        String[] prefixes = { "J", "j", "m", "v", "p" };
 +        String[][] prefixKeys = {
 +            { "key2", "key5", "key6", "key8" },
 +            { "key2", "key5", "key6", "key8" },
 +            { "key3", "key4" },
 +            { "key7" },
 +            { "key1" }
 +        };
 +
 +        Set<String> matches;
 +
 +        for (int i = 0; i < prefixes.length; i++)
 +        {
 +            matches = getIndexed(store, 10, buildExpression(name, Operator.LIKE_PREFIX, UTF8Type.instance.decompose(prefixes[i])));
 +            Assert.assertTrue(matches.toString(), Arrays.equals(prefixKeys[i], matches.toArray(new String[matches.size()])));
 +        }
 +
 +        matches = getIndexed(store, 10, buildExpression(name, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("j")),
 +                                        buildExpression(name, Operator.NEQ, UTF8Type.instance.decompose("joh")));
 +        String[] withoutJoh = { "key2", "key6", "key8" };
 +        Assert.assertTrue(matches.toString(), Arrays.equals(withoutJoh, matches.toArray(new String[matches.size()])));
 +
 +        String[] pavelOnly = { "key1" };
 +        String[] jeanClaudeOnly = { "key8" };
 +
 +        matches = getIndexed(store, 10, buildExpression(name, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("pavel")));
 +        Assert.assertTrue(matches.toString(), Arrays.equals(pavelOnly, matches.toArray(new String[matches.size()])));
 +
 +        matches = getIndexed(store, 10, buildExpression(name, Operator.EQ, UTF8Type.instance.decompose("Pave")));
 +        Assert.assertTrue(matches.isEmpty());
 +
 +        matches = getIndexed(store, 10, buildExpression(name, Operator.EQ, UTF8Type.instance.decompose("Pavel")));
 +        Assert.assertTrue(matches.toString(), Arrays.equals(pavelOnly, matches.toArray(new String[matches.size()])));
 +
 +        matches = getIndexed(store, 10, buildExpression(name, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("JeAn")));
 +        Assert.assertTrue(matches.toString(), Arrays.equals(jeanClaudeOnly, matches.toArray(new String[matches.size()])));
 +
 +        matches = getIndexed(store, 10, buildExpression(name, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("claUde")));
 +        Assert.assertTrue(matches.toString(), Arrays.equals(jeanClaudeOnly, matches.toArray(new String[matches.size()])));
 +
 +        matches = getIndexed(store, 10, buildExpression(name, Operator.EQ, UTF8Type.instance.decompose("Jean")));
 +        Assert.assertTrue(matches.isEmpty());
 +
 +        matches = getIndexed(store, 10, buildExpression(name, Operator.EQ, UTF8Type.instance.decompose("Jean-Claude")));
 +        Assert.assertTrue(matches.toString(), Arrays.equals(jeanClaudeOnly, matches.toArray(new String[matches.size()])));
 +    }
 +
 +    @Test
 +    public void testSettingIsLiteralOption()
 +    {
 +
 +        // special type which is UTF-8 but is only on the inside
 +        AbstractType<?> stringType = new AbstractType<String>(AbstractType.ComparisonType.CUSTOM)
 +        {
 +            public ByteBuffer fromString(String source) throws MarshalException
 +            {
 +                return UTF8Type.instance.fromString(source);
 +            }
 +
 +            public Term fromJSONObject(Object parsed) throws MarshalException
 +            {
 +                throw new UnsupportedOperationException();
 +            }
 +
 +            public TypeSerializer<String> getSerializer()
 +            {
 +                return UTF8Type.instance.getSerializer();
 +            }
 +
 +            public int compareCustom(ByteBuffer a, ByteBuffer b)
 +            {
 +                return UTF8Type.instance.compare(a, b);
 +            }
 +        };
 +
 +        // first let's check that we get 'false' for 'isLiteral' if we don't set the option with special comparator
 +        ColumnDefinition columnA = ColumnDefinition.regularDef(KS_NAME, CF_NAME, "special-A", stringType);
 +
 +        ColumnIndex indexA = new ColumnIndex(UTF8Type.instance, columnA, IndexMetadata.fromSchemaMetadata("special-index-A", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
 +        {{
 +            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName());
 +        }}));
 +
 +        Assert.assertEquals(true,  indexA.isIndexed());
 +        Assert.assertEquals(false, indexA.isLiteral());
 +
 +        // now let's double-check that we do get 'true' when we set it
 +        ColumnDefinition columnB = ColumnDefinition.regularDef(KS_NAME, CF_NAME, "special-B", stringType);
 +
 +        ColumnIndex indexB = new ColumnIndex(UTF8Type.instance, columnB, IndexMetadata.fromSchemaMetadata("special-index-B", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
 +        {{
 +            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName());
 +            put("is_literal", "true");
 +        }}));
 +
 +        Assert.assertEquals(true, indexB.isIndexed());
 +        Assert.assertEquals(true, indexB.isLiteral());
 +
 +        // and finally we should also get a 'true' if it's built-in UTF-8/ASCII comparator
 +        ColumnDefinition columnC = ColumnDefinition.regularDef(KS_NAME, CF_NAME, "special-C", UTF8Type.instance);
 +
 +        ColumnIndex indexC = new ColumnIndex(UTF8Type.instance, columnC, IndexMetadata.fromSchemaMetadata("special-index-C", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
 +        {{
 +            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName());
 +        }}));
 +
 +        Assert.assertEquals(true, indexC.isIndexed());
 +        Assert.assertEquals(true, indexC.isLiteral());
 +
 +        ColumnDefinition columnD = ColumnDefinition.regularDef(KS_NAME, CF_NAME, "special-D", AsciiType.instance);
 +
 +        ColumnIndex indexD = new ColumnIndex(UTF8Type.instance, columnD, IndexMetadata.fromSchemaMetadata("special-index-D", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
 +        {{
 +            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName());
 +        }}));
 +
 +        Assert.assertEquals(true, indexD.isIndexed());
 +        Assert.assertEquals(true, indexD.isLiteral());
 +
 +        // and the option should supersede the comparator type
 +        ColumnDefinition columnE = ColumnDefinition.regularDef(KS_NAME, CF_NAME, "special-E", UTF8Type.instance);
 +
 +        ColumnIndex indexE = new ColumnIndex(UTF8Type.instance, columnE, IndexMetadata.fromSchemaMetadata("special-index-E", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
 +        {{
 +            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName());
 +            put("is_literal", "false");
 +        }}));
 +
 +        Assert.assertEquals(true,  indexE.isIndexed());
 +        Assert.assertEquals(false, indexE.isLiteral());
++
++        // test frozen-collection
++        ColumnDefinition columnF = ColumnDefinition.regularDef(KS_NAME,
++                                                               CF_NAME,
++                                                               "special-F",
++                                                               ListType.getInstance(UTF8Type.instance, false));
++
++        ColumnIndex indexF = new ColumnIndex(UTF8Type.instance, columnF, IndexMetadata.fromSchemaMetadata("special-index-F", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
++        {{
++            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName());
++        }}));
++
++        Assert.assertEquals(true,  indexF.isIndexed());
++        Assert.assertEquals(false, indexF.isLiteral());
 +    }
 +
 +    @Test
 +    public void testClusteringIndexes() throws Exception
 +    {
 +        testClusteringIndexes(false);
 +        cleanupData();
 +        testClusteringIndexes(true);
 +    }
 +
 +    public void testClusteringIndexes(boolean forceFlush) throws Exception
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CLUSTERING_CF_NAME_1);
 +
 +        executeCQL(CLUSTERING_CF_NA

<TRUNCATED>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[5/8] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ad...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/47a2839b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/47a2839b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/47a2839b

Branch: refs/heads/trunk
Commit: 47a2839bf094a7fa9bad6de140fba486756c49bf
Parents: 15abe2d 3960260
Author: Andrés de la Peña <a....@gmail.com>
Authored: Tue Aug 8 15:01:04 2017 +0100
Committer: Andrés de la Peña <a....@gmail.com>
Committed: Tue Aug 8 15:01:04 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/config/ColumnDefinition.java      | 19 ++++++++---
 .../apache/cassandra/db/rows/AbstractCell.java  |  2 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  2 +-
 .../apache/cassandra/tools/JsonTransformer.java | 31 ++++++++++++++++--
 .../org/apache/cassandra/cql3/ViewTest.java     | 33 ++++++++++++++++++++
 .../cassandra/index/sasi/SASIIndexTest.java     | 14 +++++++++
 7 files changed, 94 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a2839b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b8c4bde,1525289..b778df6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,9 -1,5 +1,10 @@@
 -3.0.15
 +3.11.1
 + * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
 + * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
 + * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
 + * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
 +Merged from 3.0:
+  * Fix ColumnDefinition.cellValueType() for non-frozen collection and change SSTabledump to use type.toJSONString() (CASSANDRA-13573)
   * Skip materialized view addition if the base table doesn't exist (CASSANDRA-13737)
   * Drop table should remove corresponding entries in dropped_columns table (CASSANDRA-13730)
   * Log warn message until legacy auth tables have been migrated (CASSANDRA-13371)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a2839b/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/ColumnDefinition.java
index ea508d2,6a0f530..159ea0c
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@@ -446,197 -398,17 +448,206 @@@ public class ColumnDefinition extends C
       */
      public AbstractType<?> cellValueType()
      {
-         return type instanceof CollectionType
-              ? ((CollectionType)type).valueComparator()
-              : type;
+         assert !(type instanceof UserType && type.isMultiCell());
+         return type instanceof CollectionType && type.isMultiCell()
+                 ? ((CollectionType)type).valueComparator()
+                 : type;
+     }
+ 
+ 
+     public boolean isCounterColumn()
+     {
+         if (type instanceof CollectionType) // for thrift
+             return ((CollectionType) type).valueComparator().isCounter();
+         return type.isCounter();
      }
 +
 +    public Selector.Factory newSelectorFactory(CFMetaData cfm, AbstractType<?> expectedType, List<ColumnDefinition> defs, VariableSpecifications boundNames) throws InvalidRequestException
 +    {
 +        return SimpleSelector.newFactory(this, addAndGetIndex(this, defs));
 +    }
 +
 +    public AbstractType<?> getExactTypeIfKnown(String keyspace)
 +    {
 +        return type;
 +    }
 +
 +    /**
 +     * Because Thrift-created tables may have a non-text comparator, we cannot determine the proper 'key' until
 +     * we know the comparator. ColumnDefinition.Raw is a placeholder that can be converted to a real ColumnIdentifier
 +     * once the comparator is known with prepare(). This should only be used with identifiers that are actual
 +     * column names. See CASSANDRA-8178 for more background.
 +     */
 +    public static abstract class Raw extends Selectable.Raw
 +    {
 +        /**
 +         * Creates a {@code ColumnDefinition.Raw} from an unquoted identifier string.
 +         */
 +        public static Raw forUnquoted(String text)
 +        {
 +            return new Literal(text, false);
 +        }
 +
 +        /**
 +         * Creates a {@code ColumnDefinition.Raw} from a quoted identifier string.
 +         */
 +        public static Raw forQuoted(String text)
 +        {
 +            return new Literal(text, true);
 +        }
 +
 +        /**
 +         * Creates a {@code ColumnDefinition.Raw} from a pre-existing {@code ColumnDefinition}
 +         * (useful in the rare cases where we already have the column but need
 +         * a {@code ColumnDefinition.Raw} for typing purposes).
 +         */
 +        public static Raw forColumn(ColumnDefinition column)
 +        {
 +            return new ForColumn(column);
 +        }
 +
 +        /**
 +         * Get the identifier corresponding to this raw column, without assuming this is an
 +         * existing column (unlike {@link #prepare}).
 +         */
 +        public abstract ColumnIdentifier getIdentifier(CFMetaData cfm);
 +
 +        public abstract String rawText();
 +
 +        @Override
 +        public abstract ColumnDefinition prepare(CFMetaData cfm);
 +
 +        @Override
 +        public boolean processesSelection()
 +        {
 +            return false;
 +        }
 +
 +        @Override
 +        public final int hashCode()
 +        {
 +            return toString().hashCode();
 +        }
 +
 +        @Override
 +        public final boolean equals(Object o)
 +        {
 +            if(!(o instanceof Raw))
 +                return false;
 +
 +            Raw that = (Raw)o;
 +            return this.toString().equals(that.toString());
 +        }
 +
 +        private static class Literal extends Raw
 +        {
 +            private final String text;
 +
 +            public Literal(String rawText, boolean keepCase)
 +            {
 +                this.text =  keepCase ? rawText : rawText.toLowerCase(Locale.US);
 +            }
 +
 +            public ColumnIdentifier getIdentifier(CFMetaData cfm)
 +            {
 +                if (!cfm.isStaticCompactTable())
 +                    return ColumnIdentifier.getInterned(text, true);
 +
 +                AbstractType<?> thriftColumnNameType = cfm.thriftColumnNameType();
 +                if (thriftColumnNameType instanceof UTF8Type)
 +                    return ColumnIdentifier.getInterned(text, true);
 +
 +                // We have a Thrift-created table with a non-text comparator. Check if we have a matching column, otherwise assume we should use
 +                // thriftColumnNameType
 +                ByteBuffer bufferName = ByteBufferUtil.bytes(text);
 +                for (ColumnDefinition def : cfm.allColumns())
 +                {
 +                    if (def.name.bytes.equals(bufferName))
 +                        return def.name;
 +                }
 +                return ColumnIdentifier.getInterned(thriftColumnNameType, thriftColumnNameType.fromString(text), text);
 +            }
 +
 +            public ColumnDefinition prepare(CFMetaData cfm)
 +            {
 +                if (!cfm.isStaticCompactTable())
 +                    return find(cfm);
 +
 +                AbstractType<?> thriftColumnNameType = cfm.thriftColumnNameType();
 +                if (thriftColumnNameType instanceof UTF8Type)
 +                    return find(cfm);
 +
 +                // We have a Thrift-created table with a non-text comparator. Check if we have a matching column, otherwise assume we should use
 +                // thriftColumnNameType
 +                ByteBuffer bufferName = ByteBufferUtil.bytes(text);
 +                for (ColumnDefinition def : cfm.allColumns())
 +                {
 +                    if (def.name.bytes.equals(bufferName))
 +                        return def;
 +                }
 +                return find(thriftColumnNameType.fromString(text), cfm);
 +            }
 +
 +            private ColumnDefinition find(CFMetaData cfm)
 +            {
 +                return find(ByteBufferUtil.bytes(text), cfm);
 +            }
 +
 +            private ColumnDefinition find(ByteBuffer id, CFMetaData cfm)
 +            {
 +                ColumnDefinition def = cfm.getColumnDefinition(id);
 +                if (def == null)
 +                    throw new InvalidRequestException(String.format("Undefined column name %s", toString()));
 +                return def;
 +            }
 +
 +            public String rawText()
 +            {
 +                return text;
 +            }
 +
 +            @Override
 +            public String toString()
 +            {
 +                return ColumnIdentifier.maybeQuote(text);
 +            }
 +        }
 +
 +        // Used internally in the rare case where we need a ColumnDefinition.Raw for type-checking but
 +        // actually already have the column itself.
 +        private static class ForColumn extends Raw
 +        {
 +            private final ColumnDefinition column;
 +
 +            private ForColumn(ColumnDefinition column)
 +            {
 +                this.column = column;
 +            }
 +
 +            public ColumnIdentifier getIdentifier(CFMetaData cfm)
 +            {
 +                return column.name;
 +            }
 +
 +            public ColumnDefinition prepare(CFMetaData cfm)
 +            {
 +                assert cfm.getColumnDefinition(column.name) != null; // Sanity check that we're not doing something crazy
 +                return column;
 +            }
 +
 +            public String rawText()
 +            {
 +                return column.name.toString();
 +            }
 +
 +            @Override
 +            public String toString()
 +            {
 +                return column.name.toCQLString();
 +            }
 +        }
 +    }
 +
 +
 +
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a2839b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 54bd9e8,7e93c2e..54c8f24
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@@ -44,81 -40,6 +44,81 @@@ public abstract class AbstractCell exte
          super(column);
      }
  
 +    public boolean isCounterCell()
 +    {
-         return !isTombstone() && column.cellValueType().isCounter();
++        return !isTombstone() && column.isCounterColumn();
 +    }
 +
 +    public boolean isLive(int nowInSec)
 +    {
 +        return localDeletionTime() == NO_DELETION_TIME || (ttl() != NO_TTL && nowInSec < localDeletionTime());
 +    }
 +
 +    public boolean isTombstone()
 +    {
 +        return localDeletionTime() != NO_DELETION_TIME && ttl() == NO_TTL;
 +    }
 +
 +    public boolean isExpiring()
 +    {
 +        return ttl() != NO_TTL;
 +    }
 +
 +    public Cell markCounterLocalToBeCleared()
 +    {
 +        if (!isCounterCell())
 +            return this;
 +
 +        ByteBuffer value = value();
 +        ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(value);
 +        return marked == value ? this : new BufferCell(column, timestamp(), ttl(), localDeletionTime(), marked, path());
 +    }
 +
 +    public Cell purge(DeletionPurger purger, int nowInSec)
 +    {
 +        if (!isLive(nowInSec))
 +        {
 +            if (purger.shouldPurge(timestamp(), localDeletionTime()))
 +                return null;
 +
 +            // We slightly hijack purging to convert expired but not purgeable columns to tombstones. The reason we do that is
 +            // that once a column has expired it is equivalent to a tombstone but actually using a tombstone is more compact since
 +            // we don't keep the column value. The reason we do it here is that 1) it's somewhat related to dealing with tombstones
 +            // so hopefully not too surprising and 2) we want to do this and purging at the same places, so it's simpler/more efficient
 +            // to do both here.
 +            if (isExpiring())
 +            {
 +                // Note that as long as the expiring column and the tombstone put together live longer than GC grace seconds,
 +                // we'll fulfil our responsibility to repair. See discussion at
 +                // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
 +                return BufferCell.tombstone(column, timestamp(), localDeletionTime() - ttl(), path()).purge(purger, nowInSec);
 +            }
 +        }
 +        return this;
 +    }
 +
 +    public Cell copy(AbstractAllocator allocator)
 +    {
 +        CellPath path = path();
 +        return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), allocator.clone(value()), path == null ? null : path.copy(allocator));
 +    }
 +
 +    // note: while the cell returned may be different, the value is the same, so if the value is offheap it must be referenced inside a guarded context (or copied)
 +    public Cell updateAllTimestamp(long newTimestamp)
 +    {
 +        return new BufferCell(column, isTombstone() ? newTimestamp - 1 : newTimestamp, ttl(), localDeletionTime(), value(), path());
 +    }
 +
 +    public int dataSize()
 +    {
 +        CellPath path = path();
 +        return TypeSizes.sizeof(timestamp())
 +               + TypeSizes.sizeof(ttl())
 +               + TypeSizes.sizeof(localDeletionTime())
 +               + value().remaining()
 +               + (path == null ? 0 : path.dataSize());
 +    }
 +
      public void digest(MessageDigest digest)
      {
          digest.update(value().duplicate());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a2839b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a2839b/src/java/org/apache/cassandra/tools/JsonTransformer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/JsonTransformer.java
index 360e8ff,5c32035..c3a0a17
--- a/src/java/org/apache/cassandra/tools/JsonTransformer.java
+++ b/src/java/org/apache/cassandra/tools/JsonTransformer.java
@@@ -46,6 -50,7 +47,7 @@@ import org.apache.cassandra.db.rows.Row
  import org.apache.cassandra.db.rows.Unfiltered;
  import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  import org.apache.cassandra.io.sstable.ISSTableScanner;
 -import org.apache.cassandra.transport.Server;
++import org.apache.cassandra.transport.ProtocolVersion;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.codehaus.jackson.JsonFactory;
  import org.codehaus.jackson.JsonGenerator;
@@@ -405,9 -411,9 +407,10 @@@ public final class JsonTransforme
              objectIndenter.setCompact(true);
              json.writeFieldName("name");
              AbstractType<?> type = cell.column().type;
++            AbstractType<?> cellType = null;
              json.writeString(cell.column().name.toCQLString());
  
-             if (cell.path() != null && cell.path().size() > 0)
+             if (type.isCollection() && type.isMultiCell()) // non-frozen collection
              {
                  CollectionType ct = (CollectionType) type;
                  json.writeFieldName("path");
@@@ -419,6 -425,6 +422,30 @@@
                  }
                  json.writeEndArray();
                  arrayIndenter.setCompact(false);
++
++                cellType = cell.column().cellValueType();
++            }
++            else if (type.isUDT() && type.isMultiCell()) // non-frozen udt
++            {
++                UserType ut = (UserType) type;
++                json.writeFieldName("path");
++                arrayIndenter.setCompact(true);
++                json.writeStartArray();
++                for (int i = 0; i < cell.path().size(); i++)
++                {
++                    Short fieldPosition = ut.nameComparator().compose(cell.path().get(i));
++                    json.writeString(ut.fieldNameAsString(fieldPosition));
++                }
++                json.writeEndArray();
++                arrayIndenter.setCompact(false);
++
++                // cellType of udt
++                Short fieldPosition = ((UserType) type).nameComparator().compose(cell.path().get(0));
++                cellType = ((UserType) type).fieldType(fieldPosition);
++            }
++            else
++            {
++                cellType = cell.column().cellValueType();
              }
              if (cell.isTombstone())
              {
@@@ -433,7 -439,7 +460,7 @@@
              else
              {
                  json.writeFieldName("value");
-                 json.writeString(cell.column().cellValueType().getString(cell.value()));
 -                json.writeRawValue(cell.column().cellValueType().toJSONString(cell.value(), Server.CURRENT_VERSION));
++                json.writeRawValue(cellType.toJSONString(cell.value(), ProtocolVersion.CURRENT));
              }
              if (liveInfo.isEmpty() || cell.timestamp() != liveInfo.timestamp())
              {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a2839b/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/ViewTest.java
index 6f6e04d,f8f8c9f..0853562
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@@ -777,6 -788,6 +777,39 @@@ public class ViewTest extends CQLTeste
      }
  
      @Test
++    public void testFrozenCollectionsWithComplicatedInnerType() throws Throwable
++    {
++        createTable("CREATE TABLE %s (k int, intval int,  listval frozen<list<tuple<text,text>>>, PRIMARY KEY (k))");
++
++        execute("USE " + keyspace());
++        executeNet(protocolVersion, "USE " + keyspace());
++
++        createView("mv",
++                   "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND listval IS NOT NULL PRIMARY KEY (k, listval)");
++
++        updateView("INSERT INTO %s (k, intval, listval) VALUES (?, ?, fromJson(?))",
++                   0,
++                   0,
++                   "[[\"a\",\"1\"], [\"b\",\"2\"], [\"c\",\"3\"]]");
++
++        // verify input
++        assertRows(execute("SELECT k, listval FROM %s WHERE k = ?", 0),
++                   row(0, list(tuple("a", "1"), tuple("b", "2"), tuple("c", "3"))));
++        assertRows(execute("SELECT k, listval from mv"),
++                   row(0, list(tuple("a", "1"), tuple("b", "2"), tuple("c", "3"))));
++
++        // update listval with the same value and it will be compared in view generator
++        updateView("INSERT INTO %s (k, listval) VALUES (?, fromJson(?))",
++                   0,
++                   "[[\"a\",\"1\"], [\"b\",\"2\"], [\"c\",\"3\"]]");
++        // verify result
++        assertRows(execute("SELECT k, listval FROM %s WHERE k = ?", 0),
++                   row(0, list(tuple("a", "1"), tuple("b", "2"), tuple("c", "3"))));
++        assertRows(execute("SELECT k, listval from mv"),
++                   row(0, list(tuple("a", "1"), tuple("b", "2"), tuple("c", "3"))));
++    }
++
++    @Test
      public void testUpdate() throws Throwable
      {
          createTable("CREATE TABLE %s (" +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/8] cassandra git commit: Fix ColumnMetadata.cellValueType() return type and change sstabledump tool to use type.toJsonString()

Posted by ad...@apache.org.
Fix ColumnMetadata.cellValueType() return type and change sstabledump tool to use type.toJsonString()

patch by Zhao Yang, reviewed by Andres de la Peña for CASSANDRA-13573


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/39602604
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/39602604
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/39602604

Branch: refs/heads/cassandra-3.11
Commit: 3960260472fcd4e0243f62cc813992f1365197c6
Parents: 9186679
Author: Zhao Yang <zh...@gmail.com>
Authored: Wed Aug 2 11:58:38 2017 +0800
Committer: Andrés de la Peña <a....@gmail.com>
Committed: Tue Aug 8 14:31:23 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/config/ColumnDefinition.java      | 19 ++++++++---
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  2 +-
 .../apache/cassandra/db/rows/BufferCell.java    |  2 +-
 .../apache/cassandra/tools/JsonTransformer.java |  6 ++--
 .../org/apache/cassandra/cql3/ViewTest.java     | 33 ++++++++++++++++++++
 6 files changed, 55 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 905a436..1525289 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Fix ColumnDefinition.cellValueType() for non-frozen collection and change SSTabledump to use type.toJSONString() (CASSANDRA-13573)
  * Skip materialized view addition if the base table doesn't exist (CASSANDRA-13737)
  * Drop table should remove corresponding entries in dropped_columns table (CASSANDRA-13730)
  * Log warn message until legacy auth tables have been migrated (CASSANDRA-13371)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 34840e3..6a0f530 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -391,13 +391,24 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
     /**
      * The type of the cell values for cell belonging to this column.
      *
-     * This is the same than the column type, except for collections where it's the 'valueComparator'
+     * This is the same as the column type, except for non-frozen collections where it's the 'valueComparator'
      * of the collection.
+     * 
+     * This method should not be used to get the value type of a non-frozen UDT.
      */
     public AbstractType<?> cellValueType()
     {
-        return type instanceof CollectionType
-             ? ((CollectionType)type).valueComparator()
-             : type;
+        assert !(type instanceof UserType && type.isMultiCell());
+        return type instanceof CollectionType && type.isMultiCell()
+                ? ((CollectionType)type).valueComparator()
+                : type;
+    }
+
+
+    public boolean isCounterColumn()
+    {
+        if (type instanceof CollectionType) // for thrift
+            return ((CollectionType) type).valueComparator().isCounter();
+        return type.isCounter();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index fda33d6..41dad0a 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -294,7 +294,7 @@ public class BTreeRow extends AbstractRow
 
     public Row markCounterLocalToBeCleared()
     {
-        return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> cd.column().cellValueType().isCounter()
+        return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> cd.column().isCounterColumn()
                                                                             ? cd.markCounterLocalToBeCleared()
                                                                             : cd);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index e4ad7e6..82ae02c 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -89,7 +89,7 @@ public class BufferCell extends AbstractCell
 
     public boolean isCounterCell()
     {
-        return !isTombstone() && column.cellValueType().isCounter();
+        return !isTombstone() && column.isCounterColumn();
     }
 
     public boolean isLive(int nowInSec)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/src/java/org/apache/cassandra/tools/JsonTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/JsonTransformer.java b/src/java/org/apache/cassandra/tools/JsonTransformer.java
index 0a72583..5c32035 100644
--- a/src/java/org/apache/cassandra/tools/JsonTransformer.java
+++ b/src/java/org/apache/cassandra/tools/JsonTransformer.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.RangeTombstone;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.ColumnData;
 import org.apache.cassandra.db.rows.ComplexColumnData;
@@ -49,6 +50,7 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
@@ -411,7 +413,7 @@ public final class JsonTransformer
             AbstractType<?> type = cell.column().type;
             json.writeString(cell.column().name.toCQLString());
 
-            if (cell.path() != null && cell.path().size() > 0)
+            if (type.isCollection() && type.isMultiCell()) // non-frozen collection
             {
                 CollectionType ct = (CollectionType) type;
                 json.writeFieldName("path");
@@ -437,7 +439,7 @@ public final class JsonTransformer
             else
             {
                 json.writeFieldName("value");
-                json.writeString(cell.column().cellValueType().getString(cell.value()));
+                json.writeRawValue(cell.column().cellValueType().toJSONString(cell.value(), Server.CURRENT_VERSION));
             }
             if (liveInfo.isEmpty() || cell.timestamp() != liveInfo.timestamp())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index e595ebd..f8f8c9f 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -1262,4 +1262,37 @@ public class ViewTest extends CQLTester
 
         assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L));
     }
+
+    @Test
+    public void testFrozenCollectionsWithComplicatedInnerType() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int, intval int,  listval frozen<list<tuple<text,text>>>, PRIMARY KEY (k))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        createView("mv",
+                   "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND listval IS NOT NULL PRIMARY KEY (k, listval)");
+
+        updateView("INSERT INTO %s (k, intval, listval) VALUES (?, ?, fromJson(?))",
+                   0,
+                   0,
+                   "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]");
+
+        // verify input
+        assertRows(execute("SELECT k, toJson(listval) FROM %s WHERE k = ?", 0),
+                   row(0, "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]"));
+        assertRows(execute("SELECT k, toJson(listval) from mv"),
+                   row(0, "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]"));
+
+        // update listval with the same value and it will be compared in view generator
+        updateView("INSERT INTO %s (k, listval) VALUES (?, fromJson(?))",
+                   0,
+                   "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]");
+        // verify result
+        assertRows(execute("SELECT k, toJson(listval) FROM %s WHERE k = ?", 0),
+                   row(0, "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]"));
+        assertRows(execute("SELECT k, toJson(listval) from mv"),
+                   row(0, "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]"));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/8] cassandra git commit: Fix ColumnMetadata.cellValueType() return type and change sstabledump tool to use type.toJsonString()

Posted by ad...@apache.org.
Fix ColumnMetadata.cellValueType() return type and change sstabledump tool to use type.toJsonString()

patch by Zhao Yang, reviewed by Andres de la Peña for CASSANDRA-13573


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/39602604
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/39602604
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/39602604

Branch: refs/heads/trunk
Commit: 3960260472fcd4e0243f62cc813992f1365197c6
Parents: 9186679
Author: Zhao Yang <zh...@gmail.com>
Authored: Wed Aug 2 11:58:38 2017 +0800
Committer: Andrés de la Peña <a....@gmail.com>
Committed: Tue Aug 8 14:31:23 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/config/ColumnDefinition.java      | 19 ++++++++---
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  2 +-
 .../apache/cassandra/db/rows/BufferCell.java    |  2 +-
 .../apache/cassandra/tools/JsonTransformer.java |  6 ++--
 .../org/apache/cassandra/cql3/ViewTest.java     | 33 ++++++++++++++++++++
 6 files changed, 55 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 905a436..1525289 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Fix ColumnDefinition.cellValueType() for non-frozen collection and change SSTabledump to use type.toJSONString() (CASSANDRA-13573)
  * Skip materialized view addition if the base table doesn't exist (CASSANDRA-13737)
  * Drop table should remove corresponding entries in dropped_columns table (CASSANDRA-13730)
  * Log warn message until legacy auth tables have been migrated (CASSANDRA-13371)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 34840e3..6a0f530 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -391,13 +391,24 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
     /**
      * The type of the cell values for cell belonging to this column.
      *
-     * This is the same than the column type, except for collections where it's the 'valueComparator'
+     * This is the same as the column type, except for non-frozen collections where it's the 'valueComparator'
      * of the collection.
+     * 
+     * This method should not be used to get the value type of a non-frozen UDT.
      */
     public AbstractType<?> cellValueType()
     {
-        return type instanceof CollectionType
-             ? ((CollectionType)type).valueComparator()
-             : type;
+        assert !(type instanceof UserType && type.isMultiCell());
+        return type instanceof CollectionType && type.isMultiCell()
+                ? ((CollectionType)type).valueComparator()
+                : type;
+    }
+
+
+    public boolean isCounterColumn()
+    {
+        if (type instanceof CollectionType) // for thrift
+            return ((CollectionType) type).valueComparator().isCounter();
+        return type.isCounter();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index fda33d6..41dad0a 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -294,7 +294,7 @@ public class BTreeRow extends AbstractRow
 
     public Row markCounterLocalToBeCleared()
     {
-        return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> cd.column().cellValueType().isCounter()
+        return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> cd.column().isCounterColumn()
                                                                             ? cd.markCounterLocalToBeCleared()
                                                                             : cd);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index e4ad7e6..82ae02c 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -89,7 +89,7 @@ public class BufferCell extends AbstractCell
 
     public boolean isCounterCell()
     {
-        return !isTombstone() && column.cellValueType().isCounter();
+        return !isTombstone() && column.isCounterColumn();
     }
 
     public boolean isLive(int nowInSec)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/src/java/org/apache/cassandra/tools/JsonTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/JsonTransformer.java b/src/java/org/apache/cassandra/tools/JsonTransformer.java
index 0a72583..5c32035 100644
--- a/src/java/org/apache/cassandra/tools/JsonTransformer.java
+++ b/src/java/org/apache/cassandra/tools/JsonTransformer.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.RangeTombstone;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.ColumnData;
 import org.apache.cassandra.db.rows.ComplexColumnData;
@@ -49,6 +50,7 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
@@ -411,7 +413,7 @@ public final class JsonTransformer
             AbstractType<?> type = cell.column().type;
             json.writeString(cell.column().name.toCQLString());
 
-            if (cell.path() != null && cell.path().size() > 0)
+            if (type.isCollection() && type.isMultiCell()) // non-frozen collection
             {
                 CollectionType ct = (CollectionType) type;
                 json.writeFieldName("path");
@@ -437,7 +439,7 @@ public final class JsonTransformer
             else
             {
                 json.writeFieldName("value");
-                json.writeString(cell.column().cellValueType().getString(cell.value()));
+                json.writeRawValue(cell.column().cellValueType().toJSONString(cell.value(), Server.CURRENT_VERSION));
             }
             if (liveInfo.isEmpty() || cell.timestamp() != liveInfo.timestamp())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/39602604/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index e595ebd..f8f8c9f 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -1262,4 +1262,37 @@ public class ViewTest extends CQLTester
 
         assertRows(execute("SELECT count(*) FROM mv_test"), row(1024L));
     }
+
+    @Test
+    public void testFrozenCollectionsWithComplicatedInnerType() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int, intval int,  listval frozen<list<tuple<text,text>>>, PRIMARY KEY (k))");
+
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+
+        createView("mv",
+                   "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND listval IS NOT NULL PRIMARY KEY (k, listval)");
+
+        updateView("INSERT INTO %s (k, intval, listval) VALUES (?, ?, fromJson(?))",
+                   0,
+                   0,
+                   "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]");
+
+        // verify input
+        assertRows(execute("SELECT k, toJson(listval) FROM %s WHERE k = ?", 0),
+                   row(0, "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]"));
+        assertRows(execute("SELECT k, toJson(listval) from mv"),
+                   row(0, "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]"));
+
+        // update listval with the same value and it will be compared in view generator
+        updateView("INSERT INTO %s (k, listval) VALUES (?, fromJson(?))",
+                   0,
+                   "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]");
+        // verify result
+        assertRows(execute("SELECT k, toJson(listval) FROM %s WHERE k = ?", 0),
+                   row(0, "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]"));
+        assertRows(execute("SELECT k, toJson(listval) from mv"),
+                   row(0, "[[\"a\", \"1\"], [\"b\", \"2\"], [\"c\", \"3\"]]"));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[7/8] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ad...@apache.org.
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/47a2839b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/47a2839b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/47a2839b

Branch: refs/heads/cassandra-3.11
Commit: 47a2839bf094a7fa9bad6de140fba486756c49bf
Parents: 15abe2d 3960260
Author: Andrés de la Peña <a....@gmail.com>
Authored: Tue Aug 8 15:01:04 2017 +0100
Committer: Andrés de la Peña <a....@gmail.com>
Committed: Tue Aug 8 15:01:04 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/config/ColumnDefinition.java      | 19 ++++++++---
 .../apache/cassandra/db/rows/AbstractCell.java  |  2 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  2 +-
 .../apache/cassandra/tools/JsonTransformer.java | 31 ++++++++++++++++--
 .../org/apache/cassandra/cql3/ViewTest.java     | 33 ++++++++++++++++++++
 .../cassandra/index/sasi/SASIIndexTest.java     | 14 +++++++++
 7 files changed, 94 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a2839b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b8c4bde,1525289..b778df6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,9 -1,5 +1,10 @@@
 -3.0.15
 +3.11.1
 + * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
 + * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
 + * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
 + * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
 +Merged from 3.0:
+  * Fix ColumnDefinition.cellValueType() for non-frozen collection and change SSTabledump to use type.toJSONString() (CASSANDRA-13573)
   * Skip materialized view addition if the base table doesn't exist (CASSANDRA-13737)
   * Drop table should remove corresponding entries in dropped_columns table (CASSANDRA-13730)
   * Log warn message until legacy auth tables have been migrated (CASSANDRA-13371)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a2839b/src/java/org/apache/cassandra/config/ColumnDefinition.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/ColumnDefinition.java
index ea508d2,6a0f530..159ea0c
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@@ -446,197 -398,17 +448,206 @@@ public class ColumnDefinition extends C
       */
      public AbstractType<?> cellValueType()
      {
-         return type instanceof CollectionType
-              ? ((CollectionType)type).valueComparator()
-              : type;
+         assert !(type instanceof UserType && type.isMultiCell());
+         return type instanceof CollectionType && type.isMultiCell()
+                 ? ((CollectionType)type).valueComparator()
+                 : type;
+     }
+ 
+ 
+     public boolean isCounterColumn()
+     {
+         if (type instanceof CollectionType) // for thrift
+             return ((CollectionType) type).valueComparator().isCounter();
+         return type.isCounter();
      }
 +
 +    public Selector.Factory newSelectorFactory(CFMetaData cfm, AbstractType<?> expectedType, List<ColumnDefinition> defs, VariableSpecifications boundNames) throws InvalidRequestException
 +    {
 +        return SimpleSelector.newFactory(this, addAndGetIndex(this, defs));
 +    }
 +
 +    public AbstractType<?> getExactTypeIfKnown(String keyspace)
 +    {
 +        return type;
 +    }
 +
 +    /**
 +     * Because Thrift-created tables may have a non-text comparator, we cannot determine the proper 'key' until
 +     * we know the comparator. ColumnDefinition.Raw is a placeholder that can be converted to a real ColumnIdentifier
 +     * once the comparator is known with prepare(). This should only be used with identifiers that are actual
 +     * column names. See CASSANDRA-8178 for more background.
 +     */
 +    public static abstract class Raw extends Selectable.Raw
 +    {
 +        /**
 +         * Creates a {@code ColumnDefinition.Raw} from an unquoted identifier string.
 +         */
 +        public static Raw forUnquoted(String text)
 +        {
 +            return new Literal(text, false);
 +        }
 +
 +        /**
 +         * Creates a {@code ColumnDefinition.Raw} from a quoted identifier string.
 +         */
 +        public static Raw forQuoted(String text)
 +        {
 +            return new Literal(text, true);
 +        }
 +
 +        /**
 +         * Creates a {@code ColumnDefinition.Raw} from a pre-existing {@code ColumnDefinition}
 +         * (useful in the rare cases where we already have the column but need
 +         * a {@code ColumnDefinition.Raw} for typing purposes).
 +         */
 +        public static Raw forColumn(ColumnDefinition column)
 +        {
 +            return new ForColumn(column);
 +        }
 +
 +        /**
 +         * Get the identifier corresponding to this raw column, without assuming this is an
 +         * existing column (unlike {@link #prepare}).
 +         */
 +        public abstract ColumnIdentifier getIdentifier(CFMetaData cfm);
 +
 +        public abstract String rawText();
 +
 +        @Override
 +        public abstract ColumnDefinition prepare(CFMetaData cfm);
 +
 +        @Override
 +        public boolean processesSelection()
 +        {
 +            return false;
 +        }
 +
 +        @Override
 +        public final int hashCode()
 +        {
 +            return toString().hashCode();
 +        }
 +
 +        @Override
 +        public final boolean equals(Object o)
 +        {
 +            if(!(o instanceof Raw))
 +                return false;
 +
 +            Raw that = (Raw)o;
 +            return this.toString().equals(that.toString());
 +        }
 +
 +        private static class Literal extends Raw
 +        {
 +            private final String text;
 +
 +            public Literal(String rawText, boolean keepCase)
 +            {
 +                this.text =  keepCase ? rawText : rawText.toLowerCase(Locale.US);
 +            }
 +
 +            public ColumnIdentifier getIdentifier(CFMetaData cfm)
 +            {
 +                if (!cfm.isStaticCompactTable())
 +                    return ColumnIdentifier.getInterned(text, true);
 +
 +                AbstractType<?> thriftColumnNameType = cfm.thriftColumnNameType();
 +                if (thriftColumnNameType instanceof UTF8Type)
 +                    return ColumnIdentifier.getInterned(text, true);
 +
 +                // We have a Thrift-created table with a non-text comparator. Check if we have a match column, otherwise assume we should use
 +                // thriftColumnNameType
 +                ByteBuffer bufferName = ByteBufferUtil.bytes(text);
 +                for (ColumnDefinition def : cfm.allColumns())
 +                {
 +                    if (def.name.bytes.equals(bufferName))
 +                        return def.name;
 +                }
 +                return ColumnIdentifier.getInterned(thriftColumnNameType, thriftColumnNameType.fromString(text), text);
 +            }
 +
 +            public ColumnDefinition prepare(CFMetaData cfm)
 +            {
 +                if (!cfm.isStaticCompactTable())
 +                    return find(cfm);
 +
 +                AbstractType<?> thriftColumnNameType = cfm.thriftColumnNameType();
 +                if (thriftColumnNameType instanceof UTF8Type)
 +                    return find(cfm);
 +
 +                // We have a Thrift-created table with a non-text comparator. Check if we have a match column, otherwise assume we should use
 +                // thriftColumnNameType
 +                ByteBuffer bufferName = ByteBufferUtil.bytes(text);
 +                for (ColumnDefinition def : cfm.allColumns())
 +                {
 +                    if (def.name.bytes.equals(bufferName))
 +                        return def;
 +                }
 +                return find(thriftColumnNameType.fromString(text), cfm);
 +            }
 +
 +            private ColumnDefinition find(CFMetaData cfm)
 +            {
 +                return find(ByteBufferUtil.bytes(text), cfm);
 +            }
 +
 +            private ColumnDefinition find(ByteBuffer id, CFMetaData cfm)
 +            {
 +                ColumnDefinition def = cfm.getColumnDefinition(id);
 +                if (def == null)
 +                    throw new InvalidRequestException(String.format("Undefined column name %s", toString()));
 +                return def;
 +            }
 +
 +            public String rawText()
 +            {
 +                return text;
 +            }
 +
 +            @Override
 +            public String toString()
 +            {
 +                return ColumnIdentifier.maybeQuote(text);
 +            }
 +        }
 +
 +        // Use internally in the rare case where we need a ColumnDefinition.Raw for type-checking but
 +        // actually already have the column itself.
 +        private static class ForColumn extends Raw
 +        {
 +            private final ColumnDefinition column;
 +
 +            private ForColumn(ColumnDefinition column)
 +            {
 +                this.column = column;
 +            }
 +
 +            public ColumnIdentifier getIdentifier(CFMetaData cfm)
 +            {
 +                return column.name;
 +            }
 +
 +            public ColumnDefinition prepare(CFMetaData cfm)
 +            {
 +                assert cfm.getColumnDefinition(column.name) != null; // Sanity check that we're not doing something crazy
 +                return column;
 +            }
 +
 +            public String rawText()
 +            {
 +                return column.name.toString();
 +            }
 +
 +            @Override
 +            public String toString()
 +            {
 +                return column.name.toCQLString();
 +            }
 +        }
 +    }
 +
 +
 +
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a2839b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 54bd9e8,7e93c2e..54c8f24
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@@ -44,81 -40,6 +44,81 @@@ public abstract class AbstractCell exte
          super(column);
      }
  
 +    public boolean isCounterCell()
 +    {
-         return !isTombstone() && column.cellValueType().isCounter();
++        return !isTombstone() && column.isCounterColumn();
 +    }
 +
 +    public boolean isLive(int nowInSec)
 +    {
 +        return localDeletionTime() == NO_DELETION_TIME || (ttl() != NO_TTL && nowInSec < localDeletionTime());
 +    }
 +
 +    public boolean isTombstone()
 +    {
 +        return localDeletionTime() != NO_DELETION_TIME && ttl() == NO_TTL;
 +    }
 +
 +    public boolean isExpiring()
 +    {
 +        return ttl() != NO_TTL;
 +    }
 +
 +    public Cell markCounterLocalToBeCleared()
 +    {
 +        if (!isCounterCell())
 +            return this;
 +
 +        ByteBuffer value = value();
 +        ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(value);
 +        return marked == value ? this : new BufferCell(column, timestamp(), ttl(), localDeletionTime(), marked, path());
 +    }
 +
 +    public Cell purge(DeletionPurger purger, int nowInSec)
 +    {
 +        if (!isLive(nowInSec))
 +        {
 +            if (purger.shouldPurge(timestamp(), localDeletionTime()))
 +                return null;
 +
 +            // We slightly hijack purging to convert expired but not purgeable columns to tombstones. The reason we do that is
 +            // that once a column has expired it is equivalent to a tombstone but actually using a tombstone is more compact since
 +            // we don't keep the column value. The reason we do it here is that 1) it's somewhat related to dealing with tombstones
 +            // so hopefully not too surprising and 2) we want to do this and purging at the same places, so it's simpler/more efficient
 +            // to do both here.
 +            if (isExpiring())
 +            {
 +                // Note that as long as the expiring column and the tombstone put together live longer than GC grace seconds,
 +                // we'll fulfil our responsibility to repair. See discussion at
 +                // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
 +                return BufferCell.tombstone(column, timestamp(), localDeletionTime() - ttl(), path()).purge(purger, nowInSec);
 +            }
 +        }
 +        return this;
 +    }
 +
 +    public Cell copy(AbstractAllocator allocator)
 +    {
 +        CellPath path = path();
 +        return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), allocator.clone(value()), path == null ? null : path.copy(allocator));
 +    }
 +
 +    // note: while the cell returned may be different, the value is the same, so if the value is offheap it must be referenced inside a guarded context (or copied)
 +    public Cell updateAllTimestamp(long newTimestamp)
 +    {
 +        return new BufferCell(column, isTombstone() ? newTimestamp - 1 : newTimestamp, ttl(), localDeletionTime(), value(), path());
 +    }
 +
 +    public int dataSize()
 +    {
 +        CellPath path = path();
 +        return TypeSizes.sizeof(timestamp())
 +               + TypeSizes.sizeof(ttl())
 +               + TypeSizes.sizeof(localDeletionTime())
 +               + value().remaining()
 +               + (path == null ? 0 : path.dataSize());
 +    }
 +
      public void digest(MessageDigest digest)
      {
          digest.update(value().duplicate());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a2839b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a2839b/src/java/org/apache/cassandra/tools/JsonTransformer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/JsonTransformer.java
index 360e8ff,5c32035..c3a0a17
--- a/src/java/org/apache/cassandra/tools/JsonTransformer.java
+++ b/src/java/org/apache/cassandra/tools/JsonTransformer.java
@@@ -46,6 -50,7 +47,7 @@@ import org.apache.cassandra.db.rows.Row
  import org.apache.cassandra.db.rows.Unfiltered;
  import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  import org.apache.cassandra.io.sstable.ISSTableScanner;
 -import org.apache.cassandra.transport.Server;
++import org.apache.cassandra.transport.ProtocolVersion;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.codehaus.jackson.JsonFactory;
  import org.codehaus.jackson.JsonGenerator;
@@@ -405,9 -411,9 +407,10 @@@ public final class JsonTransforme
              objectIndenter.setCompact(true);
              json.writeFieldName("name");
              AbstractType<?> type = cell.column().type;
++            AbstractType<?> cellType = null;
              json.writeString(cell.column().name.toCQLString());
  
-             if (cell.path() != null && cell.path().size() > 0)
+             if (type.isCollection() && type.isMultiCell()) // non-frozen collection
              {
                  CollectionType ct = (CollectionType) type;
                  json.writeFieldName("path");
@@@ -419,6 -425,6 +422,30 @@@
                  }
                  json.writeEndArray();
                  arrayIndenter.setCompact(false);
++
++                cellType = cell.column().cellValueType();
++            }
++            else if (type.isUDT() && type.isMultiCell()) // non-frozen udt
++            {
++                UserType ut = (UserType) type;
++                json.writeFieldName("path");
++                arrayIndenter.setCompact(true);
++                json.writeStartArray();
++                for (int i = 0; i < cell.path().size(); i++)
++                {
++                    Short fieldPosition = ut.nameComparator().compose(cell.path().get(i));
++                    json.writeString(ut.fieldNameAsString(fieldPosition));
++                }
++                json.writeEndArray();
++                arrayIndenter.setCompact(false);
++
++                // cellType of udt
++                Short fieldPosition = ((UserType) type).nameComparator().compose(cell.path().get(0));
++                cellType = ((UserType) type).fieldType(fieldPosition);
++            }
++            else
++            {
++                cellType = cell.column().cellValueType();
              }
              if (cell.isTombstone())
              {
@@@ -433,7 -439,7 +460,7 @@@
              else
              {
                  json.writeFieldName("value");
-                 json.writeString(cell.column().cellValueType().getString(cell.value()));
 -                json.writeRawValue(cell.column().cellValueType().toJSONString(cell.value(), Server.CURRENT_VERSION));
++                json.writeRawValue(cellType.toJSONString(cell.value(), ProtocolVersion.CURRENT));
              }
              if (liveInfo.isEmpty() || cell.timestamp() != liveInfo.timestamp())
              {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a2839b/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/ViewTest.java
index 6f6e04d,f8f8c9f..0853562
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@@ -777,6 -788,6 +777,39 @@@ public class ViewTest extends CQLTeste
      }
  
      @Test
++    public void testFrozenCollectionsWithComplicatedInnerType() throws Throwable
++    {
++        createTable("CREATE TABLE %s (k int, intval int,  listval frozen<list<tuple<text,text>>>, PRIMARY KEY (k))");
++
++        execute("USE " + keyspace());
++        executeNet(protocolVersion, "USE " + keyspace());
++
++        createView("mv",
++                   "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND listval IS NOT NULL PRIMARY KEY (k, listval)");
++
++        updateView("INSERT INTO %s (k, intval, listval) VALUES (?, ?, fromJson(?))",
++                   0,
++                   0,
++                   "[[\"a\",\"1\"], [\"b\",\"2\"], [\"c\",\"3\"]]");
++
++        // verify input
++        assertRows(execute("SELECT k, listval FROM %s WHERE k = ?", 0),
++                   row(0, list(tuple("a", "1"), tuple("b", "2"), tuple("c", "3"))));
++        assertRows(execute("SELECT k, listval from mv"),
++                   row(0, list(tuple("a", "1"), tuple("b", "2"), tuple("c", "3"))));
++
++        // update listval with the same value and it will be compared in view generator
++        updateView("INSERT INTO %s (k, listval) VALUES (?, fromJson(?))",
++                   0,
++                   "[[\"a\",\"1\"], [\"b\",\"2\"], [\"c\",\"3\"]]");
++        // verify result
++        assertRows(execute("SELECT k, listval FROM %s WHERE k = ?", 0),
++                   row(0, list(tuple("a", "1"), tuple("b", "2"), tuple("c", "3"))));
++        assertRows(execute("SELECT k, listval from mv"),
++                   row(0, list(tuple("a", "1"), tuple("b", "2"), tuple("c", "3"))));
++    }
++
++    @Test
      public void testUpdate() throws Throwable
      {
          createTable("CREATE TABLE %s (" +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[4/8] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11

Posted by ad...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a2839b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
index a6ce08b,0000000..e9051b4
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
@@@ -1,2552 -1,0 +1,2566 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.index.sasi;
 +
 +import java.io.FileWriter;
 +import java.io.Writer;
 +import java.nio.ByteBuffer;
 +import java.nio.file.FileSystems;
 +import java.nio.file.Files;
 +import java.nio.file.Path;
 +import java.nio.file.attribute.BasicFileAttributes;
 +import java.util.*;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.ThreadLocalRandom;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.Term;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
 +import org.apache.cassandra.cql3.statements.SelectStatement;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.sasi.conf.ColumnIndex;
 +import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
 +import org.apache.cassandra.index.sasi.exceptions.TimeQuotaExceededException;
 +import org.apache.cassandra.index.sasi.memory.IndexMemtable;
 +import org.apache.cassandra.index.sasi.plan.QueryController;
 +import org.apache.cassandra.index.sasi.plan.QueryPlan;
 +import org.apache.cassandra.io.sstable.SSTable;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.schema.KeyspaceMetadata;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +import org.apache.cassandra.schema.Tables;
 +import org.apache.cassandra.serializers.MarshalException;
 +import org.apache.cassandra.serializers.TypeSerializer;
 +import org.apache.cassandra.service.MigrationManager;
 +import org.apache.cassandra.service.QueryState;
 +import org.apache.cassandra.thrift.CqlRow;
 +import org.apache.cassandra.transport.messages.ResultMessage;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Pair;
 +
 +import com.google.common.collect.Lists;
 +import com.google.common.util.concurrent.Uninterruptibles;
 +
 +import junit.framework.Assert;
 +
 +import org.junit.*;
 +
 +public class SASIIndexTest
 +{
 +    private static final IPartitioner PARTITIONER; // assigned exactly once, in the static initializer below
 +    // The config property is set before the partitioner singleton is read. NOTE(review): presumably so the murmur config is the one Cassandra loads - confirm.
 +    static {
 +        System.setProperty("cassandra.config", "cassandra-murmur.yaml");
 +        PARTITIONER = Murmur3Partitioner.instance;
 +    }
 +    // Keyspace name followed by the table names; each table name is handed to a SchemaLoader factory in loadSchema().
 +    private static final String KS_NAME = "sasi";
 +    private static final String CF_NAME = "test_cf"; // the table truncated by cleanUp() before every test
 +    private static final String CLUSTERING_CF_NAME_1 = "clustering_test_cf_1";
 +    private static final String CLUSTERING_CF_NAME_2 = "clustering_test_cf_2";
 +    private static final String STATIC_CF_NAME = "static_sasi_test_cf";
 +    private static final String FTS_CF_NAME = "full_text_search_sasi_test_cf"; // queried by testPrefixSearchWithContainsMode
 +
 +    /**
 +     * Loads the shared test schema, then announces the {@code sasi} keyspace containing the five SASI test tables.
 +     *
 +     * @throws ConfigurationException propagated from schema loading or the keyspace announcement
 +     */
 +    @BeforeClass
 +    public static void loadSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.loadSchema();
 +
 +        // built in the same order in which the original single expression evaluated its arguments
 +        KeyspaceParams replication = KeyspaceParams.simpleTransient(1);
 +        Tables sasiTables = Tables.of(SchemaLoader.sasiCFMD(KS_NAME, CF_NAME),
 +                                      SchemaLoader.clusteringSASICFMD(KS_NAME, CLUSTERING_CF_NAME_1),
 +                                      SchemaLoader.clusteringSASICFMD(KS_NAME, CLUSTERING_CF_NAME_2, "location"),
 +                                      SchemaLoader.staticSASICFMD(KS_NAME, STATIC_CF_NAME),
 +                                      SchemaLoader.fullTextSearchSASICFMD(KS_NAME, FTS_CF_NAME));
 +
 +        MigrationManager.announceNewKeyspace(KeyspaceMetadata.create(KS_NAME, replication, sasiTables));
 +    }
 +
 +    /** Truncates the main test table ({@code CF_NAME}) before each test method. */
 +    @Before
 +    public void cleanUp()
 +    {
 +        ColumnFamilyStore mainTable = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +        mainTable.truncateBlocking();
 +    }
 +
 +    /** Runs the single-expression scenario twice: without a forced flush, then (after {@code cleanupData()}) with one. */
 +    @Test
 +    public void testSingleExpressionQueries() throws Exception
 +    {
 +        for (boolean flush : new boolean[] { false, true })
 +        {
 +            // data left behind by the first pass is removed before the second one
 +            if (flush)
 +                cleanupData();
 +
 +            testSingleExpressionQueries(flush);
 +        }
 +    }
 +
 +    /**
 +     * Loads four rows and checks index lookups built from a single expression:
 +     * {@code LIKE_CONTAINS} and {@code LIKE_SUFFIX} on {@code first_name}, and {@code EQ} on {@code age}.
 +     *
 +     * NOTE(review): the array comparisons assume getIndexed returns keys in sorted order - confirm against its definition.
 +     *
 +     * @param forceFlush forwarded to {@code loadData}; presumably forces the loaded rows to be flushed - confirm
 +     * @throws Exception propagated from loading or querying
 +     */
 +    private void testSingleExpressionQueries(boolean forceFlush) throws Exception
 +    {
 +        // plain map rather than double-brace initialization: no anonymous HashMap subclass capturing the test instance
 +        Map<String, Pair<String, Integer>> data = new HashMap<>();
 +        data.put("key1", Pair.create("Pavel", 14));
 +        data.put("key2", Pair.create("Pavel", 26));
 +        data.put("key3", Pair.create("Pavel", 27));
 +        data.put("key4", Pair.create("Jason", 27));
 +
 +        ColumnFamilyStore store = loadData(data, forceFlush);
 +
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        Set<String> rows;
 +
 +        rows = getIndexed(store, 10, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("av")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2", "key3" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("as")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        // no first name contains "aw"
 +        rows = getIndexed(store, 10, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("aw")));
 +        Assert.assertEquals(rows.toString(), 0, rows.size());
 +
 +        rows = getIndexed(store, 10, buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("avel")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2", "key3" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("n")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(age, Operator.EQ, Int32Type.instance.decompose(27)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(age, Operator.EQ, Int32Type.instance.decompose(26)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        // no row has age 13
 +        rows = getIndexed(store, 10, buildExpression(age, Operator.EQ, Int32Type.instance.decompose(13)));
 +        Assert.assertEquals(rows.toString(), 0, rows.size());
 +    }
 +
 +    /** Runs the empty-tokenization scenario twice: without a forced flush, then (after {@code cleanupData()}) with one. */
 +    @Test
 +    public void testEmptyTokenizedResults() throws Exception
 +    {
 +        for (boolean flush : new boolean[] { false, true })
 +        {
 +            // data left behind by the first pass is removed before the second one
 +            if (flush)
 +                cleanupData();
 +
 +            testEmptyTokenizedResults(flush);
 +        }
 +    }
 +
 +    /**
 +     * Loads one row whose {@code first_name} is only whitespace and checks that a {@code LIKE_MATCHES}
 +     * lookup on that column returns no keys.
 +     *
 +     * @param forceFlush forwarded to {@code loadData}; presumably forces the loaded row to be flushed - confirm
 +     * @throws Exception propagated from loading or querying
 +     */
 +    private void testEmptyTokenizedResults(boolean forceFlush) throws Exception
 +    {
 +        // plain map rather than double-brace initialization: no anonymous HashMap subclass capturing the test instance
 +        Map<String, Pair<String, Integer>> data = new HashMap<>();
 +        data.put("key1", Pair.create("  ", 14));
 +
 +        ColumnFamilyStore store = loadData(data, forceFlush);
 +
 +        Set<String> rows = getIndexed(store, 10, buildExpression(UTF8Type.instance.decompose("first_name"), Operator.LIKE_MATCHES, UTF8Type.instance.decompose("doesntmatter")));
 +        // same pass/fail outcome as comparing against an empty String[], stated directly
 +        Assert.assertTrue(rows.toString(), rows.isEmpty());
 +    }
 +
 +    /** Runs the multi-expression scenario twice: without a forced flush, then (after {@code cleanupData()}) with one. */
 +    @Test
 +    public void testMultiExpressionQueries() throws Exception
 +    {
 +        for (boolean flush : new boolean[] { false, true })
 +        {
 +            // data left behind by the first pass is removed before the second one
 +            if (flush)
 +                cleanupData();
 +
 +            testMultiExpressionQueries(flush);
 +        }
 +    }
 +
 +    /**
 +     * Loads four rows and checks lookups that combine a {@code first_name} expression
 +     * ({@code LIKE_CONTAINS} or {@code LIKE_SUFFIX}) with one or two range expressions on {@code age}.
 +     *
 +     * NOTE(review): the array comparisons assume getIndexed returns keys in sorted order - confirm against its definition.
 +     *
 +     * @param forceFlush forwarded to {@code loadData}; presumably forces the loaded rows to be flushed - confirm
 +     * @throws Exception propagated from loading or querying
 +     */
 +    public void testMultiExpressionQueries(boolean forceFlush) throws Exception
 +    {
 +        // plain map rather than double-brace initialization: no anonymous HashMap subclass capturing the test instance
 +        Map<String, Pair<String, Integer>> data = new HashMap<>();
 +        data.put("key1", Pair.create("Pavel", 14));
 +        data.put("key2", Pair.create("Pavel", 26));
 +        data.put("key3", Pair.create("Pavel", 27));
 +        data.put("key4", Pair.create("Jason", 27));
 +
 +        ColumnFamilyStore store = loadData(data, forceFlush);
 +
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        Set<String> rows;
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GT, Int32Type.instance.decompose(14)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(27)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        // two bounds on the same column: 14 < age < 27
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GT, Int32Type.instance.decompose(14)),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(27)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GT, Int32Type.instance.decompose(12)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GTE, Int32Type.instance.decompose(13)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GTE, Int32Type.instance.decompose(16)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(30)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.LTE, Int32Type.instance.decompose(29)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.LTE, Int32Type.instance.decompose(25)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("avel")),
 +                          buildExpression(age, Operator.LTE, Int32Type.instance.decompose(25)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        // the only name ending in "n" is Jason (age 27), so the age bound leaves nothing
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("n")),
 +                          buildExpression(age, Operator.LTE, Int32Type.instance.decompose(25)));
 +        Assert.assertTrue(rows.toString(), rows.isEmpty());
 +    }
 +
 +    /** Runs the cross-SSTable scenario twice: without a forced flush, then (after {@code cleanupData()}) with one. */
 +    @Test
 +    public void testCrossSSTableQueries() throws Exception
 +    {
 +        for (boolean flush : new boolean[] { false, true })
 +        {
 +            // data left behind by the first pass is removed before the second one
 +            if (flush)
 +                cleanupData();
 +
 +            testCrossSSTableQueries(flush);
 +        }
 +    }
 +
 +    /**
 +     * Loads fifteen rows in three separate {@code loadData} batches and checks lookups whose matches
 +     * come from more than one batch, including a result limit smaller than the number of matches.
 +     *
 +     * NOTE(review): the array comparisons assume getIndexed returns keys in sorted (string) order - confirm against its definition.
 +     *
 +     * @param forceFlush forwarded to every {@code loadData} call; presumably makes each batch its own SSTable - confirm
 +     * @throws Exception propagated from loading or querying
 +     */
 +    private void testCrossSSTableQueries(boolean forceFlush) throws Exception
 +    {
 +        // plain maps rather than double-brace initialization: no anonymous HashMap subclasses capturing the test instance
 +        Map<String, Pair<String, Integer>> part1 = new HashMap<>();
 +        part1.put("key0", Pair.create("Maxie", 43));
 +        part1.put("key1", Pair.create("Chelsie", 33));
 +        part1.put("key2", Pair.create("Josephine", 43));
 +        part1.put("key3", Pair.create("Shanna", 27));
 +        part1.put("key4", Pair.create("Amiya", 36));
 +
 +        loadData(part1, forceFlush); // first sstable
 +
 +        Map<String, Pair<String, Integer>> part2 = new HashMap<>();
 +        part2.put("key5", Pair.create("Americo", 20));
 +        part2.put("key6", Pair.create("Fiona", 39));
 +        part2.put("key7", Pair.create("Francis", 41));
 +        part2.put("key8", Pair.create("Charley", 21));
 +        part2.put("key9", Pair.create("Amely", 40));
 +
 +        loadData(part2, forceFlush);
 +
 +        Map<String, Pair<String, Integer>> part3 = new HashMap<>();
 +        part3.put("key10", Pair.create("Eddie", 42));
 +        part3.put("key11", Pair.create("Oswaldo", 35));
 +        part3.put("key12", Pair.create("Susana", 35));
 +        part3.put("key13", Pair.create("Alivia", 42));
 +        part3.put("key14", Pair.create("Demario", 28));
 +
 +        ColumnFamilyStore store = loadData(part3, forceFlush);
 +
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        Set<String> rows;
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.EQ, UTF8Type.instance.decompose("Fiona")),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(40)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key6" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key0", "key11", "key12", "key13", "key14",
 +                                                                        "key3", "key4", "key6", "key7", "key8" }, rows.toArray(new String[rows.size()])));
 +
 +        // same query with a limit of 5: only the count is checked
 +        rows = getIndexed(store, 5,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +
 +        Assert.assertEquals(rows.toString(), 5, rows.size());
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GTE, Int32Type.instance.decompose(35)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key0", "key11", "key12", "key13", "key4", "key6", "key7" },
 +                                                         rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(32)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key14", "key3", "key8" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GT, Int32Type.instance.decompose(27)),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(32)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key14" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GT, Int32Type.instance.decompose(10)));
 +
 +        Assert.assertEquals(rows.toString(), 10, rows.size());
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.LTE, Int32Type.instance.decompose(50)));
 +
 +        Assert.assertEquals(rows.toString(), 10, rows.size());
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("ie")),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(43)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key10" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("a")));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key12", "key13", "key3", "key4", "key6" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(33)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key3" }, rows.toArray(new String[rows.size()])));
 +    }
 +
 +    /** Runs the tokenized-query scenario twice: without a forced flush, then (after {@code cleanupData()}) with one. */
 +    @Test
 +    public void testQueriesThatShouldBeTokenized() throws Exception
 +    {
 +        for (boolean flush : new boolean[] { false, true })
 +        {
 +            // data left behind by the first pass is removed before the second one
 +            if (flush)
 +                cleanupData();
 +
 +            testQueriesThatShouldBeTokenized(flush);
 +        }
 +    }
 +
 +    /**
 +     * Loads four rows whose {@code first_name} holds a full sentence and checks {@code LIKE_CONTAINS}
 +     * lookups whose search value is itself a multi-word phrase.
 +     *
 +     * @param forceFlush forwarded to {@code loadData}; presumably forces the loaded rows to be flushed - confirm
 +     * @throws Exception propagated from loading or querying
 +     */
 +    private void testQueriesThatShouldBeTokenized(boolean forceFlush) throws Exception
 +    {
 +        // plain map rather than double-brace initialization: no anonymous HashMap subclass capturing the test instance
 +        Map<String, Pair<String, Integer>> part1 = new HashMap<>();
 +        part1.put("key0", Pair.create("If you can dream it, you can do it.", 43));
 +        part1.put("key1", Pair.create("What you get by achieving your goals is not " +
 +                                      "as important as what you become by achieving your goals, do it.", 33));
 +        part1.put("key2", Pair.create("Keep your face always toward the sunshine " +
 +                                      "- and shadows will fall behind you.", 43));
 +        part1.put("key3", Pair.create("We can't help everyone, but everyone can " +
 +                                      "help someone.", 27));
 +
 +        ColumnFamilyStore store = loadData(part1, forceFlush);
 +
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        Set<String> rows = getIndexed(store, 10,
 +                                      buildExpression(firstName, Operator.LIKE_CONTAINS,
 +                                                      UTF8Type.instance.decompose("What you get by achieving your goals")),
 +                                      buildExpression(age, Operator.GT, Int32Type.instance.decompose(32)));
 +
 +        Assert.assertEquals(rows.toString(), Collections.singleton("key1"), rows);
 +
 +        // "do it." ends the sentences stored under key0 and key1 only
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("do it.")));
 +
 +        Assert.assertEquals(rows.toString(), Arrays.asList("key0", "key1"), Lists.newArrayList(rows));
 +    }
 +
 +    /** Runs the prefix-search scenario twice: without a forced flush, then (after {@code cleanupData()}) with one. */
 +    @Test
 +    public void testPrefixSearchWithContainsMode() throws Exception
 +    {
 +        for (boolean flush : new boolean[] { false, true })
 +        {
 +            // data left behind by the first pass is removed before the second one
 +            if (flush)
 +                cleanupData();
 +
 +            testPrefixSearchWithContainsMode(flush);
 +        }
 +    }
 +
 +    /**
 +     * Inserts four songs into the full-text-search table through CQL and expects the prefix query
 +     * {@code artist LIKE 'lady%'} to return three of them.
 +     *
 +     * @param forceFlush when true, the table is flushed (blocking) after the inserts and before the query
 +     * @throws Exception propagated from the CQL helper or the flush
 +     */
 +    private void testPrefixSearchWithContainsMode(boolean forceFlush) throws Exception
 +    {
 +        ColumnFamilyStore songs = Keyspace.open(KS_NAME).getColumnFamilyStore(FTS_CF_NAME);
 +
 +        // every insert uses the same statement; only the bound values differ
 +        final String insertSong = "INSERT INTO %s.%s (song_id, title, artist) VALUES(?, ?, ?)";
 +
 +        executeCQL(FTS_CF_NAME, insertSong, UUID.fromString("1a4abbcd-b5de-4c69-a578-31231e01ff09"), "Poker Face", "Lady Gaga");
 +        executeCQL(FTS_CF_NAME, insertSong, UUID.fromString("9472a394-359b-4a06-b1d5-b6afce590598"), "Forgetting the Way Home", "Our Lady of Bells");
 +        executeCQL(FTS_CF_NAME, insertSong, UUID.fromString("4f8dc18e-54e6-4e16-b507-c5324b61523b"), "Zamki na piasku", "Lady Pank");
 +        executeCQL(FTS_CF_NAME, insertSong, UUID.fromString("eaf294fa-bad5-49d4-8f08-35ba3636a706"), "Koncertowa", "Lady Pank");
 +
 +        if (forceFlush)
 +        {
 +            songs.forceBlockingFlush();
 +        }
 +
 +        final UntypedResultSet matches = executeCQL(FTS_CF_NAME, "SELECT * FROM %s.%s WHERE artist LIKE 'lady%%'");
 +        Assert.assertNotNull(matches);
 +        Assert.assertEquals(3, matches.size());
 +    }
 +
 +    /** Runs the split-row scenario twice: without a forced flush, then (after {@code cleanupData()}) with one. */
 +    @Test
 +    public void testMultiExpressionQueriesWhereRowSplitBetweenSSTables() throws Exception
 +    {
 +        for (boolean flush : new boolean[] { false, true })
 +        {
 +            // data left behind by the first pass is removed before the second one
 +            if (flush)
 +                cleanupData();
 +
 +            testMultiExpressionQueriesWhereRowSplitBetweenSSTables(flush);
 +        }
 +    }
 +
 +    private void testMultiExpressionQueriesWhereRowSplitBetweenSSTables(boolean forceFlush) throws Exception
 +    {
 +        Map<String, Pair<String, Integer>> part1 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key0", Pair.create("Maxie", -1));
 +                put("key1", Pair.create("Chelsie", 33));
 +                put("key2", Pair.create((String)null, 43));
 +                put("key3", Pair.create("Shanna", 27));
 +                put("key4", Pair.create("Amiya", 36));
 +        }};
 +
 +        loadData(part1, forceFlush); // first sstable
 +
 +        Map<String, Pair<String, Integer>> part2 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key5", Pair.create("Americo", 20));
 +                put("key6", Pair.create("Fiona", 39));
 +                put("key7", Pair.create("Francis", 41));
 +                put("key8", Pair.create("Charley", 21));
 +                put("key9", Pair.create("Amely", 40));
 +                put("key14", Pair.create((String)null, 28));
 +        }};
 +
 +        loadData(part2, forceFlush);
 +
 +        Map<String, Pair<String, Integer>> part3 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key0", Pair.create((String)null, 43));
 +                put("key10", Pair.create("Eddie", 42));
 +                put("key11", Pair.create("Oswaldo", 35));
 +                put("key12", Pair.create("Susana", 35));
 +                put("key13", Pair.create("Alivia", 42));
 +                put("key14", Pair.create("Demario", -1));
 +                put("key2", Pair.create("Josephine", -1));
 +        }};
 +
 +        ColumnFamilyStore store = loadData(part3, forceFlush);
 +
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        Set<String> rows = getIndexed(store, 10,
 +                                      buildExpression(firstName, Operator.EQ, UTF8Type.instance.decompose("Fiona")),
 +                                      buildExpression(age, Operator.LT, Int32Type.instance.decompose(40)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key6" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key0", "key11", "key12", "key13", "key14",
 +                                                                        "key3", "key4", "key6", "key7", "key8" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 5,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +
 +        Assert.assertEquals(rows.toString(), 5, rows.size());
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GTE, Int32Type.instance.decompose(35)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key0", "key11", "key12", "key13", "key4", "key6", "key7" },
 +                                                         rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(32)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key14", "key3", "key8" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GT, Int32Type.instance.decompose(27)),
 +                          buildExpression(age, Operator.LT, Int32Type.instance.decompose(32)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key14" }, rows.toArray(new String[rows.size()])));
 +
 +        Map<String, Pair<String, Integer>> part4 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key12", Pair.create((String)null, 12));
 +                put("key14", Pair.create("Demario", 42));
 +                put("key2", Pair.create("Frank", -1));
 +        }};
 +
 +        store = loadData(part4, forceFlush);
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("Susana")),
 +                          buildExpression(age, Operator.LTE, Int32Type.instance.decompose(13)),
 +                          buildExpression(age, Operator.GT, Int32Type.instance.decompose(10)));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key12" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("Demario")),
 +                          buildExpression(age, Operator.LTE, Int32Type.instance.decompose(30)));
 +        Assert.assertTrue(rows.toString(), rows.size() == 0);
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("Josephine")));
 +        Assert.assertTrue(rows.toString(), rows.size() == 0);
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.GT, Int32Type.instance.decompose(10)));
 +
 +        Assert.assertEquals(rows.toString(), 10, rows.size());
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                          buildExpression(age, Operator.LTE, Int32Type.instance.decompose(50)));
 +
 +        Assert.assertEquals(rows.toString(), 10, rows.size());
 +
 +        rows = getIndexed(store, 10,
 +                          buildExpression(firstName, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("ie")),
 +                          buildExpression(age, Operator.LTE, Int32Type.instance.decompose(43)));
 +
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key0", "key1", "key10" }, rows.toArray(new String[rows.size()])));
 +    }
 +
 +    /** Runs the pagination scenario twice: without a forced flush, then (after {@code cleanupData()}) with one. */
 +    @Test
 +    public void testPagination() throws Exception
 +    {
 +        for (boolean flush : new boolean[] { false, true })
 +        {
 +            // data left behind by the first pass is removed before the second one
 +            if (flush)
 +                cleanupData();
 +
 +            testPagination(flush);
 +        }
 +    }
 +
 +    private void testPagination(boolean forceFlush) throws Exception
 +    {
 +        // split data into 3 distinct SSTables to test paging with overlapping token intervals.
 +
 +        Map<String, Pair<String, Integer>> part1 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key01", Pair.create("Ali", 33));
 +                put("key02", Pair.create("Jeremy", 41));
 +                put("key03", Pair.create("Elvera", 22));
 +                put("key04", Pair.create("Bailey", 45));
 +                put("key05", Pair.create("Emerson", 32));
 +                put("key06", Pair.create("Kadin", 38));
 +                put("key07", Pair.create("Maggie", 36));
 +                put("key08", Pair.create("Kailey", 36));
 +                put("key09", Pair.create("Armand", 21));
 +                put("key10", Pair.create("Arnold", 35));
 +        }};
 +
 +        Map<String, Pair<String, Integer>> part2 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key11", Pair.create("Ken", 38));
 +                put("key12", Pair.create("Penelope", 43));
 +                put("key13", Pair.create("Wyatt", 34));
 +                put("key14", Pair.create("Johnpaul", 34));
 +                put("key15", Pair.create("Trycia", 43));
 +                put("key16", Pair.create("Aida", 21));
 +                put("key17", Pair.create("Devon", 42));
 +        }};
 +
 +        Map<String, Pair<String, Integer>> part3 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key18", Pair.create("Christina", 20));
 +                put("key19", Pair.create("Rick", 19));
 +                put("key20", Pair.create("Fannie", 22));
 +                put("key21", Pair.create("Keegan", 29));
 +                put("key22", Pair.create("Ignatius", 36));
 +                put("key23", Pair.create("Ellis", 26));
 +                put("key24", Pair.create("Annamarie", 29));
 +                put("key25", Pair.create("Tianna", 31));
 +                put("key26", Pair.create("Dennis", 32));
 +        }};
 +
 +        ColumnFamilyStore store = loadData(part1, forceFlush);
 +
 +        loadData(part2, forceFlush);
 +        loadData(part3, forceFlush);
 +
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        Set<DecoratedKey> uniqueKeys = getPaged(store, 4,
 +                buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                buildExpression(age, Operator.GTE, Int32Type.instance.decompose(21)));
 +
 +
 +        List<String> expected = new ArrayList<String>()
 +        {{
 +                add("key25");
 +                add("key20");
 +                add("key13");
 +                add("key22");
 +                add("key09");
 +                add("key14");
 +                add("key16");
 +                add("key24");
 +                add("key03");
 +                add("key04");
 +                add("key08");
 +                add("key07");
 +                add("key15");
 +                add("key06");
 +                add("key21");
 +        }};
 +
 +        Assert.assertEquals(expected, convert(uniqueKeys));
 +
 +        // now let's test a single equals condition
 +
 +        uniqueKeys = getPaged(store, 4, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +
 +        expected = new ArrayList<String>()
 +        {{
 +                add("key25");
 +                add("key20");
 +                add("key13");
 +                add("key22");
 +                add("key09");
 +                add("key14");
 +                add("key16");
 +                add("key24");
 +                add("key03");
 +                add("key04");
 +                add("key18");
 +                add("key08");
 +                add("key07");
 +                add("key15");
 +                add("key06");
 +                add("key21");
 +        }};
 +
 +        Assert.assertEquals(expected, convert(uniqueKeys));
 +
 +        // now let's test something which is smaller than a single page
 +        uniqueKeys = getPaged(store, 4,
 +                              buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                              buildExpression(age, Operator.EQ, Int32Type.instance.decompose(36)));
 +
 +        expected = new ArrayList<String>()
 +        {{
 +                add("key22");
 +                add("key08");
 +                add("key07");
 +        }};
 +
 +        Assert.assertEquals(expected, convert(uniqueKeys));
 +
 +        // the same but with the page size of 2 to test minimal pagination windows
 +
 +        uniqueKeys = getPaged(store, 2,
 +                              buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                              buildExpression(age, Operator.EQ, Int32Type.instance.decompose(36)));
 +
 +        Assert.assertEquals(expected, convert(uniqueKeys));
 +
 +        // and last but not least, test age range query with pagination
 +        uniqueKeys = getPaged(store, 4,
 +                buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                buildExpression(age, Operator.GT, Int32Type.instance.decompose(20)),
 +                buildExpression(age, Operator.LTE, Int32Type.instance.decompose(36)));
 +
 +        expected = new ArrayList<String>()
 +        {{
 +                add("key25");
 +                add("key20");
 +                add("key13");
 +                add("key22");
 +                add("key09");
 +                add("key14");
 +                add("key16");
 +                add("key24");
 +                add("key03");
 +                add("key08");
 +                add("key07");
 +                add("key21");
 +        }};
 +
 +        Assert.assertEquals(expected, convert(uniqueKeys));
 +
 +        Set<String> rows;
 +
 +        rows = executeCQLWithKeys(String.format("SELECT * FROM %s.%s WHERE first_name LIKE '%%a%%' limit 10 ALLOW FILTERING;", KS_NAME, CF_NAME));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key03", "key04", "key09", "key13", "key14", "key16", "key20", "key22", "key24", "key25" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = executeCQLWithKeys(String.format("SELECT * FROM %s.%s WHERE first_name LIKE '%%a%%' and token(id) >= token('key14') limit 5 ALLOW FILTERING;", KS_NAME, CF_NAME));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key03", "key04", "key14", "key16", "key24" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = executeCQLWithKeys(String.format("SELECT * FROM %s.%s WHERE first_name LIKE '%%a%%' and token(id) >= token('key14') and token(id) <= token('key24') limit 5 ALLOW FILTERING;", KS_NAME, CF_NAME));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key14", "key16", "key24" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = executeCQLWithKeys(String.format("SELECT * FROM %s.%s WHERE first_name LIKE '%%a%%' and age > 30 and token(id) >= token('key14') and token(id) <= token('key24') limit 5 ALLOW FILTERING;", KS_NAME, CF_NAME));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key14" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = executeCQLWithKeys(String.format("SELECT * FROM %s.%s WHERE first_name like '%%ie' limit 5 ALLOW FILTERING;", KS_NAME, CF_NAME));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key07", "key20", "key24" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = executeCQLWithKeys(String.format("SELECT * FROM %s.%s WHERE first_name like '%%ie' AND token(id) > token('key24') limit 5 ALLOW FILTERING;", KS_NAME, CF_NAME));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key07", "key24" }, rows.toArray(new String[rows.size()])));
 +    }
 +
 +    /**
 +     * Runs the slash-named-column scenario with {@code forceFlush} set to {@code false}, then
 +     * calls {@code cleanupData()} and runs it once more with {@code forceFlush} set to {@code true}.
 +     */
 +    @Test
 +    public void testColumnNamesWithSlashes() throws Exception
 +    {
 +        for (boolean flush : new boolean[] { false, true })
 +        {
 +            // cleanupData() runs only between the two passes
 +            if (flush)
 +                cleanupData();
 +
 +            testColumnNamesWithSlashes(flush);
 +        }
 +    }
 +
 +    /**
 +     * Writes three rows into the {@code "/data/output/id"} column and checks that LIKE lookups
 +     * on it work. When flushing, it also checks that the lookups return nothing after the
 +     * indexes are invalidated and work again after the index is rebuilt.
 +     *
 +     * @param forceFlush when {@code true} the store is flushed after the writes and the
 +     *                   invalidate/rebuild part of the scenario is executed as well
 +     */
 +    private void testColumnNamesWithSlashes(boolean forceFlush) throws Exception
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        // { partition key, value stored in "/data/output/id" }
 +        final String[][] writes = { { "key1", "jason" }, { "key2", "pavel" }, { "key3", "Aleksey" } };
 +
 +        // all mutations are built first and only applied afterwards
 +        List<Mutation> mutations = new ArrayList<>();
 +        for (String[] write : writes)
 +        {
 +            Mutation mutation = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose(write[0])));
 +            mutation.add(PartitionUpdate.singleRowUpdate(store.metadata,
 +                                                         mutation.key(),
 +                                                         buildRow(buildCell(store.metadata,
 +                                                                            UTF8Type.instance.decompose("/data/output/id"),
 +                                                                            AsciiType.instance.decompose(write[1]),
 +                                                                            System.currentTimeMillis()))));
 +            mutations.add(mutation);
 +        }
 +
 +        for (Mutation mutation : mutations)
 +            mutation.apply();
 +
 +        if (forceFlush)
 +            store.forceBlockingFlush();
 +
 +        final ByteBuffer column = UTF8Type.instance.decompose("/data/output/id");
 +        final String[] lowerCaseHits = { "key1", "key2" };
 +        final String[] upperCaseHits = { "key3" };
 +
 +        Set<String> rows = getIndexed(store, 10, buildExpression(column, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(lowerCaseHits, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(column, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("A")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(upperCaseHits, rows.toArray(new String[rows.size()])));
 +
 +        // rebuilding an index for in-memory data doesn't really make sense, so stop here
 +        if (!forceFlush)
 +            return;
 +
 +        store.indexManager.invalidateAllIndexesBlocking();
 +
 +        // with all indexes invalidated neither lookup may return anything
 +        rows = getIndexed(store, 10, buildExpression(column, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertTrue(rows.toString(), rows.isEmpty());
 +
 +        rows = getIndexed(store, 10, buildExpression(column, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("A")));
 +        Assert.assertTrue(rows.toString(), rows.isEmpty());
 +
 +        // trigger an index rebuild and check that the data is back
 +        store.indexManager.buildIndexBlocking(store.indexManager.getIndexByName("data_output_id"));
 +
 +        rows = getIndexed(store, 10, buildExpression(column, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(lowerCaseHits, rows.toArray(new String[rows.size()])));
 +
 +        // building an index for a column that has no data must not fail either
 +        store.indexManager.buildIndexBlocking(store.indexManager.getIndexByName("first_name"));
 +        store.indexManager.buildIndexBlocking(store.indexManager.getIndexByName("data_output_id"));
 +
 +        rows = getIndexed(store, 10, buildExpression(column, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(lowerCaseHits, rows.toArray(new String[rows.size()])));
 +
 +        final String[] suffixHits = { "key2" };
 +        rows = getIndexed(store, 10, buildExpression(column, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("el")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(suffixHits, rows.toArray(new String[rows.size()])));
 +    }
 +
 +    /**
 +     * Runs the invalidation scenario with {@code forceFlush} set to {@code false}, then calls
 +     * {@code cleanupData()} and runs it once more with {@code forceFlush} set to {@code true}.
 +     */
 +    @Test
 +    public void testInvalidate() throws Exception
 +    {
 +        for (boolean flush : new boolean[] { false, true })
 +        {
 +            // cleanupData() runs only between the two passes
 +            if (flush)
 +                cleanupData();
 +
 +            testInvalidate(flush);
 +        }
 +    }
 +
 +    /**
 +     * Checks that index lookups return nothing right after all indexes are invalidated, and
 +     * that lookups issued after a second batch is loaded return only rows from that batch.
 +     *
 +     * @param forceFlush forwarded unchanged to both {@code loadData} calls
 +     */
 +    private void testInvalidate(boolean forceFlush) throws Exception
 +    {
 +        Map<String, Pair<String, Integer>> beforeInvalidation = new HashMap<>();
 +        beforeInvalidation.put("key0", Pair.create("Maxie", -1));
 +        beforeInvalidation.put("key1", Pair.create("Chelsie", 33));
 +        beforeInvalidation.put("key2", Pair.create((String) null, 43));
 +        beforeInvalidation.put("key3", Pair.create("Shanna", 27));
 +        beforeInvalidation.put("key4", Pair.create("Amiya", 36));
 +
 +        ColumnFamilyStore store = loadData(beforeInvalidation, forceFlush);
 +
 +        final ByteBuffer firstNameColumn = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer ageColumn = UTF8Type.instance.decompose("age");
 +
 +        // sanity check: both indexes answer before the invalidation
 +        Set<String> rows = getIndexed(store, 10, buildExpression(firstNameColumn, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        String[] wanted = { "key0", "key3", "key4" };
 +        Assert.assertTrue(rows.toString(), Arrays.equals(wanted, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(ageColumn, Operator.EQ, Int32Type.instance.decompose(33)));
 +        wanted = new String[]{ "key1" };
 +        Assert.assertTrue(rows.toString(), Arrays.equals(wanted, rows.toArray(new String[rows.size()])));
 +
 +        store.indexManager.invalidateAllIndexesBlocking();
 +
 +        // the very same lookups must now come back empty
 +        rows = getIndexed(store, 10, buildExpression(firstNameColumn, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertTrue(rows.toString(), rows.isEmpty());
 +
 +        rows = getIndexed(store, 10, buildExpression(ageColumn, Operator.EQ, Int32Type.instance.decompose(33)));
 +        Assert.assertTrue(rows.toString(), rows.isEmpty());
 +
 +        Map<String, Pair<String, Integer>> afterInvalidation = new HashMap<>();
 +        afterInvalidation.put("key5", Pair.create("Americo", 20));
 +        afterInvalidation.put("key6", Pair.create("Fiona", 39));
 +        afterInvalidation.put("key7", Pair.create("Francis", 41));
 +        afterInvalidation.put("key8", Pair.create("Fred", 21));
 +        afterInvalidation.put("key9", Pair.create("Amely", 40));
 +        afterInvalidation.put("key14", Pair.create("Dino", 28));
 +
 +        loadData(afterInvalidation, forceFlush);
 +
 +        // only rows from the second batch are expected
 +        rows = getIndexed(store, 10, buildExpression(firstNameColumn, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        wanted = new String[]{ "key6", "key7" };
 +        Assert.assertTrue(rows.toString(), Arrays.equals(wanted, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(ageColumn, Operator.EQ, Int32Type.instance.decompose(40)));
 +        wanted = new String[]{ "key9" };
 +        Assert.assertTrue(rows.toString(), Arrays.equals(wanted, rows.toArray(new String[rows.size()])));
 +    }
 +
 +    /**
 +     * Loads three batches via {@code loadData(batch, 1000|2000|3000, true)}, truncates all
 +     * indexes at 500, 1500, 2500 and 3500, and checks after each step how many rows still
 +     * match {@code first_name LIKE '%a%'}. It then loads a fourth batch and checks that it is
 +     * searchable.
 +     */
 +    @Test
 +    public void testTruncate()
 +    {
 +        Map<String, Pair<String, Integer>> part1 = new HashMap<>();
 +        part1.put("key01", Pair.create("Ali", 33));
 +        part1.put("key02", Pair.create("Jeremy", 41));
 +        part1.put("key03", Pair.create("Elvera", 22));
 +        part1.put("key04", Pair.create("Bailey", 45));
 +        part1.put("key05", Pair.create("Emerson", 32));
 +        part1.put("key06", Pair.create("Kadin", 38));
 +        part1.put("key07", Pair.create("Maggie", 36));
 +        part1.put("key08", Pair.create("Kailey", 36));
 +        part1.put("key09", Pair.create("Armand", 21));
 +        part1.put("key10", Pair.create("Arnold", 35));
 +
 +        Map<String, Pair<String, Integer>> part2 = new HashMap<>();
 +        part2.put("key11", Pair.create("Ken", 38));
 +        part2.put("key12", Pair.create("Penelope", 43));
 +        part2.put("key13", Pair.create("Wyatt", 34));
 +        part2.put("key14", Pair.create("Johnpaul", 34));
 +        part2.put("key15", Pair.create("Trycia", 43));
 +        part2.put("key16", Pair.create("Aida", 21));
 +        part2.put("key17", Pair.create("Devon", 42));
 +
 +        Map<String, Pair<String, Integer>> part3 = new HashMap<>();
 +        part3.put("key18", Pair.create("Christina", 20));
 +        part3.put("key19", Pair.create("Rick", 19));
 +        part3.put("key20", Pair.create("Fannie", 22));
 +        part3.put("key21", Pair.create("Keegan", 29));
 +        part3.put("key22", Pair.create("Ignatius", 36));
 +        part3.put("key23", Pair.create("Ellis", 26));
 +        part3.put("key24", Pair.create("Annamarie", 29));
 +        part3.put("key25", Pair.create("Tianna", 31));
 +        part3.put("key26", Pair.create("Dennis", 32));
 +
 +        ColumnFamilyStore store = loadData(part1, 1000, true);
 +        loadData(part2, 2000, true);
 +        loadData(part3, 3000, true);
 +
 +        final ByteBuffer firstNameColumn = UTF8Type.instance.decompose("first_name");
 +
 +        Set<String> rows = getIndexed(store, 100, buildExpression(firstNameColumn, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertEquals(rows.toString(), 16, rows.size());
 +
 +        // truncation point -> matches expected afterwards; the first point (500) precedes
 +        // every batch, so it makes sure nothing is deleted prematurely
 +        final int[] truncateAt = { 500, 1500, 2500, 3500 };
 +        final int[] remaining = { 16, 10, 6, 0 };
 +
 +        for (int step = 0; step < truncateAt.length; step++)
 +        {
 +            store.indexManager.truncateAllIndexesBlocking(truncateAt[step]);
 +
 +            rows = getIndexed(store, 100, buildExpression(firstNameColumn, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +            Assert.assertEquals(rows.toString(), remaining[step], rows.size());
 +        }
 +
 +        // add some data back in, just to make sure it all still works
 +        Map<String, Pair<String, Integer>> part4 = new HashMap<>();
 +        part4.put("key40", Pair.create("Tianna", 31));
 +        part4.put("key41", Pair.create("Dennis", 32));
 +
 +        loadData(part4, 4000, true);
 +
 +        rows = getIndexed(store, 100, buildExpression(firstNameColumn, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        Assert.assertEquals(rows.toString(), 1, rows.size());
 +    }
 +
 +
 +    /**
 +     * Applies {@code writeCount} mutations from a fixed thread pool while the test thread keeps
 +     * paging through the index. It asserts that the number of matches never shrinks between
 +     * reads and that all {@code writeCount} rows are found once every writer has finished.
 +     */
 +    @Test
 +    public void testConcurrentMemtableReadsAndWrites() throws Exception
 +    {
 +        final ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        ExecutorService scheduler = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 +
 +        final int writeCount = 10000;
 +        final AtomicInteger updates = new AtomicInteger(0);
 +
 +        try
 +        {
 +            for (int i = 0; i < writeCount; i++)
 +            {
 +                final String key = "key" + i;
 +                final String firstName = "first_name#" + i;
 +                final String lastName = "last_name#" + i;
 +
 +                scheduler.submit((Runnable) () -> {
 +                    try
 +                    {
 +                        newMutation(key, firstName, lastName, 26, System.currentTimeMillis()).apply();
 +                        Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS); // back up a bit to do more reads
 +                    }
 +                    finally
 +                    {
 +                        // counted even when the write fails, so the read loop below always terminates
 +                        updates.incrementAndGet();
 +                    }
 +                });
 +            }
 +        }
 +        finally
 +        {
 +            // orderly shutdown: already submitted writes still run, but the pool's worker
 +            // threads terminate once the queue drains instead of outliving this test
 +            scheduler.shutdown();
 +        }
 +
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer age = UTF8Type.instance.decompose("age");
 +
 +        int previousCount = 0;
 +
 +        do
 +        {
 +            // this loop figures out if number of search results monotonically increasing
 +            // to make sure that concurrent updates don't interfere with reads, uses first_name and age
 +            // indexes to test correctness of both Trie and SkipList ColumnIndex implementations.
 +
 +            Set<DecoratedKey> rows = getPaged(store, 100, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                                                          buildExpression(age, Operator.EQ, Int32Type.instance.decompose(26)));
 +
 +            Assert.assertTrue(previousCount <= rows.size());
 +            previousCount = rows.size();
 +        }
 +        while (updates.get() < writeCount);
 +
 +        // to make sure that after all of the writes are done we can read all "count" worth of rows
 +        Set<DecoratedKey> rows = getPaged(store, 100, buildExpression(firstName, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                                                      buildExpression(age, Operator.EQ, Int32Type.instance.decompose(26)));
 +
 +        Assert.assertEquals(writeCount, rows.size());
 +    }
 +
 +    /**
 +     * Loads the same keys three times with changing ages (the first two batches with the
 +     * {@code loadData} flag set to {@code true}, the last one with {@code false}) and checks
 +     * which keys the index returns for the name match and for individual ages.
 +     */
 +    @Test
 +    public void testSameKeyInMemtableAndSSTables()
 +    {
 +        final ByteBuffer firstNameColumn = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer ageColumn = UTF8Type.instance.decompose("age");
 +
 +        Map<String, Pair<String, Integer>> firstBatch = new HashMap<>();
 +        firstBatch.put("key1", Pair.create("Pavel", 14));
 +        firstBatch.put("key2", Pair.create("Pavel", 26));
 +        firstBatch.put("key3", Pair.create("Pavel", 27));
 +        firstBatch.put("key4", Pair.create("Jason", 27));
 +
 +        ColumnFamilyStore store = loadData(firstBatch, true);
 +
 +        Map<String, Pair<String, Integer>> secondBatch = new HashMap<>();
 +        secondBatch.put("key1", Pair.create("Pavel", 14));
 +        secondBatch.put("key2", Pair.create("Pavel", 27));
 +        secondBatch.put("key4", Pair.create("Jason", 28));
 +
 +        loadData(secondBatch, true);
 +
 +        Map<String, Pair<String, Integer>> thirdBatch = new HashMap<>();
 +        thirdBatch.put("key1", Pair.create("Pavel", 15));
 +        thirdBatch.put("key4", Pair.create("Jason", 29));
 +
 +        loadData(thirdBatch, false);
 +
 +        // every key has a first_name containing "a"
 +        Set<String> rows = getIndexed(store, 100, buildExpression(firstNameColumn, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")));
 +        final String[] allKeys = { "key1", "key2", "key3", "key4" };
 +        Assert.assertTrue(rows.toString(), Arrays.equals(allKeys, rows.toArray(new String[rows.size()])));
 +
 +        // age -> keys expected for (first_name LIKE '%a%' AND age = <age>)
 +        final int[] ages = { 15, 29, 27 };
 +        final String[][] keysByAge = { { "key1" }, { "key4" }, { "key2", "key3" } };
 +
 +        for (int i = 0; i < ages.length; i++)
 +        {
 +            rows = getIndexed(store, 100, buildExpression(firstNameColumn, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("a")),
 +                                          buildExpression(ageColumn, Operator.EQ, Int32Type.instance.decompose(ages[i])));
 +
 +            Assert.assertTrue(rows.toString(), Arrays.equals(keysByAge[i], rows.toArray(new String[rows.size()])));
 +        }
 +    }
 +
 +    /**
 +     * Writes a row whose {@code age} cell is encoded with {@code LongType} while the lookup
 +     * uses an {@code Int32Type} value, flushes, and expects the index lookup to return nothing.
 +     */
 +    @Test
 +    public void testInsertingIncorrectValuesIntoAgeIndex()
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        final ByteBuffer firstNameColumn = UTF8Type.instance.decompose("first_name");
 +        final ByteBuffer ageColumn = UTF8Type.instance.decompose("age");
 +
 +        List<Cell> cells = new ArrayList<>();
 +        cells.add(buildCell(ageColumn, LongType.instance.decompose(26L), System.currentTimeMillis()));
 +        cells.add(buildCell(firstNameColumn, AsciiType.instance.decompose("pavel"), System.currentTimeMillis()));
 +
 +        Mutation mutation = new Mutation(KS_NAME, decoratedKey(AsciiType.instance.decompose("key1")));
 +        update(mutation, cells);
 +        mutation.apply();
 +
 +        store.forceBlockingFlush();
 +
 +        // NOTE(review): first_name EQ "a" presumably cannot match "pavel" on its own, so this
 +        // lookup may be empty regardless of the age value - confirm the intended operator
 +        Set<String> rows = getIndexed(store, 10,
 +                                      buildExpression(firstNameColumn, Operator.EQ, UTF8Type.instance.decompose("a")),
 +                                      buildExpression(ageColumn, Operator.GTE, Int32Type.instance.decompose(26)));
 +
 +        // index is expected to have 0 results because age value was of wrong type
 +        Assert.assertEquals(0, rows.size());
 +    }
 +
 +
 +    /**
 +     * Runs the unicode scenario with {@code forceFlush} set to {@code false}, then calls
 +     * {@code cleanupData()} and runs it once more with {@code forceFlush} set to {@code true}.
 +     */
 +    @Test
 +    public void testUnicodeSupport()
 +    {
 +        for (boolean flush : new boolean[] { false, true })
 +        {
 +            // cleanupData() runs only between the two passes
 +            if (flush)
 +                cleanupData();
 +
 +            testUnicodeSupport(flush);
 +        }
 +    }
 +
 +    private void testUnicodeSupport(boolean forceFlush)
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        final ByteBuffer comment = UTF8Type.instance.decompose("comment");
 +
 +        Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
 +        update(rm, comment, UTF8Type.instance.decompose("ⓈⓅⒺⒸⒾⒶⓁ ⒞⒣⒜⒭⒮ and normal ones"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key2"));
 +        update(rm, comment, UTF8Type.instance.decompose("龍馭鬱"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key3"));
 +        update(rm, comment, UTF8Type.instance.decompose("インディアナ"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key4"));
 +        update(rm, comment, UTF8Type.instance.decompose("レストラン"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key5"));
 +        update(rm, comment, UTF8Type.instance.decompose("ベンジャミン ウエスト"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        if (forceFlush)
 +            store.forceBlockingFlush();
 +
 +        Set<String> rows;
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("ⓈⓅⒺⒸⒾ")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("normal")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("龍")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("鬱")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("馭鬱")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("龍馭鬱")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("ベンジャミン")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key5" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("レストラ")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("インディ")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key3" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("ベンジャミ")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key5" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("ン")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key4", "key5" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("レストラン")));
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key4" }, rows.toArray(new String[rows.size()])));
 +    }
 +
 +    /**
 +     * Runs the suffix-mode unicode scenario with {@code forceFlush} set to {@code false}, then
 +     * calls {@code cleanupData()} and runs it once more with {@code forceFlush} set to {@code true}.
 +     */
 +    @Test
 +    public void testUnicodeSuffixModeNoSplits()
 +    {
 +        for (boolean flush : new boolean[] { false, true })
 +        {
 +            // cleanupData() runs only between the two passes
 +            if (flush)
 +                cleanupData();
 +
 +            testUnicodeSuffixModeNoSplits(flush);
 +        }
 +    }
 +
 +    private void testUnicodeSuffixModeNoSplits(boolean forceFlush) // forceFlush: flush the store before querying
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        final ByteBuffer comment = UTF8Type.instance.decompose("comment_suffix_split"); // name of the column written and queried below
 +
 +        Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
 +        update(rm, comment, UTF8Type.instance.decompose("龍馭鬱"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key2"));
 +        update(rm, comment, UTF8Type.instance.decompose("インディアナ"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key3"));
 +        update(rm, comment, UTF8Type.instance.decompose("レストラン"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key4"));
 +        update(rm, comment, UTF8Type.instance.decompose("ベンジャミン ウエスト"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        if (forceFlush)
 +            store.forceBlockingFlush(); // only in the flushed variant of the test
 +
 +        Set<String> rows;
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("龍"))); // first character of key1's value
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("鬱"))); // last character of key1's value
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("馭鬱"))); // trailing two characters of key1's value
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("龍馭鬱"))); // key1's whole value
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("ベンジャミン"))); // first word of key4's value
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("トラン"))); // tail of key3's value
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key3" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("ディア"))); // middle of key2's value
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("ジャミン"))); // inside the first word of key4's value
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_CONTAINS, UTF8Type.instance.decompose("ン"))); // every value except key1's contains this character
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_SUFFIX, UTF8Type.instance.decompose("ン"))); // only key3's whole value ends with it; key4's value ends with "ウエスト"
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key3" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("ベンジャミン ウエスト"))); // key4's whole value, including the space
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key4" }, rows.toArray(new String[rows.size()])));
 +    }
 +
 +    @Test
 +    public void testThatTooBigValueIsRejected() // writes oversized values and expects index lookups for them to return nothing
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        final ByteBuffer comment = UTF8Type.instance.decompose("comment_suffix_split"); // name of the column written and queried below
 +
 +        for (int i = 0; i < 10; i++) // ten rounds, each with a fresh random value written to the same key
 +        {
 +            byte[] randomBytes = new byte[ThreadLocalRandom.current().nextInt(OnDiskIndexBuilder.MAX_TERM_SIZE, 5 * OnDiskIndexBuilder.MAX_TERM_SIZE)]; // length in [MAX_TERM_SIZE, 5 * MAX_TERM_SIZE)
 +            ThreadLocalRandom.current().nextBytes(randomBytes);
 +
 +            final ByteBuffer bigValue = UTF8Type.instance.decompose(new String(randomBytes)); // NOTE(review): new String(byte[]) decodes with the platform default charset
 +
 +            Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
 +            update(rm, comment, bigValue, System.currentTimeMillis());
 +            rm.apply();
 +
 +            Set<String> rows;
 +
 +            rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_MATCHES, bigValue.duplicate())); // lookup before the flush
 +            Assert.assertEquals(0, rows.size());
 +
 +            store.forceBlockingFlush();
 +
 +            rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_MATCHES, bigValue.duplicate())); // same lookup after the flush
 +            Assert.assertEquals(0, rows.size());
 +        }
 +    }
 +
 +    @Test
 +    public void testSearchTimeouts() throws Exception // expects TimeQuotaExceededException with quota 0, then success with the range RPC timeout
 +    {
 +        final ByteBuffer firstName = UTF8Type.instance.decompose("first_name");
 +
 +        Map<String, Pair<String, Integer>> data1 = new HashMap<String, Pair<String, Integer>>()
 +        {{
 +                put("key1", Pair.create("Pavel", 14));
 +                put("key2", Pair.create("Pavel", 26));
 +                put("key3", Pair.create("Pavel", 27));
 +                put("key4", Pair.create("Jason", 27));
 +        }};
 +
 +        ColumnFamilyStore store = loadData(data1, true);
 +
 +        RowFilter filter = RowFilter.create();
 +        filter.add(store.metadata.getColumnDefinition(firstName), Operator.LIKE_CONTAINS, AsciiType.instance.fromString("a")); // all four names loaded above contain "a"
 +
 +        ReadCommand command = new PartitionRangeReadCommand(store.metadata,
 +                                                            FBUtilities.nowInSeconds(),
 +                                                            ColumnFilter.all(store.metadata),
 +                                                            filter,
 +                                                            DataLimits.NONE,
 +                                                            DataRange.allData(store.metadata.partitioner),
 +                                                            Optional.empty());
 +
 +        try
 +        {
 +            new QueryPlan(store, command, 0).execute(ReadExecutionController.empty()); // third argument 0 is presumably the time quota - TODO confirm
 +            Assert.fail("Expected TimeQuotaExceededException"); // throws AssertionError, which the catch blocks below do not intercept
 +        }
 +        catch (TimeQuotaExceededException e)
 +        {
 +            // correct behavior
 +        }
 +        catch (Exception e)
 +        {
 +            e.printStackTrace(); // print first: Assert.fail() throws, so nothing placed after it would run
 +            Assert.fail("Unexpected exception: " + e);
 +        }
 +
 +        // to make sure that query doesn't fail in normal conditions
 +
 +        try (ReadExecutionController controller = command.executionController())
 +        {
 +            Set<String> rows = getKeys(new QueryPlan(store, command, DatabaseDescriptor.getRangeRpcTimeout()).execute(controller));
 +            Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1", "key2", "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +        }
 +    }
 +
 +    @Test
 +    public void testLowerCaseAnalyzer() // runs the same scenario twice: without and with a forced flush
 +    {
 +        testLowerCaseAnalyzer(false); // first pass: queries run without forcing a flush
 +        cleanupData(); // presumably resets state between the two passes (helper not shown here) - TODO confirm
 +        testLowerCaseAnalyzer(true); // second pass: forceBlockingFlush() is called before the queries
 +    }
 +
 +    @Test
 +    public void testChinesePrefixSearch() // EQ and LIKE_PREFIX lookups on CJK full names after a flush
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        final ByteBuffer fullName = UTF8Type.instance.decompose("/output/full-name/"); // name of the column written and queried below
 +
 +        Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
 +        update(rm, fullName, UTF8Type.instance.decompose("美加 八田"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key2"));
 +        update(rm, fullName, UTF8Type.instance.decompose("仁美 瀧澤"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key3"));
 +        update(rm, fullName, UTF8Type.instance.decompose("晃宏 高須"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key4"));
 +        update(rm, fullName, UTF8Type.instance.decompose("弘孝 大竹"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key5"));
 +        update(rm, fullName, UTF8Type.instance.decompose("満枝 榎本"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key6"));
 +        update(rm, fullName, UTF8Type.instance.decompose("飛鳥 上原"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key7"));
 +        update(rm, fullName, UTF8Type.instance.decompose("大輝 鎌田"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key8"));
 +        update(rm, fullName, UTF8Type.instance.decompose("利久 寺地"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        store.forceBlockingFlush(); // always flushed here; unlike the sibling tests there is no unflushed pass
 +
 +
 +        Set<String> rows;
 +
 +        rows = getIndexed(store, 10, buildExpression(fullName, Operator.EQ, UTF8Type.instance.decompose("美加 八田"))); // key1's whole value
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(fullName, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("美加"))); // first word of key1's value
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(fullName, Operator.EQ, UTF8Type.instance.decompose("晃宏 高須"))); // key3's whole value
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key3" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(fullName, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("大輝"))); // first word of key7's value; "大" also appears inside key4's value, which is not expected to match
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key7" }, rows.toArray(new String[rows.size()])));
 +    }
 +
 +    public void testLowerCaseAnalyzer(boolean forceFlush) // forceFlush: flush the store before querying
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        final ByteBuffer comment = UTF8Type.instance.decompose("address"); // NOTE: despite the variable name, this is the "address" column name
 +
 +        Mutation rm = new Mutation(KS_NAME, decoratedKey("key1"));
 +        update(rm, comment, UTF8Type.instance.decompose("577 Rogahn Valleys Apt. 178"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key2"));
 +        update(rm, comment, UTF8Type.instance.decompose("89809 Beverly Course Suite 089"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key3"));
 +        update(rm, comment, UTF8Type.instance.decompose("165 clydie oval apt. 399"), System.currentTimeMillis()); // stored all lower-case, unlike key1 and key2
 +        rm.apply();
 +
 +        if (forceFlush)
 +            store.forceBlockingFlush(); // only in the flushed variant of the test
 +
 +        Set<String> rows;
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("577 Rogahn Valleys"))); // same casing as stored
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("577 ROgAhn VallEYs"))); // mixed casing, same expected key
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("577 rogahn valleys"))); // all lower-case, same expected key
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("577 rogahn"))); // shorter prefix
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("57"))); // two-character prefix
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("89809 Beverly Course"))); // same casing as stored
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("89809 BEVERly COURSE"))); // mixed casing, same expected key
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("89809 beverly course"))); // all lower-case, same expected key
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("89809 Beverly"))); // shorter prefix
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("8980"))); // four-character prefix
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("165 ClYdie OvAl APT. 399"))); // whole value, mixed casing
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key3" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("165 Clydie Oval Apt. 399"))); // whole value, capitalized words
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key3" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("165 clydie oval apt. 399"))); // whole value, same casing as stored
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key3" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("165 ClYdie OvA"))); // prefix ending mid-word, mixed casing
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key3" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("165 ClYdi"))); // shorter mixed-case prefix
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key3" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(comment, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("165"))); // numeric prefix only
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key3" }, rows.toArray(new String[rows.size()])));
 +    }
 +
 +    @Test
 +    public void testPrefixSSTableLookup()
 +    {
 +        // This test covers a particular case in which interval lookup can return invalid results
 +        // when queried on a prefix, e.g. "j".
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CF_NAME);
 +
 +        final ByteBuffer name = UTF8Type.instance.decompose("first_name_prefix"); // name of the column written and queried below
 +
 +        Mutation rm;
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key1"));
 +        update(rm, name, UTF8Type.instance.decompose("Pavel"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key2"));
 +        update(rm, name, UTF8Type.instance.decompose("Jordan"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key3"));
 +        update(rm, name, UTF8Type.instance.decompose("Mikhail"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key4"));
 +        update(rm, name, UTF8Type.instance.decompose("Michael"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key5"));
 +        update(rm, name, UTF8Type.instance.decompose("Johnny"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        // first flush would make interval for name - 'johnny' -> 'pavel'
 +        store.forceBlockingFlush();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key6"));
 +        update(rm, name, UTF8Type.instance.decompose("Jason"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key7"));
 +        update(rm, name, UTF8Type.instance.decompose("Vijay"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        rm = new Mutation(KS_NAME, decoratedKey("key8")); // this name is going to be tokenized
 +        update(rm, name, UTF8Type.instance.decompose("Jean-Claude"), System.currentTimeMillis());
 +        rm.apply();
 +
 +        // this flush is going to produce range - 'jason' -> 'vijay'
 +        store.forceBlockingFlush();
 +
 +        // make sure that overlap of the prefixes is properly handled across sstables
 +        // since a simple interval tree lookup is not going to cover it; a prefix lookup is actually required.
 +
 +        Set<String> rows;
 +
 +        rows = getIndexed(store, 10, buildExpression(name, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("J"))); // upper-case prefix; expected keys come from both flushes
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2", "key5", "key6", "key8"}, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(name, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("j"))); // lower-case prefix, same expected keys
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2", "key5", "key6", "key8" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(name, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("m"))); // "Mikhail" and "Michael"
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key3", "key4" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(name, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("v"))); // "Vijay", written before the second flush
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key7" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(name, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("p"))); // "Pavel", written before the first flush
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(name, Operator.LIKE_PREFIX, UTF8Type.instance.decompose("j")),
 +                                     buildExpression(name, Operator.NEQ, UTF8Type.instance.decompose("joh"))); // key5 ("Johnny") is expected to be excluded by this expression
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key2", "key6", "key8" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(name, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("pavel"))); // lower-case query against stored "Pavel"
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(name, Operator.EQ, UTF8Type.instance.decompose("Pave"))); // a strict prefix of the value is not expected to match under EQ
 +        Assert.assertTrue(rows.isEmpty());
 +
 +        rows = getIndexed(store, 10, buildExpression(name, Operator.EQ, UTF8Type.instance.decompose("Pavel"))); // key1's whole value
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key1" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(name, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("JeAn"))); // first part of "Jean-Claude", mixed casing
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key8" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(name, Operator.LIKE_MATCHES, UTF8Type.instance.decompose("claUde"))); // second part of "Jean-Claude", mixed casing
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key8" }, rows.toArray(new String[rows.size()])));
 +
 +        rows = getIndexed(store, 10, buildExpression(name, Operator.EQ, UTF8Type.instance.decompose("Jean"))); // one part of the value alone is not expected to match under EQ
 +        Assert.assertTrue(rows.isEmpty());
 +
 +        rows = getIndexed(store, 10, buildExpression(name, Operator.EQ, UTF8Type.instance.decompose("Jean-Claude"))); // key8's whole value
 +        Assert.assertTrue(rows.toString(), Arrays.equals(new String[] { "key8" }, rows.toArray(new String[rows.size()])));
 +    }
 +
 +    @Test
 +    public void testSettingIsLiteralOption() // checks ColumnIndex.isLiteral() for several column types and "is_literal" option values
 +    {
 +
 +        // special type which is UTF-8 but is only on the inside
 +        AbstractType<?> stringType = new AbstractType<String>(AbstractType.ComparisonType.CUSTOM)
 +        {
 +            public ByteBuffer fromString(String source) throws MarshalException
 +            {
 +                return UTF8Type.instance.fromString(source); // delegates to UTF8Type
 +            }
 +
 +            public Term fromJSONObject(Object parsed) throws MarshalException
 +            {
 +                throw new UnsupportedOperationException(); // not supported by this test-only type
 +            }
 +
 +            public TypeSerializer<String> getSerializer()
 +            {
 +                return UTF8Type.instance.getSerializer(); // delegates to UTF8Type
 +            }
 +
 +            public int compareCustom(ByteBuffer a, ByteBuffer b)
 +            {
 +                return UTF8Type.instance.compare(a, b); // delegates to UTF8Type
 +            }
 +        };
 +
 +        // first let's check that we get 'false' for 'isLiteral' if we don't set the option with special comparator
 +        ColumnDefinition columnA = ColumnDefinition.regularDef(KS_NAME, CF_NAME, "special-A", stringType);
 +
 +        ColumnIndex indexA = new ColumnIndex(UTF8Type.instance, columnA, IndexMetadata.fromSchemaMetadata("special-index-A", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
 +        {{
 +            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName());
 +        }}));
 +
 +        Assert.assertEquals(true,  indexA.isIndexed());
 +        Assert.assertEquals(false, indexA.isLiteral());
 +
 +        // now let's double-check that we do get 'true' when we set it
 +        ColumnDefinition columnB = ColumnDefinition.regularDef(KS_NAME, CF_NAME, "special-B", stringType);
 +
 +        ColumnIndex indexB = new ColumnIndex(UTF8Type.instance, columnB, IndexMetadata.fromSchemaMetadata("special-index-B", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
 +        {{
 +            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName());
 +            put("is_literal", "true");
 +        }}));
 +
 +        Assert.assertEquals(true, indexB.isIndexed());
 +        Assert.assertEquals(true, indexB.isLiteral());
 +
 +        // and finally we should also get a 'true' if it's built-in UTF-8/ASCII comparator
 +        ColumnDefinition columnC = ColumnDefinition.regularDef(KS_NAME, CF_NAME, "special-C", UTF8Type.instance);
 +
 +        ColumnIndex indexC = new ColumnIndex(UTF8Type.instance, columnC, IndexMetadata.fromSchemaMetadata("special-index-C", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
 +        {{
 +            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName());
 +        }}));
 +
 +        Assert.assertEquals(true, indexC.isIndexed());
 +        Assert.assertEquals(true, indexC.isLiteral());
 +
 +        ColumnDefinition columnD = ColumnDefinition.regularDef(KS_NAME, CF_NAME, "special-D", AsciiType.instance); // same check for the ASCII type
 +
 +        ColumnIndex indexD = new ColumnIndex(UTF8Type.instance, columnD, IndexMetadata.fromSchemaMetadata("special-index-D", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
 +        {{
 +            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName());
 +        }}));
 +
 +        Assert.assertEquals(true, indexD.isIndexed());
 +        Assert.assertEquals(true, indexD.isLiteral());
 +
 +        // and the option should supersede the comparator type
 +        ColumnDefinition columnE = ColumnDefinition.regularDef(KS_NAME, CF_NAME, "special-E", UTF8Type.instance);
 +
 +        ColumnIndex indexE = new ColumnIndex(UTF8Type.instance, columnE, IndexMetadata.fromSchemaMetadata("special-index-E", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
 +        {{
 +            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName());
 +            put("is_literal", "false");
 +        }}));
 +
 +        Assert.assertEquals(true,  indexE.isIndexed());
 +        Assert.assertEquals(false, indexE.isLiteral());
++
++        // test frozen-collection: a list of UTF-8 values with no 'is_literal' option is expected to be non-literal
++        ColumnDefinition columnF = ColumnDefinition.regularDef(KS_NAME,
++                                                               CF_NAME,
++                                                               "special-F",
++                                                               ListType.getInstance(UTF8Type.instance, false)); // presumably 'false' means not multi-cell, i.e. frozen - TODO confirm
++
++        ColumnIndex indexF = new ColumnIndex(UTF8Type.instance, columnF, IndexMetadata.fromSchemaMetadata("special-index-F", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
++        {{
++            put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName());
++        }}));
++
++        Assert.assertEquals(true,  indexF.isIndexed());
++        Assert.assertEquals(false, indexF.isLiteral());
 +    }
 +
 +    @Test
 +    public void testClusteringIndexes() throws Exception // calls the boolean overload twice, with false and then true
 +    {
 +        testClusteringIndexes(false); // first pass, forceFlush = false
 +        cleanupData(); // presumably resets state between the two passes (helper not shown here) - TODO confirm
 +        testClusteringIndexes(true); // second pass, forceFlush = true
 +    }
 +
 +    public void testClusteringIndexes(boolean forceFlush) throws Exception
 +    {
 +        ColumnFamilyStore store = Keyspace.open(KS_NAME).getColumnFamilyStore(CLUSTERING_CF_NAME_1);
 +
 +        executeCQL(CLUSTERING_CF_NA

<TRUNCATED>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[8/8] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by ad...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cad94165
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cad94165
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cad94165

Branch: refs/heads/trunk
Commit: cad941653b59c4609ba2900cda86dc366d3ccd38
Parents: 6dfd11c 47a2839
Author: Andrés de la Peña <a....@gmail.com>
Authored: Tue Aug 8 15:27:15 2017 +0100
Committer: Andrés de la Peña <a....@gmail.com>
Committed: Tue Aug 8 15:27:15 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/rows/AbstractCell.java  |  2 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  2 +-
 .../apache/cassandra/schema/ColumnMetadata.java | 21 ++++++++++---
 .../apache/cassandra/tools/JsonTransformer.java | 32 +++++++++++++++++--
 .../org/apache/cassandra/cql3/ViewTest.java     | 33 ++++++++++++++++++++
 .../cassandra/index/sasi/SASIIndexTest.java     | 14 +++++++++
 7 files changed, 96 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cad94165/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cad94165/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cad94165/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cad94165/src/java/org/apache/cassandra/schema/ColumnMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/ColumnMetadata.java
index ea80708,0000000..3b55c03
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/ColumnMetadata.java
+++ b/src/java/org/apache/cassandra/schema/ColumnMetadata.java
@@@ -1,626 -1,0 +1,639 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.function.Predicate;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.MoreObjects;
 +import com.google.common.base.Objects;
 +import com.google.common.collect.Collections2;
 +
 +import org.apache.cassandra.cql3.*;
 +import org.apache.cassandra.cql3.selection.Selectable;
 +import org.apache.cassandra.cql3.selection.Selector;
 +import org.apache.cassandra.cql3.selection.SimpleSelector;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.serializers.MarshalException;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.github.jamm.Unmetered;
 +
 +@Unmetered
 +public final class ColumnMetadata extends ColumnSpecification implements Selectable, Comparable<ColumnMetadata>
 +{
 +    public static final Comparator<Object> asymmetricColumnDataComparator =
 +        (a, b) -> ((ColumnData) a).column().compareTo((ColumnMetadata) b);
 +
 +    public static final int NO_POSITION = -1;
 +
 +    public enum ClusteringOrder
 +    {
 +        ASC, DESC, NONE
 +    }
 +
 +    /**
 +     * The type of CQL3 column this definition represents.
 +     * There are 4 main types of CQL3 columns: those that are part of the partition key,
 +     * those that are part of the clustering columns and, amongst the others, regular and
 +     * static ones.
 +     *
 +     * IMPORTANT: this enum is serialized as toString() and deserialized by calling
 +     * Kind.valueOf(), so do not override toString() or rename existing values.
 +     */
 +    public enum Kind
 +    {
 +        // NOTE: if adding a new type, must modify comparisonOrder
 +        PARTITION_KEY,
 +        CLUSTERING,
 +        REGULAR,
 +        STATIC;
 +
 +        public boolean isPrimaryKeyKind()
 +        {
 +            return this == PARTITION_KEY || this == CLUSTERING;
 +        }
 +
 +    }
 +
 +    public final Kind kind;
 +
 +    /*
 +     * If the column is a partition key or clustering column, its position relative to
 +     * other columns of the same kind. Otherwise,  NO_POSITION (-1).
 +     *
 +     * Note that partition key and clustering columns are numbered separately so
 +     * the first clustering column is 0.
 +     */
 +    private final int position;
 +
 +    private final Comparator<CellPath> cellPathComparator;
 +    private final Comparator<Object> asymmetricCellPathComparator;
 +    private final Comparator<? super Cell> cellComparator;
 +
 +    private int hash;
 +
 +    /**
 +     * These objects are compared frequently, so we encode several of their comparison components
 +     * into a single long value so that this can be done efficiently
 +     */
 +    private final long comparisonOrder;
 +
 +    private static long comparisonOrder(Kind kind, boolean isComplex, long position, ColumnIdentifier name)
 +    {
 +        assert position >= 0 && position < 1 << 12;
 +        return   (((long) kind.ordinal()) << 61)
 +               | (isComplex ? 1L << 60 : 0)
 +               | (position << 48)
 +               | (name.prefixComparison >>> 16);
 +    }
 +
 +    public static ColumnMetadata partitionKeyColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type, int position)
 +    {
 +        return new ColumnMetadata(table, name, type, position, Kind.PARTITION_KEY);
 +    }
 +
 +    public static ColumnMetadata partitionKeyColumn(String keyspace, String table, String name, AbstractType<?> type, int position)
 +    {
 +        return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, position, Kind.PARTITION_KEY);
 +    }
 +
 +    public static ColumnMetadata clusteringColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type, int position)
 +    {
 +        return new ColumnMetadata(table, name, type, position, Kind.CLUSTERING);
 +    }
 +
 +    public static ColumnMetadata clusteringColumn(String keyspace, String table, String name, AbstractType<?> type, int position)
 +    {
 +        return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, position, Kind.CLUSTERING);
 +    }
 +
 +    public static ColumnMetadata regularColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type)
 +    {
 +        return new ColumnMetadata(table, name, type, NO_POSITION, Kind.REGULAR);
 +    }
 +
 +    public static ColumnMetadata regularColumn(String keyspace, String table, String name, AbstractType<?> type)
 +    {
 +        return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, NO_POSITION, Kind.REGULAR);
 +    }
 +
 +    public static ColumnMetadata staticColumn(TableMetadata table, ByteBuffer name, AbstractType<?> type)
 +    {
 +        return new ColumnMetadata(table, name, type, NO_POSITION, Kind.STATIC);
 +    }
 +
 +    public static ColumnMetadata staticColumn(String keyspace, String table, String name, AbstractType<?> type)
 +    {
 +        return new ColumnMetadata(keyspace, table, ColumnIdentifier.getInterned(name, true), type, NO_POSITION, Kind.STATIC);
 +    }
 +
 +    public ColumnMetadata(TableMetadata table, ByteBuffer name, AbstractType<?> type, int position, Kind kind)
 +    {
 +        this(table.keyspace,
 +             table.name,
 +             ColumnIdentifier.getInterned(name, table.columnDefinitionNameComparator(kind)),
 +             type,
 +             position,
 +             kind);
 +    }
 +
 +    @VisibleForTesting
 +    public ColumnMetadata(String ksName,
 +                          String cfName,
 +                          ColumnIdentifier name,
 +                          AbstractType<?> type,
 +                          int position,
 +                          Kind kind)
 +    {
 +        super(ksName, cfName, name, type);
 +        assert name != null && type != null && kind != null;
 +        assert name.isInterned();
 +        assert (position == NO_POSITION) == !kind.isPrimaryKeyKind(); // The position really only make sense for partition and clustering columns (and those must have one),
 +                                                                      // so make sure we don't sneak it for something else since it'd break equals()
 +        this.kind = kind;
 +        this.position = position;
 +        this.cellPathComparator = makeCellPathComparator(kind, type);
 +        this.cellComparator = cellPathComparator == null ? ColumnData.comparator : (a, b) -> cellPathComparator.compare(a.path(), b.path());
 +        this.asymmetricCellPathComparator = cellPathComparator == null ? null : (a, b) -> cellPathComparator.compare(((Cell)a).path(), (CellPath) b);
 +        this.comparisonOrder = comparisonOrder(kind, isComplex(), Math.max(0, position), name);
 +    }
 +
 +    private static Comparator<CellPath> makeCellPathComparator(Kind kind, AbstractType<?> type)
 +    {
 +        if (kind.isPrimaryKeyKind() || !type.isMultiCell())
 +            return null;
 +
 +        AbstractType<?> nameComparator = type.isCollection()
 +                                       ? ((CollectionType) type).nameComparator()
 +                                       : ((UserType) type).nameComparator();
 +
 +
 +        return (path1, path2) ->
 +        {
 +            if (path1.size() == 0 || path2.size() == 0)
 +            {
 +                if (path1 == CellPath.BOTTOM)
 +                    return path2 == CellPath.BOTTOM ? 0 : -1;
 +                if (path1 == CellPath.TOP)
 +                    return path2 == CellPath.TOP ? 0 : 1;
 +                return path2 == CellPath.BOTTOM ? 1 : -1;
 +            }
 +
 +            // This will get more complicated once we have non-frozen UDT and nested collections
 +            assert path1.size() == 1 && path2.size() == 1;
 +            return nameComparator.compare(path1.get(0), path2.get(0));
 +        };
 +    }
 +
 +    public ColumnMetadata copy()
 +    {
 +        return new ColumnMetadata(ksName, cfName, name, type, position, kind);
 +    }
 +
 +    public ColumnMetadata withNewName(ColumnIdentifier newName)
 +    {
 +        return new ColumnMetadata(ksName, cfName, newName, type, position, kind);
 +    }
 +
 +    public ColumnMetadata withNewType(AbstractType<?> newType)
 +    {
 +        return new ColumnMetadata(ksName, cfName, name, newType, position, kind);
 +    }
 +
 +    public boolean isPartitionKey()
 +    {
 +        return kind == Kind.PARTITION_KEY;
 +    }
 +
 +    public boolean isClusteringColumn()
 +    {
 +        return kind == Kind.CLUSTERING;
 +    }
 +
 +    public boolean isStatic()
 +    {
 +        return kind == Kind.STATIC;
 +    }
 +
 +    public boolean isRegular()
 +    {
 +        return kind == Kind.REGULAR;
 +    }
 +
 +    public ClusteringOrder clusteringOrder()
 +    {
 +        if (!isClusteringColumn())
 +            return ClusteringOrder.NONE;
 +
 +        return type.isReversed() ? ClusteringOrder.DESC : ClusteringOrder.ASC;
 +    }
 +
 +    public int position()
 +    {
 +        return position;
 +    }
 +
 +    @Override
 +    public boolean equals(Object o)
 +    {
 +        if (this == o)
 +            return true;
 +
 +        if (!(o instanceof ColumnMetadata))
 +            return false;
 +
 +        ColumnMetadata cd = (ColumnMetadata) o;
 +
 +        return Objects.equal(ksName, cd.ksName)
 +            && Objects.equal(cfName, cd.cfName)
 +            && Objects.equal(name, cd.name)
 +            && Objects.equal(type, cd.type)
 +            && Objects.equal(kind, cd.kind)
 +            && Objects.equal(position, cd.position);
 +    }
 +
 +    @Override
 +    public int hashCode()
 +    {
 +        // This achieves the same as Objects.hashcode, but avoids the object array allocation
 +        // which features significantly in the allocation profile and caches the result.
 +        int result = hash;
 +        if (result == 0)
 +        {
 +            result = 31 + (ksName == null ? 0 : ksName.hashCode());
 +            result = 31 * result + (cfName == null ? 0 : cfName.hashCode());
 +            result = 31 * result + (name == null ? 0 : name.hashCode());
 +            result = 31 * result + (type == null ? 0 : type.hashCode());
 +            result = 31 * result + (kind == null ? 0 : kind.hashCode());
 +            result = 31 * result + position;
 +            hash = result;
 +        }
 +        return result;
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return name.toString();
 +    }
 +
 +    public String debugString()
 +    {
 +        return MoreObjects.toStringHelper(this)
 +                          .add("name", name)
 +                          .add("type", type)
 +                          .add("kind", kind)
 +                          .add("position", position)
 +                          .toString();
 +    }
 +
 +    public boolean isPrimaryKeyColumn()
 +    {
 +        return kind.isPrimaryKeyKind();
 +    }
 +
 +    @Override
 +    public boolean selectColumns(Predicate<ColumnMetadata> predicate)
 +    {
 +        return predicate.test(this);
 +    }
 +
 +    @Override
 +    public boolean processesSelection()
 +    {
 +        return false;
 +    }
 +
 +    /**
 +     * Converts the specified column definitions into column identifiers.
 +     *
 +     * @param definitions the column definitions to convert.
 +     * @return the column identifiers corresponding to the specified definitions
 +     */
 +    public static Collection<ColumnIdentifier> toIdentifiers(Collection<ColumnMetadata> definitions)
 +    {
 +        return Collections2.transform(definitions, columnDef -> columnDef.name);
 +    }
 +
 +    public int compareTo(ColumnMetadata other)
 +    {
 +        if (this == other)
 +            return 0;
 +
 +        if (comparisonOrder != other.comparisonOrder)
 +            return Long.compare(comparisonOrder, other.comparisonOrder);
 +
 +        return this.name.compareTo(other.name);
 +    }
 +
 +    public Comparator<CellPath> cellPathComparator()
 +    {
 +        return cellPathComparator;
 +    }
 +
 +    public Comparator<Object> asymmetricCellPathComparator()
 +    {
 +        return asymmetricCellPathComparator;
 +    }
 +
 +    public Comparator<? super Cell> cellComparator()
 +    {
 +        return cellComparator;
 +    }
 +
 +    public boolean isComplex()
 +    {
 +        return cellPathComparator != null;
 +    }
 +
 +    public boolean isSimple()
 +    {
 +        return !isComplex();
 +    }
 +
 +    public CellPath.Serializer cellPathSerializer()
 +    {
 +        // Collections are our only complex so far, so keep it simple
 +        return CollectionType.cellPathSerializer;
 +    }
 +
 +    public void validateCell(Cell cell)
 +    {
 +        if (cell.isTombstone())
 +        {
 +            if (cell.value().hasRemaining())
 +                throw new MarshalException("A tombstone should not have a value");
 +            if (cell.path() != null)
 +                validateCellPath(cell.path());
 +        }
 +        else if(type.isUDT())
 +        {
 +            // To validate a non-frozen UDT field, both the path and the value
 +            // are needed, the path being an index into an array of value types.
 +            ((UserType)type).validateCell(cell);
 +        }
 +        else
 +        {
 +            type.validateCellValue(cell.value());
 +            if (cell.path() != null)
 +                validateCellPath(cell.path());
 +        }
 +    }
 +
 +    private void validateCellPath(CellPath path)
 +    {
 +        if (!isComplex())
 +            throw new MarshalException("Only complex cells should have a cell path");
 +
 +        assert type.isMultiCell();
 +        if (type.isCollection())
 +            ((CollectionType)type).nameComparator().validate(path.get(0));
 +        else
 +            ((UserType)type).nameComparator().validate(path.get(0));
 +    }
 +
 +    public static String toCQLString(Iterable<ColumnMetadata> defs)
 +    {
 +        return toCQLString(defs.iterator());
 +    }
 +
 +    public static String toCQLString(Iterator<ColumnMetadata> defs)
 +    {
 +        if (!defs.hasNext())
 +            return "";
 +
 +        StringBuilder sb = new StringBuilder();
 +        sb.append(defs.next().name);
 +        while (defs.hasNext())
 +            sb.append(", ").append(defs.next().name);
 +        return sb.toString();
 +    }
 +
 +    /**
 +     * The type of the cell values for cell belonging to this column.
 +     *
-      * This is the same than the column type, except for collections where it's the 'valueComparator'
++     * This is the same as the column type, except for non-frozen collections where it's the 'valueComparator'
 +     * of the collection.
++     * 
++     * This method should not be used to get the value type of a non-frozen UDT.
 +     */
 +    public AbstractType<?> cellValueType()
 +    {
-         return type instanceof CollectionType
-              ? ((CollectionType)type).valueComparator()
-              : type;
++        assert !(type instanceof UserType && type.isMultiCell());
++        return type instanceof CollectionType && type.isMultiCell()
++                ? ((CollectionType)type).valueComparator()
++                : type;
++    }
++
++    /**
++     * Check if column is counter type. For thrift, it checks collection's value type
++     */
++    public boolean isCounterColumn()
++    {
++        if (type instanceof CollectionType) // for thrift
++            return ((CollectionType) type).valueComparator().isCounter();
++        return type.isCounter();
 +    }
 +
 +    public Selector.Factory newSelectorFactory(TableMetadata table, AbstractType<?> expectedType, List<ColumnMetadata> defs, VariableSpecifications boundNames) throws InvalidRequestException
 +    {
 +        return SimpleSelector.newFactory(this, addAndGetIndex(this, defs));
 +    }
 +
 +    public AbstractType<?> getExactTypeIfKnown(String keyspace)
 +    {
 +        return type;
 +    }
 +
 +    /**
 +     * Because legacy-created tables may have a non-text comparator, we cannot determine the proper 'key' until
 +     * we know the comparator. ColumnMetadata.Raw is a placeholder that can be converted to a real ColumnIdentifier
 +     * once the comparator is known with prepare(). This should only be used with identifiers that are actual
 +     * column names. See CASSANDRA-8178 for more background.
 +     */
 +    public static abstract class Raw extends Selectable.Raw
 +    {
 +        /**
 +         * Creates a {@code ColumnMetadata.Raw} from an unquoted identifier string.
 +         */
 +        public static Raw forUnquoted(String text)
 +        {
 +            return new Literal(text, false);
 +        }
 +
 +        /**
 +         * Creates a {@code ColumnMetadata.Raw} from a quoted identifier string.
 +         */
 +        public static Raw forQuoted(String text)
 +        {
 +            return new Literal(text, true);
 +        }
 +
 +        /**
 +         * Creates a {@code ColumnMetadata.Raw} from a pre-existing {@code ColumnMetadata}
 +         * (useful in the rare cases where we already have the column but need
 +         * a {@code ColumnMetadata.Raw} for typing purposes).
 +         */
 +        public static Raw forColumn(ColumnMetadata column)
 +        {
 +            return new ForColumn(column);
 +        }
 +
 +        /**
 +         * Get the identifier corresponding to this raw column, without assuming this is an
 +         * existing column (unlike {@link Selectable.Raw#prepare}).
 +         */
 +        public abstract ColumnIdentifier getIdentifier(TableMetadata table);
 +
 +        public abstract String rawText();
 +
 +        @Override
 +        public abstract ColumnMetadata prepare(TableMetadata table);
 +
 +        @Override
 +        public final int hashCode()
 +        {
 +            return toString().hashCode();
 +        }
 +
 +        @Override
 +        public final boolean equals(Object o)
 +        {
 +            if(!(o instanceof Raw))
 +                return false;
 +
 +            Raw that = (Raw)o;
 +            return this.toString().equals(that.toString());
 +        }
 +
 +        private static class Literal extends Raw
 +        {
 +            private final String text;
 +
 +            public Literal(String rawText, boolean keepCase)
 +            {
 +                this.text =  keepCase ? rawText : rawText.toLowerCase(Locale.US);
 +            }
 +
 +            public ColumnIdentifier getIdentifier(TableMetadata table)
 +            {
 +                if (!table.isStaticCompactTable())
 +                    return ColumnIdentifier.getInterned(text, true);
 +
 +                AbstractType<?> columnNameType = table.staticCompactOrSuperTableColumnNameType();
 +                if (columnNameType instanceof UTF8Type)
 +                    return ColumnIdentifier.getInterned(text, true);
 +
 +                // We have a legacy-created table with a non-text comparator. Check if we have a matching column, otherwise assume we should use
 +                // columnNameType
 +                ByteBuffer bufferName = ByteBufferUtil.bytes(text);
 +                for (ColumnMetadata def : table.columns())
 +                {
 +                    if (def.name.bytes.equals(bufferName))
 +                        return def.name;
 +                }
 +                return ColumnIdentifier.getInterned(columnNameType, columnNameType.fromString(text), text);
 +            }
 +
 +            public ColumnMetadata prepare(TableMetadata table)
 +            {
 +                if (!table.isStaticCompactTable())
 +                    return find(table);
 +
 +                AbstractType<?> columnNameType = table.staticCompactOrSuperTableColumnNameType();
 +                if (columnNameType instanceof UTF8Type)
 +                    return find(table);
 +
 +                // We have a legacy-created table with a non-text comparator. Check if we have a matching column, otherwise assume we should use
 +                // columnNameType
 +                ByteBuffer bufferName = ByteBufferUtil.bytes(text);
 +                for (ColumnMetadata def : table.columns())
 +                {
 +                    if (def.name.bytes.equals(bufferName))
 +                        return def;
 +                }
 +                return find(columnNameType.fromString(text), table);
 +            }
 +
 +            private ColumnMetadata find(TableMetadata table)
 +            {
 +                return find(ByteBufferUtil.bytes(text), table);
 +            }
 +
 +            private ColumnMetadata find(ByteBuffer id, TableMetadata table)
 +            {
 +                ColumnMetadata def = table.getColumn(id);
 +                if (def == null)
 +                    throw new InvalidRequestException(String.format("Undefined column name %s", toString()));
 +                return def;
 +            }
 +
 +            public String rawText()
 +            {
 +                return text;
 +            }
 +
 +            @Override
 +            public String toString()
 +            {
 +                return ColumnIdentifier.maybeQuote(text);
 +            }
 +        }
 +
 +        // Used internally in the rare case where we need a ColumnMetadata.Raw for type-checking but
 +        // actually already have the column itself.
 +        private static class ForColumn extends Raw
 +        {
 +            private final ColumnMetadata column;
 +
 +            private ForColumn(ColumnMetadata column)
 +            {
 +                this.column = column;
 +            }
 +
 +            public ColumnIdentifier getIdentifier(TableMetadata table)
 +            {
 +                return column.name;
 +            }
 +
 +            public ColumnMetadata prepare(TableMetadata table)
 +            {
 +                assert table.getColumn(column.name) != null; // Sanity check that we're not doing something crazy
 +                return column;
 +            }
 +
 +            public String rawText()
 +            {
 +                return column.name.toString();
 +            }
 +
 +            @Override
 +            public String toString()
 +            {
 +                return column.name.toCQLString();
 +            }
 +        }
 +    }
 +
 +
 +
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cad94165/src/java/org/apache/cassandra/tools/JsonTransformer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/JsonTransformer.java
index 17b7e5a,c3a0a17..e6aaf07
--- a/src/java/org/apache/cassandra/tools/JsonTransformer.java
+++ b/src/java/org/apache/cassandra/tools/JsonTransformer.java
@@@ -30,9 -30,8 +30,8 @@@ import java.util.List
  import java.util.concurrent.TimeUnit;
  import java.util.stream.Stream;
  
 -import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.config.ColumnDefinition;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
- 
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.marshal.AbstractType;
  import org.apache.cassandra.db.marshal.CollectionType;
@@@ -47,8 -47,7 +47,9 @@@ import org.apache.cassandra.db.rows.Row
  import org.apache.cassandra.db.rows.Unfiltered;
  import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.schema.ColumnMetadata;
 +import org.apache.cassandra.schema.TableMetadata;
+ import org.apache.cassandra.transport.ProtocolVersion;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.codehaus.jackson.JsonFactory;
  import org.codehaus.jackson.JsonGenerator;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cad94165/test/unit/org/apache/cassandra/cql3/ViewTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cad94165/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
index d622725,e9051b4..986e604
--- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
@@@ -1653,6 -1656,20 +1653,20 @@@ public class SASIIndexTes
  
          Assert.assertEquals(true,  indexE.isIndexed());
          Assert.assertEquals(false, indexE.isLiteral());
+ 
+         // test frozen-collection
 -        ColumnDefinition columnF = ColumnDefinition.regularDef(KS_NAME,
 -                                                               CF_NAME,
 -                                                               "special-F",
 -                                                               ListType.getInstance(UTF8Type.instance, false));
++        ColumnMetadata columnF = ColumnMetadata.regularColumn(KS_NAME,
++                                                              CF_NAME,
++                                                              "special-F",
++                                                              ListType.getInstance(UTF8Type.instance, false));
+ 
+         ColumnIndex indexF = new ColumnIndex(UTF8Type.instance, columnF, IndexMetadata.fromSchemaMetadata("special-index-F", IndexMetadata.Kind.CUSTOM, new HashMap<String, String>()
+         {{
+             put(IndexTarget.CUSTOM_INDEX_OPTION_NAME, SASIIndex.class.getName());
+         }}));
+ 
+         Assert.assertEquals(true,  indexF.isIndexed());
+         Assert.assertEquals(false, indexF.isLiteral());
      }
  
      @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org