You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/01/29 10:09:49 UTC

git commit: Fix paging discardFirst IllgalArgumentException

Updated Branches:
  refs/heads/cassandra-2.0 479bf8a11 -> 55211bca7


Fix paging discardFirst IllgalArgumentException

patch by slebresne; reviewed by thobbs for CASSANDRA-6555


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/55211bca
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/55211bca
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/55211bca

Branch: refs/heads/cassandra-2.0
Commit: 55211bca799cba0b9bb443c5111102216d416591
Parents: 479bf8a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 29 10:08:08 2014 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 29 10:08:08 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../service/pager/AbstractQueryPager.java       |  43 +++--
 .../service/pager/AbstractQueryPagerTest.java   | 184 +++++++++++++++++++
 3 files changed, 216 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/55211bca/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ac75587..c626d37 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
  * Add support for 2.1 global counter shards (CASSANDRA-6505)
  * Fix NPE when streaming connection is not yet established (CASSANDRA-6210)
  * Avoid rare duplicate read repair triggering (CASSANDRA-6606)
+ * Fix paging discardFirst (CASSANDRA-6555)
 Merged from 1.2:
  * fsync compression metadata (CASSANDRA-6531)
  * Validate CF existence on execution for prepared statement (CASSANDRA-6535)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55211bca/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index ba4d8f2..297a85f 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -23,6 +23,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Iterator;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
@@ -52,16 +54,27 @@ abstract class AbstractQueryPager implements QueryPager
                                  IDiskAtomFilter columnFilter,
                                  long timestamp)
     {
+        this(consistencyLevel, toFetch, localQuery, Schema.instance.getCFMetaData(keyspace, columnFamily), columnFilter, timestamp);
+    }
+
+    protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
+                                 int toFetch,
+                                 boolean localQuery,
+                                 CFMetaData cfm,
+                                 IDiskAtomFilter columnFilter,
+                                 long timestamp)
+    {
         this.consistencyLevel = consistencyLevel;
         this.localQuery = localQuery;
 
-        this.cfm = Schema.instance.getCFMetaData(keyspace, columnFamily);
+        this.cfm = cfm;
         this.columnFilter = columnFilter;
         this.timestamp = timestamp;
 
         this.remaining = toFetch;
     }
 
+
     public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
     {
         if (isExhausted())
@@ -178,9 +191,10 @@ abstract class AbstractQueryPager implements QueryPager
         return discardFirst(rows, 1);
     }
 
-    private List<Row> discardFirst(List<Row> rows, int toDiscard)
+    @VisibleForTesting
+    List<Row> discardFirst(List<Row> rows, int toDiscard)
     {
-        if (toDiscard == 0)
+        if (toDiscard == 0 || rows.isEmpty())
             return rows;
 
         int i = 0;
@@ -197,12 +211,14 @@ abstract class AbstractQueryPager implements QueryPager
         }
 
         // If there is less live data than to discard, all is discarded
-        if (toDiscard > 0 && i >= rows.size())
+        if (toDiscard > 0)
             return Collections.<Row>emptyList();
 
+        // i is the index of the first row that we are sure to keep. On top of that,
+        // we also keep firstCf is it hasn't been fully emptied by the last iteration above.
         int count = firstCf.getColumnCount();
-        int newSize = rows.size() - i;
-        List<Row> newRows = new ArrayList<Row>(count == 0 ? newSize-1 : newSize);
+        int newSize = rows.size() - (count == 0 ? i : i - 1);
+        List<Row> newRows = new ArrayList<Row>(newSize);
         if (count != 0)
             newRows.add(new Row(firstKey, firstCf));
         newRows.addAll(rows.subList(i, rows.size()));
@@ -215,9 +231,10 @@ abstract class AbstractQueryPager implements QueryPager
         return discardLast(rows, 1);
     }
 
-    private List<Row> discardLast(List<Row> rows, int toDiscard)
+    @VisibleForTesting
+    List<Row> discardLast(List<Row> rows, int toDiscard)
     {
-        if (toDiscard == 0)
+        if (toDiscard == 0 || rows.isEmpty())
             return rows;
 
         int i = rows.size()-1;
@@ -234,13 +251,15 @@ abstract class AbstractQueryPager implements QueryPager
         }
 
         // If there is less live data than to discard, all is discarded
-        if (toDiscard > 0 && i < 0)
+        if (toDiscard > 0)
             return Collections.<Row>emptyList();
 
+        // i is the index of the last row that we are sure to keep. On top of that,
+        // we also keep lastCf is it hasn't been fully emptied by the last iteration above.
         int count = lastCf.getColumnCount();
-        int newSize = i+1;
-        List<Row> newRows = new ArrayList<Row>(count == 0 ? newSize-1 : newSize);
-        newRows.addAll(rows.subList(0, i));
+        int newSize = count == 0 ? i+1 : i+2;
+        List<Row> newRows = new ArrayList<Row>(newSize);
+        newRows.addAll(rows.subList(0, i+1));
         if (count != 0)
             newRows.add(new Row(lastKey, lastCf));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/55211bca/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java b/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java
new file mode 100644
index 0000000..5467ec0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/pager/AbstractQueryPagerTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.pager;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnCounter;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class AbstractQueryPagerTest
+{
+    @Test
+    public void DiscardFirstTest()
+    {
+        TestPager pager = new TestPager();
+        List<Row> rows = Arrays.asList(createRow("r1", 1),
+                                       createRow("r2", 3),
+                                       createRow("r3", 2));
+
+        assertEquals(3, rows.size());
+        assertRow(rows.get(0), "r1", 0);
+        assertRow(rows.get(1), "r2", 0, 1, 2);
+        assertRow(rows.get(2), "r3", 0, 1);
+
+        rows = pager.discardFirst(rows, 1);
+
+        assertEquals(2, rows.size());
+        assertRow(rows.get(0), "r2", 0, 1, 2);
+        assertRow(rows.get(1), "r3", 0, 1);
+
+        rows = pager.discardFirst(rows, 1);
+
+        assertEquals(2, rows.size());
+        assertRow(rows.get(0), "r2", 1, 2);
+        assertRow(rows.get(1), "r3", 0, 1);
+
+        rows = pager.discardFirst(rows, 3);
+
+        assertEquals(1, rows.size());
+        assertRow(rows.get(0), "r3", 1);
+
+        rows = pager.discardFirst(rows, 1);
+
+        assertTrue(rows.isEmpty());
+    }
+
+    @Test
+    public void DiscardLastTest()
+    {
+        TestPager pager = new TestPager();
+        List<Row> rows = Arrays.asList(createRow("r1", 2),
+                                       createRow("r2", 3),
+                                       createRow("r3", 1));
+
+        assertEquals(3, rows.size());
+        assertRow(rows.get(0), "r1", 0, 1);
+        assertRow(rows.get(1), "r2", 0, 1, 2);
+        assertRow(rows.get(2), "r3", 0);
+
+        rows = pager.discardLast(rows, 1);
+
+        assertEquals(2, rows.size());
+        assertRow(rows.get(0), "r1", 0, 1);
+        assertRow(rows.get(1), "r2", 0, 1, 2);
+
+        rows = pager.discardLast(rows, 1);
+
+        assertEquals(2, rows.size());
+        assertRow(rows.get(0), "r1", 0, 1);
+        assertRow(rows.get(1), "r2", 0, 1);
+
+        rows = pager.discardLast(rows, 3);
+
+        assertEquals(1, rows.size());
+        assertRow(rows.get(0), "r1", 0);
+
+        rows = pager.discardLast(rows, 1);
+
+        assertTrue(rows.isEmpty());
+    }
+
+    private void assertRow(Row row, String name, int... values)
+    {
+        assertEquals(row.key.key, ByteBufferUtil.bytes(name));
+        assertEquals(values.length, row.cf.getColumnCount());
+
+        int i = 0;
+        for (Column c : row.cf)
+            assertEquals(values[i++], i(c.name()));
+    }
+
+    private Row createRow(String name, int nbCol)
+    {
+        return new Row(Util.dk(name), createCF(nbCol));
+    }
+
+    private ColumnFamily createCF(int nbCol)
+    {
+        ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(createMetadata());
+        for (int i = 0; i < nbCol; i++)
+            cf.addColumn(bb(i), bb(i), 0);
+        return cf;
+    }
+
+    private CFMetaData createMetadata()
+    {
+        return new CFMetaData("ks", "cf", ColumnFamilyType.Standard, Int32Type.instance);
+    }
+
+    private static ByteBuffer bb(int i)
+    {
+        return ByteBufferUtil.bytes(i);
+    }
+
+    private static int i(ByteBuffer bb)
+    {
+        return ByteBufferUtil.toInt(bb);
+    }
+
+    private static class TestPager extends AbstractQueryPager
+    {
+        public TestPager()
+        {
+            // We use this to test more thorougly DiscardFirst and DiscardLast (more generic pager behavior is tested in
+            // QueryPagerTest). The only thing those method use is the result of the columnCounter() method. So to keep
+            // it simple, we fake all actual parameters in the ctor below but just override the columnCounter() method.
+            super(null, 0, false, null, null, 0);
+        }
+
+        @Override
+        public ColumnCounter columnCounter()
+        {
+            return new ColumnCounter(0);
+        }
+
+        public PagingState state()
+        {
+            return null;
+        }
+
+        protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistency, boolean localQuery)
+        {
+            return null;
+        }
+
+        protected boolean containsPreviousLast(Row first)
+        {
+            return false;
+        }
+
+        protected boolean recordLast(Row last)
+        {
+            return false;
+        }
+
+        protected boolean isReversed()
+        {
+            return false;
+        }
+    }
+}