You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/02/10 22:07:46 UTC

cassandra git commit: Fix multicolumn relations with indexes on some clustering cols

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 28c380c96 -> 9649594c7


Fix multicolumn relations with indexes on some clustering cols

Patch by Benjamin Lerer; reviewed by Tyler Hobbs for CASSANDRA-8275


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9649594c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9649594c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9649594c

Branch: refs/heads/cassandra-2.0
Commit: 9649594c761dbb72e58ddd71a10f0794378337ca
Parents: 28c380c
Author: blerer <b_...@hotmail.com>
Authored: Tue Feb 10 15:07:02 2015 -0600
Committer: Tyler Hobbs <ty...@apache.org>
Committed: Tue Feb 10 15:07:02 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../cql3/statements/SelectStatement.java        |  46 ++++--
 .../cassandra/cql3/MultiColumnRelationTest.java | 122 ++++++++++++++++
 .../cql3/SingleColumnRelationTest.java          | 145 +++++++++++++++++++
 4 files changed, 303 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9649594c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fa9c77d..861730f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.0.13:
+ * Fix some multi-column relations with indexes on some clustering
+   columns (CASSANDRA-8275)
  * Fix IllegalArgumentException in dynamic snitch (CASSANDRA-8448)
  * Add support for UPDATE ... IF EXISTS (CASSANDRA-8610)
  * Fix reversal of list prepends (CASSANDRA-8733)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9649594c/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 19615b6..2fa57b9 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.cql3.CFDefinition.Name.Kind;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.*;
@@ -83,8 +82,10 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
     /** Restrictions on non-primary key columns (i.e. secondary index restrictions) */
     private final Map<CFDefinition.Name, Restriction> metadataRestrictions = new HashMap<CFDefinition.Name, Restriction>();
 
-    // The name of all restricted names not covered by the key or index filter
-    private final Set<CFDefinition.Name> restrictedNames = new HashSet<CFDefinition.Name>();
+    // The map keys are the name of the columns that must be converted into IndexExpressions if a secondary index need
+    // to be used. The value specify if the column has an index that can be used to for the relation in which the column
+    // is specified.
+    private final Map<CFDefinition.Name, Boolean> restrictedNames = new HashMap<CFDefinition.Name, Boolean>();
     private Restriction.Slice sliceRestriction;
 
     private boolean isReversed;
@@ -1027,7 +1028,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             return Collections.emptyList();
 
         List<IndexExpression> expressions = new ArrayList<IndexExpression>();
-        for (CFDefinition.Name name : restrictedNames)
+        for (CFDefinition.Name name : restrictedNames.keySet())
         {
             Restriction restriction;
             switch (name.kind)
@@ -1068,12 +1069,21 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             }
             else
             {
-                List<ByteBuffer> values = restriction.values(variables);
+                ByteBuffer value;
+                if (restriction.isMultiColumn())
+                {
+                    List<ByteBuffer> values = restriction.values(variables);
+                    value = values.get(name.position);
+                }
+                else
+                {
+                    List<ByteBuffer> values = restriction.values(variables);
+                    if (values.size() != 1)
+                        throw new InvalidRequestException("IN restrictions are not supported on indexed columns");
 
-                if (values.size() != 1)
-                    throw new InvalidRequestException("IN restrictions are not supported on indexed columns");
+                    value = values.get(0);
+                }
 
-                ByteBuffer value = values.get(0);
                 validateIndexExpressionValue(value, name);
                 expressions.add(new IndexExpression(name.name.key, IndexOperator.EQ, value));
             }
@@ -1496,7 +1506,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             // All (or none) of the partition key columns have been specified;
             // hence there is no need to turn these restrictions into index expressions.
             if (!stmt.usesSecondaryIndexing)
-                stmt.restrictedNames.removeAll(cfDef.partitionKeys());
+                stmt.restrictedNames.keySet().removeAll(cfDef.partitionKeys());
 
             if (stmt.selectsOnlyStaticColumns && stmt.hasClusteringColumnsRestriction())
                 throw new InvalidRequestException("Cannot restrict clustering columns when selecting only static columns");
@@ -1507,8 +1517,17 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             if (stmt.isKeyRange && hasQueriableClusteringColumnIndex)
                 stmt.usesSecondaryIndexing = true;
 
-            if (!stmt.usesSecondaryIndexing)
-                stmt.restrictedNames.removeAll(cfDef.clusteringColumns());
+            // The clustering columns that can be used to perform a slice filtering on the secondary index do not
+            // need to be converted into IndexExpressions. Therefore, if they are not indexed by an index that support
+            // the relation in which they have been specified, we can removes them from the restrictedNames map.
+            for (Name clusteringColumn : cfDef.clusteringColumns())
+            {
+                Boolean indexed = stmt.restrictedNames.get(clusteringColumn);
+                if (indexed == null)
+                    break;
+                if (!indexed)
+                    stmt.restrictedNames.remove(clusteringColumn);
+            }
 
             // Even if usesSecondaryIndexing is false at this point, we'll still have to use one if
             // there is restrictions not covered by the PK.
@@ -1540,9 +1559,12 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             if (name == null)
                 handleUnrecognizedEntity(entity, relation);
 
-            stmt.restrictedNames.add(name);
             if (cfDef.cfm.getColumnDefinition(name.name.key).isIndexed() && relation.operator() == Relation.Type.EQ)
+            {
+                stmt.restrictedNames.put(name, Boolean.TRUE);
                 return new boolean[]{true, name.kind == CFDefinition.Name.Kind.COLUMN_ALIAS};
+            }
+            stmt.restrictedNames.put(name, Boolean.FALSE);
             return new boolean[]{false, false};
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9649594c/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java b/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java
index ea4f1a6..30a9226 100644
--- a/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java
+++ b/test/unit/org/apache/cassandra/cql3/MultiColumnRelationTest.java
@@ -76,6 +76,15 @@ public class MultiColumnRelationTest
                     "CREATE TABLE IF NOT EXISTS %s.multiple_clustering_reversed" + tableSuffix +
                         "(a int, b int, c int, d int, PRIMARY KEY (a, b, c, d)) WITH " + compactOption + " CLUSTERING ORDER BY (b DESC, c ASC, d DESC)");
         }
+
+        executeSchemaChange("CREATE TABLE IF NOT EXISTS %s.multiple_clustering_with_indices (a int, b int, c int, d int, e int, PRIMARY KEY (a, b, c, d))");
+        executeSchemaChange("CREATE INDEX ON %s.multiple_clustering_with_indices (b)");
+        executeSchemaChange("CREATE INDEX ON %s.multiple_clustering_with_indices (e)");
+
+        executeSchemaChange("CREATE TABLE IF NOT EXISTS %s.partition_with_indices (a int, b int, c int, d int, e int, f int, PRIMARY KEY ((a, b), c, d, e))");
+        executeSchemaChange("CREATE INDEX ON %s.partition_with_indices (c)");
+        executeSchemaChange("CREATE INDEX ON %s.partition_with_indices (f)");
+
         clientState = ClientState.forInternalCalls();
     }
 
@@ -1178,6 +1187,119 @@ public class MultiColumnRelationTest
         }
     }
 
+    @Test
+    public void testMultipleClusteringWithIndex() throws Throwable
+    {
+        execute("INSERT INTO %s.multiple_clustering_with_indices (a, b, c, d, e) VALUES (0, 0, 0, 0, 0)");
+        execute("INSERT INTO %s.multiple_clustering_with_indices (a, b, c, d, e) VALUES (0, 0, 1, 0, 1)");
+        execute("INSERT INTO %s.multiple_clustering_with_indices (a, b, c, d, e) VALUES (0, 0, 1, 1, 2)");
+        execute("INSERT INTO %s.multiple_clustering_with_indices (a, b, c, d, e) VALUES (0, 1, 0, 0, 0)");
+        execute("INSERT INTO %s.multiple_clustering_with_indices (a, b, c, d, e) VALUES (0, 1, 1, 0, 1)");
+        execute("INSERT INTO %s.multiple_clustering_with_indices (a, b, c, d, e) VALUES (0, 1, 1, 1, 2)");
+        execute("INSERT INTO %s.multiple_clustering_with_indices (a, b, c, d, e) VALUES (0, 2, 0, 0, 0)");
+
+        UntypedResultSet results = execute("SELECT * FROM %s.multiple_clustering_with_indices WHERE (b) = (1)");
+        assertEquals(3, results.size());
+        checkRow(0, results, 0, 1, 0, 0, 0);
+        checkRow(1, results, 0, 1, 1, 0, 1);
+        checkRow(2, results, 0, 1, 1, 1, 2);
+
+        results = execute("SELECT * FROM %s.multiple_clustering_with_indices  WHERE (b, c) = (1, 1) ALLOW FILTERING");
+        assertEquals(2, results.size());
+        checkRow(0, results, 0, 1, 1, 0, 1);
+        checkRow(1, results, 0, 1, 1, 1, 2);
+
+        results = execute("SELECT * FROM %s.multiple_clustering_with_indices  WHERE (b, c) = (1, 1) AND e = 2 ALLOW FILTERING");
+        assertEquals(1, results.size());
+        checkRow(0, results, 0, 1, 1, 1, 2);
+
+        results = execute("SELECT * FROM %s.multiple_clustering_with_indices  WHERE (b) IN ((1)) AND e = 2 ALLOW FILTERING");
+        assertEquals(1, results.size());
+        checkRow(0, results, 0, 1, 1, 1, 2);
+
+        results = execute("SELECT * FROM %s.multiple_clustering_with_indices  WHERE (b) IN ((0), (1)) AND e = 2 ALLOW FILTERING");
+        assertEquals(2, results.size());
+        checkRow(0, results, 0, 0, 1, 1, 2);
+        checkRow(1, results, 0, 1, 1, 1, 2);
+
+        results = execute("SELECT * FROM %s.multiple_clustering_with_indices  WHERE (b, c) IN ((0, 1)) AND e = 2 ALLOW FILTERING");
+        assertEquals(1, results.size());
+        checkRow(0, results, 0, 0, 1, 1, 2);
+
+        results = execute("SELECT * FROM %s.multiple_clustering_with_indices  WHERE (b, c) IN ((0, 1), (1, 1)) AND e = 2 ALLOW FILTERING");
+        assertEquals(2, results.size());
+        checkRow(0, results, 0, 0, 1, 1, 2);
+        checkRow(1, results, 0, 1, 1, 1, 2);
+
+        results = execute("SELECT * FROM %s.multiple_clustering_with_indices  WHERE (b) >= (1) AND e = 2 ALLOW FILTERING");
+        assertEquals(1, results.size());
+        checkRow(0, results, 0, 1, 1, 1, 2);
+
+        results = execute("SELECT * FROM %s.multiple_clustering_with_indices  WHERE (b, c) >= (1, 1) AND e = 2 ALLOW FILTERING");
+        assertEquals(1, results.size());
+        checkRow(0, results, 0, 1, 1, 1, 2);
+    }
+
+    @Test
+    public void testPartitionWithIndex() throws Throwable
+    {
+        execute("INSERT INTO %s.partition_with_indices (a, b, c, d, e, f) VALUES (0, 0, 0, 0, 0, 0)");
+        execute("INSERT INTO %s.partition_with_indices (a, b, c, d, e, f) VALUES (0, 0, 0, 1, 0, 1)");
+        execute("INSERT INTO %s.partition_with_indices (a, b, c, d, e, f) VALUES (0, 0, 0, 1, 1, 2)");
+
+        execute("INSERT INTO %s.partition_with_indices (a, b, c, d, e, f) VALUES (0, 0, 1, 0, 0, 3)");
+        execute("INSERT INTO %s.partition_with_indices (a, b, c, d, e, f) VALUES (0, 0, 1, 1, 0, 4)");
+        execute("INSERT INTO %s.partition_with_indices (a, b, c, d, e, f) VALUES (0, 0, 1, 1, 1, 5)");
+
+        execute("INSERT INTO %s.partition_with_indices (a, b, c, d, e, f) VALUES (0, 0, 2, 0, 0, 5)");
+
+        UntypedResultSet results = execute("SELECT * FROM %s.partition_with_indices WHERE a = 0 AND (c) = (1) ALLOW FILTERING");
+        assertEquals(3, results.size());
+        checkRow(0, results, 0, 0, 1, 0, 0, 3);
+        checkRow(1, results, 0, 0, 1, 1, 0, 4);
+        checkRow(2, results, 0, 0, 1, 1, 1, 5);
+
+        results = execute("SELECT * FROM %s.partition_with_indices WHERE a = 0 AND (c, d) = (1, 1) ALLOW FILTERING");
+        assertEquals(2, results.size());
+        checkRow(0, results, 0, 0, 1, 1, 0, 4);
+        checkRow(1, results, 0, 0, 1, 1, 1, 5);
+
+        results = execute("SELECT * FROM %s.partition_with_indices WHERE a = 0  AND (c) IN ((1)) AND f = 5 ALLOW FILTERING");
+        assertEquals(1, results.size());
+        checkRow(0, results, 0, 0, 1, 1, 1, 5);
+
+        results = execute("SELECT * FROM %s.partition_with_indices WHERE a = 0 AND (c) IN ((1), (2)) AND f = 5 ALLOW FILTERING");
+        assertEquals(2, results.size());
+        checkRow(0, results, 0, 0, 1, 1, 1, 5);
+        checkRow(1, results, 0, 0, 2, 0, 0, 5);
+
+        results = execute("SELECT * FROM %s.partition_with_indices WHERE a = 0  AND (c, d) IN ((1, 0)) AND f = 3 ALLOW FILTERING");
+        assertEquals(1, results.size());
+        checkRow(0, results, 0, 0, 1, 0, 0, 3);
+
+        results = execute("SELECT * FROM %s.partition_with_indices WHERE a = 0 AND (c) >= (1) AND f = 5 ALLOW FILTERING");
+        assertEquals(2, results.size());
+        checkRow(0, results, 0, 0, 1, 1, 1, 5);
+        checkRow(1, results, 0, 0, 2, 0, 0, 5);
+
+        results = execute("SELECT * FROM %s.partition_with_indices WHERE a = 0 AND (c, d) >= (1, 1) AND f = 5 ALLOW FILTERING");
+        assertEquals(2, results.size());
+        checkRow(0, results, 0, 0, 1, 1, 1, 5);
+        checkRow(1, results, 0, 0, 2, 0, 0, 5);
+    }
+
+    @Test(expected=InvalidRequestException.class)
+    public void testMissingPartitionComponentWithInRestrictionOnIndexedColumn() throws Throwable
+    {
+        execute("SELECT * FROM %s.partition_with_indices WHERE a = 0 AND (c, d) IN ((1, 1)) ALLOW FILTERING");
+    }
+
+    @Test(expected=InvalidRequestException.class)
+    public void testMissingPartitionComponentWithSliceRestrictionOnIndexedColumn() throws Throwable
+    {
+        execute("SELECT * FROM %s.partition_with_indices WHERE a = 0 AND (c, d) >= (1, 1) ALLOW FILTERING");
+    }
+
     @Test(expected=InvalidRequestException.class)
     public void testPrepareLiteralInWithShortTuple() throws Throwable
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9649594c/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java b/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
new file mode 100644
index 0000000..34d3bf1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/SingleColumnRelationTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.service.ClientState;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static org.apache.cassandra.cql3.QueryProcessor.process;
+import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
+import static org.junit.Assert.assertEquals;
+
+public class SingleColumnRelationTest
+{
+    static ClientState clientState;
+    static String keyspace = "single_column_relation_test";
+
+    @BeforeClass
+    public static void setUpClass() throws Throwable
+    {
+        SchemaLoader.loadSchema();
+        executeSchemaChange("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
+
+        executeSchemaChange("CREATE TABLE IF NOT EXISTS %s.partition_with_indices (a int, b int, c int, d int, e int, f int, PRIMARY KEY ((a, b), c, d, e))");
+        executeSchemaChange("CREATE INDEX ON %s.partition_with_indices (c)");
+        executeSchemaChange("CREATE INDEX ON %s.partition_with_indices (f)");
+
+        clientState = ClientState.forInternalCalls();
+    }
+
+    @AfterClass
+    public static void stopGossiper()
+    {
+        Gossiper.instance.stop();
+    }
+
+    private static void executeSchemaChange(String query) throws Throwable
+    {
+        try
+        {
+            process(String.format(query, keyspace), ConsistencyLevel.ONE);
+        } catch (RuntimeException exc)
+        {
+            throw exc.getCause();
+        }
+    }
+
+    private static UntypedResultSet execute(String query) throws Throwable
+    {
+        try
+        {
+            return processInternal(String.format(query, keyspace));
+        } catch (RuntimeException exc)
+        {
+            if (exc.getCause() != null)
+                throw exc.getCause();
+            throw exc;
+        }
+    }
+
+    @Test
+    public void testPartitionWithIndex() throws Throwable
+    {
+        execute("INSERT INTO %s.partition_with_indices (a, b, c, d, e, f) VALUES (0, 0, 0, 0, 0, 0)");
+        execute("INSERT INTO %s.partition_with_indices (a, b, c, d, e, f) VALUES (0, 0, 0, 1, 0, 1)");
+        execute("INSERT INTO %s.partition_with_indices (a, b, c, d, e, f) VALUES (0, 0, 0, 1, 1, 2)");
+
+        execute("INSERT INTO %s.partition_with_indices (a, b, c, d, e, f) VALUES (0, 0, 1, 0, 0, 3)");
+        execute("INSERT INTO %s.partition_with_indices (a, b, c, d, e, f) VALUES (0, 0, 1, 1, 0, 4)");
+        execute("INSERT INTO %s.partition_with_indices (a, b, c, d, e, f) VALUES (0, 0, 1, 1, 1, 5)");
+
+        execute("INSERT INTO %s.partition_with_indices (a, b, c, d, e, f) VALUES (0, 0, 2, 0, 0, 5)");
+
+        UntypedResultSet results = execute("SELECT * FROM %s.partition_with_indices WHERE a = 0 AND c = 1 ALLOW FILTERING");
+        assertEquals(3, results.size());
+        checkRow(0, results, 0, 0, 1, 0, 0, 3);
+        checkRow(1, results, 0, 0, 1, 1, 0, 4);
+        checkRow(2, results, 0, 0, 1, 1, 1, 5);
+
+        results = execute("SELECT * FROM %s.partition_with_indices WHERE a = 0 AND c = 1 AND d = 1 ALLOW FILTERING");
+        assertEquals(2, results.size());
+        checkRow(0, results, 0, 0, 1, 1, 0, 4);
+        checkRow(1, results, 0, 0, 1, 1, 1, 5);
+
+        results = execute("SELECT * FROM %s.partition_with_indices WHERE a = 0 AND c >= 1 AND f = 5 ALLOW FILTERING");
+        assertEquals(2, results.size());
+        checkRow(0, results, 0, 0, 1, 1, 1, 5);
+        checkRow(1, results, 0, 0, 2, 0, 0, 5);
+
+        results = execute("SELECT * FROM %s.partition_with_indices WHERE a = 0 AND c = 1 AND d >= 1 AND f = 5 ALLOW FILTERING");
+        assertEquals(1, results.size());
+        checkRow(0, results, 0, 0, 1, 1, 1, 5);
+    }
+
+    @Test(expected=InvalidRequestException.class)
+    public void testMissingPartitionComponentAndFileringOnTheSecondClusteringColumnWithoutAllowFiltering() throws Throwable
+    {
+        execute("SELECT * FROM %s.partition_with_indices WHERE d >= 1 AND f = 5");
+    }
+
+    @Test(expected=InvalidRequestException.class)
+    public void testMissingPartitionComponentWithSliceRestrictionOnIndexedColumn() throws Throwable
+    {
+        execute("SELECT * FROM %s.partition_with_indices WHERE a = 0 AND c >= 1 ALLOW FILTERING");
+    }
+
+    private static void checkRow(int rowIndex, UntypedResultSet results, Integer... expectedValues)
+    {
+        List<UntypedResultSet.Row> rows = newArrayList(results.iterator());
+        UntypedResultSet.Row row = rows.get(rowIndex);
+        Iterator<ColumnSpecification> columns = row.getColumns().iterator();
+        for (Integer expected : expectedValues)
+        {
+            String columnName = columns.next().name.toString();
+            int actual = row.getInt(columnName);
+            assertEquals(String.format("Expected value %d for column %s in row %d, but got %s", actual, columnName, rowIndex, expected),
+                         (long) expected, actual);
+        }
+    }
+}