Posted to commits@cassandra.apache.org by sa...@apache.org on 2019/01/17 12:15:55 UTC

[cassandra] branch cassandra-2.1 updated: Special case page handling for DISTINCT queries

This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cassandra-2.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-2.1 by this push:
     new dd228d4  Special case page handling for DISTINCT queries
dd228d4 is described below

commit dd228d4581b020fb2fb788858481c81357d7fa72
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Thu Jan 3 19:03:11 2019 +0000

    Special case page handling for DISTINCT queries
    
    Simplify the removal of the first row of a range slice page in a
    DISTINCT query when that row was already returned at the end of the
    previous page.
    
    Patch by Sam Tunnicliffe and Marcus Eriksson; reviewed by Sam Tunnicliffe
    and Marcus Eriksson for CASSANDRA-14956
    
    Co-authored-by: Sam Tunnicliffe <sa...@beobal.com>
    Co-authored-by: Marcus Eriksson <ma...@apache.org>
---
 CHANGES.txt                                        |   1 +
 .../service/pager/AbstractQueryPager.java          |   2 +-
 .../service/pager/RangeSliceQueryPager.java        |  23 +++
 .../unit/org/apache/cassandra/cql3/PagingTest.java | 160 +++++++++++++++++++++
 4 files changed, 185 insertions(+), 1 deletion(-)
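
The core of the change: AbstractQueryPager.discardFirst drops cells until it has removed one *live* row, but a DISTINCT page can begin with an entirely non-live row (for example, one whose only fetched cells are TTL-expired) that was already counted at the end of the previous page, and skipping past it then discards a live row as well. The standalone sketch below contrasts the two discard strategies; Row and the row-level discards are simplified stand-ins, not Cassandra's actual classes.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class DiscardFirstSketch
    {
        static class Row
        {
            final int id;
            final boolean live; // false when e.g. every cell is TTL-expired
            Row(int id, boolean live) { this.id = id; this.live = live; }
        }

        // Superclass-style discard: skip leading non-live rows, then remove
        // the first live row too. Fine for regular paging, but lossy when
        // the page-boundary overlap row is itself non-live.
        static List<Row> discardFirstLive(List<Row> rows)
        {
            List<Row> out = new ArrayList<>(rows);
            while (!out.isEmpty() && !out.get(0).live)
                out.remove(0);
            if (!out.isEmpty())
                out.remove(0);
            return out;
        }

        // DISTINCT-style discard from this patch: the overlap with the
        // previous page is always exactly one row, live or not.
        static List<Row> discardFirstRow(List<Row> rows)
        {
            if (rows.isEmpty())
                return rows;
            return new ArrayList<>(rows.subList(1, rows.size()));
        }

        public static void main(String[] args)
        {
            // Page N+1 starts with the non-live row that also closed page N.
            List<Row> page = Arrays.asList(new Row(10, false), new Row(11, true), new Row(12, true));
            System.out.println(discardFirstLive(page).size()); // 1: live row 11 is lost
            System.out.println(discardFirstRow(page).size());  // 2: only row 10 is dropped
        }
    }

With the patch, the non-live boundary row is the only one removed, so the first live row of the new page survives into the client's results.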

diff --git a/CHANGES.txt b/CHANGES.txt
index 7b88686..3582d4f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.21
+ * Paged Range Slice queries with DISTINCT can drop rows from results (CASSANDRA-14956)
  * Update release checksum algorithms to SHA-256, SHA-512 (CASSANDRA-14970)
  * Check checksum before decompressing data (CASSANDRA-14284)
  * CVE-2017-5929 Security vulnerability in Logback warning in NEWS.txt (CASSANDRA-14183)
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 8bbf6d6..445a507 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -207,7 +207,7 @@ abstract class AbstractQueryPager implements QueryPager
 
     protected abstract boolean isReversed();
 
-    private List<Row> discardFirst(List<Row> rows)
+    protected List<Row> discardFirst(List<Row> rows)
     {
         return discardFirst(rows, 1);
     }
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 3ac777e..a37db02 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.service.pager;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -114,6 +115,28 @@ public class RangeSliceQueryPager extends AbstractQueryPager
                 && firstCell.name().isSameCQL3RowAs(metadata.comparator, lastReturnedName);
     }
 
+    protected List<Row> discardFirst(List<Row> rows)
+    {
+        if (rows.isEmpty())
+            return rows;
+
+        // Special case for distinct queries because the superclass' discardFirst keeps dropping cells
+        // until it has removed the first *live* row. In a distinct query we only fetch the first row
+        // from a given partition, which may be entirely non-live. In the case where such a non-live
+        // row is the last in page N & the first in page N+1, we would also end up discarding an
+        // additional live row from page N+1.
+        // The simplest solution is to just remove whichever row is first in the page, without bothering
+        // to do liveness checks etc.
+        if (isDistinct())
+        {
+            List<Row> newRows = new ArrayList<>(Math.max(1, rows.size() - 1));
+            newRows.addAll(rows.subList(1, rows.size()));
+            return newRows;
+        }
+
+        return super.discardFirst(rows);
+    }
+
     private boolean isDistinct()
     {
         // As this pager is never used for Thrift queries, checking the countCQL3Rows is enough.
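
A side note on the DISTINCT branch above: returning rows.subList(1, rows.size()) directly would hand back a view that keeps the whole original page reachable and tracks later changes to it, which is presumably why the patch copies the tail into a right-sized ArrayList. A small JDK-only sketch of the difference (not Cassandra code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SubListCopySketch
    {
        public static void main(String[] args)
        {
            List<String> page = new ArrayList<>(Arrays.asList("r0", "r1", "r2"));

            List<String> view = page.subList(1, page.size()); // view backed by 'page'
            List<String> copy = new ArrayList<>(view);        // independent copy, as in the patch

            page.set(1, "mutated");          // non-structural change to the backing list
            System.out.println(view.get(0)); // "mutated": the view sees it
            System.out.println(copy.get(0)); // "r1": the copy is detached
        }
    }
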
diff --git a/test/unit/org/apache/cassandra/cql3/PagingTest.java b/test/unit/org/apache/cassandra/cql3/PagingTest.java
new file mode 100644
index 0000000..531ddde
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/PagingTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import org.apache.cassandra.dht.LongToken;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.locator.AbstractEndpointSnitch;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static junit.framework.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+
+public class PagingTest
+{
+    private static Cluster cluster;
+    private static Session session;
+
+    private static final String KEYSPACE = "paging_test";
+    private static final String createKsStatement = "CREATE KEYSPACE " + KEYSPACE +
+                                                    " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };";
+
+    private static final String dropKsStatement = "DROP KEYSPACE IF EXISTS " + KEYSPACE;
+
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        DatabaseDescriptor.setPartitioner(new Murmur3Partitioner());
+        EmbeddedCassandraService cassandra = new EmbeddedCassandraService();
+        cassandra.start();
+
+        // Currently the native server start method returns before the server is fully bound to the socket, so we
+        // need to wait briefly before trying to connect. We should fix this, but in the meantime we use a sleep.
+        Thread.sleep(500);
+
+        cluster = Cluster.builder().addContactPoint("127.0.0.1")
+                                   .withPort(DatabaseDescriptor.getNativeTransportPort())
+                                   .build();
+        session = cluster.connect();
+
+        session.execute(dropKsStatement);
+        session.execute(createKsStatement);
+    }
+
+    @AfterClass
+    public static void tearDown()
+    {
+        cluster.close();
+    }
+
+    /**
+     * Makes sure that we don't drop any live rows when paging with DISTINCT queries
+     *
+     * * We need to have more rows than fetch_size
+     * * The node must have a token within the first page (so that the range gets split up in StorageProxy#getRestrictedRanges)
+     *   - This means that the second read in the second range will read back too many rows
+     * * The extra rows are dropped (so that we only return fetch_size rows to client)
+     * * This means that the last row recorded in AbstractQueryPager#recordLast is a non-live one
+     * * For the next page, the first row returned will be the same non-live row as above
+     * * The bug in CASSANDRA-14956 caused us to drop that non-live row + the first live row in the next page
+     */
+    @Test
+    public void testPaging() throws InterruptedException
+    {
+        String table = KEYSPACE + ".paging";
+        String createTableStatement = "CREATE TABLE IF NOT EXISTS " + table + " (id int, id2 int, id3 int, val text, PRIMARY KEY ((id, id2), id3));";
+        String dropTableStatement = "DROP TABLE IF EXISTS " + table + ';';
+
+        // custom snitch to avoid merging ranges back together after StorageProxy#getRestrictedRanges splits them up
+        IEndpointSnitch snitch = new AbstractEndpointSnitch()
+        {
+            private IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch();
+            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            {
+                return oldSnitch.compareEndpoints(target, a1, a2);
+            }
+
+            public String getRack(InetAddress endpoint)
+            {
+                return oldSnitch.getRack(endpoint);
+            }
+
+            public String getDatacenter(InetAddress endpoint)
+            {
+                return oldSnitch.getDatacenter(endpoint);
+            }
+
+            @Override
+            public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2)
+            {
+                return false;
+            }
+        };
+        DatabaseDescriptor.setEndpointSnitch(snitch);
+        StorageService.instance.getTokenMetadata().clearUnsafe();
+        StorageService.instance.getTokenMetadata().updateNormalToken(new LongToken(5097162189738624638L), FBUtilities.getBroadcastAddress());
+        session.execute(createTableStatement);
+
+        for (int i = 0; i < 110; i++)
+        {
+            // the row with id 10 expires via its TTL, so the last row read in the first page is non-live
+            String ttlClause = i == 10 ? "USING TTL 1" : "";
+            session.execute(String.format("INSERT INTO %s (id, id2, id3, val) VALUES (%d, %d, %d, '%d') %s", table, i, i, i, i, ttlClause));
+        }
+        Thread.sleep(1500);
+
+        Statement stmt = new SimpleStatement(String.format("SELECT DISTINCT token(id, id2), id, id2 FROM %s", table));
+        stmt.setFetchSize(100);
+        ResultSet res = session.execute(stmt);
+        stmt.setFetchSize(200);
+        ResultSet res2 = session.execute(stmt);
+
+        Iterator<Row> iter1 = res.iterator();
+        Iterator<Row> iter2 = res2.iterator();
+
+        while (iter1.hasNext() && iter2.hasNext())
+        {
+            Row row1 = iter1.next();
+            Row row2 = iter2.next();
+            assertEquals(row1.getInt("id"), row2.getInt("id"));
+        }
+        assertFalse(iter1.hasNext());
+        assertFalse(iter2.hasNext());
+        session.execute(dropTableStatement);
+    }
+}
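
For reference, the query shape the test exercises can be reproduced from any client by paging a DISTINCT read with a fetch size smaller than the partition count. A minimal DataStax Java driver sketch, assuming the session from setup() above and the paging_test.paging table populated as in the test:

    // Force at least two pages over the ~110 partitions so that the pager's
    // discardFirst path runs at a page boundary.
    Statement stmt = new SimpleStatement(
        "SELECT DISTINCT token(id, id2), id, id2 FROM paging_test.paging");
    stmt.setFetchSize(100);
    ResultSet rs = session.execute(stmt);
    for (Row row : rs)                    // the driver fetches further pages lazily
        System.out.println(row.getInt("id"));
    // Before this patch, a non-live row at the page boundary could leave one
    // live partition missing from the output.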

