You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2019/01/17 12:15:55 UTC
[cassandra] branch cassandra-2.1 updated: Special case page handling for DISTINCT queries
This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch cassandra-2.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-2.1 by this push:
new dd228d4 Special case page handling for DISTINCT queries
dd228d4 is described below
commit dd228d4581b020fb2fb788858481c81357d7fa72
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Thu Jan 3 19:03:11 2019 +0000
Special case page handling for DISTINCT queries
Simplify the removal of the first row of a page in a paged DISTINCT
range slice query when that row was already returned at the end of
the previous page.
Patch by Sam Tunnicliffe and Marcus Eriksson; reviewed by Sam Tunnicliffe
and Marcus Eriksson for CASSANDRA-14956
Co-authored-by: Sam Tunnicliffe <sa...@beobal.com>
Co-authored-by: Marcus Eriksson <ma...@apache.org>
---
CHANGES.txt | 1 +
.../service/pager/AbstractQueryPager.java | 2 +-
.../service/pager/RangeSliceQueryPager.java | 23 +++
.../unit/org/apache/cassandra/cql3/PagingTest.java | 160 +++++++++++++++++++++
4 files changed, 185 insertions(+), 1 deletion(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 7b88686..3582d4f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.21
+ * Paged Range Slice queries with DISTINCT can drop rows from results (CASSANDRA-14956)
* Update release checksum algorithms to SHA-256, SHA-512 (CASSANDRA-14970)
* Check checksum before decompressing data (CASSANDRA-14284)
* CVE-2017-5929 Security vulnerability in Logback warning in NEWS.txt (CASSANDRA-14183)
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 8bbf6d6..445a507 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -207,7 +207,7 @@ abstract class AbstractQueryPager implements QueryPager
protected abstract boolean isReversed();
- private List<Row> discardFirst(List<Row> rows)
+ // Discards the first row of a page by delegating to discardFirst(rows, 1). Widened from private to
+ // protected so that subclasses (RangeSliceQueryPager in this change) can override it.
+ protected List<Row> discardFirst(List<Row> rows)
{
return discardFirst(rows, 1);
}
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 3ac777e..a37db02 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.service.pager;
+import java.util.ArrayList;
import java.util.List;
import org.apache.cassandra.config.CFMetaData;
@@ -114,6 +115,28 @@ public class RangeSliceQueryPager extends AbstractQueryPager
&& firstCell.name().isSameCQL3RowAs(metadata.comparator, lastReturnedName);
}
+ /**
+ * Removes the first row of a page, used when that row was already returned at the end of the
+ * previous page.
+ *
+ * @param rows the rows of the current page
+ * @return {@code rows} itself if it is empty; otherwise, for DISTINCT queries, a new mutable list
+ * holding every row except the first, and for other queries whatever the superclass returns
+ */
+ @Override
+ protected List<Row> discardFirst(List<Row> rows)
+ {
+ if (rows.isEmpty())
+ return rows;
+
+ // Special case for distinct queries because the superclass' discardFirst keeps dropping cells
+ // until it has removed the first *live* row. In a distinct query we only fetch the first row
+ // from a given partition, which may be entirely non-live. In the case where such a non-live
+ // row is the last in page N & the first in page N+1, we would also end up discarding an
+ // additional live row from page N+1.
+ // The simplest solution is to just remove whichever row is first in the page, without bothering
+ // to do liveness checks etc.
+ if (isDistinct())
+ return new ArrayList<>(rows.subList(1, rows.size()));
+
+ return super.discardFirst(rows);
+ }
+
private boolean isDistinct()
{
// As this pager is never used for Thrift queries, checking the countCQL3Rows is enough.
diff --git a/test/unit/org/apache/cassandra/cql3/PagingTest.java b/test/unit/org/apache/cassandra/cql3/PagingTest.java
new file mode 100644
index 0000000..531ddde
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/PagingTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cql3;
+
+import java.net.InetAddress;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import org.apache.cassandra.dht.LongToken;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.locator.AbstractEndpointSnitch;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static junit.framework.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+
+public class PagingTest
+{
+ private static Cluster cluster;
+ private static Session session;
+
+ private static final String KEYSPACE = "paging_test";
+ private static final String createKsStatement = "CREATE KEYSPACE " + KEYSPACE +
+ " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };";
+
+ private static final String dropKsStatement = "DROP KEYSPACE IF EXISTS " + KEYSPACE;
+
+ @BeforeClass
+ public static void setup() throws Exception
+ {
+ DatabaseDescriptor.setPartitioner(new Murmur3Partitioner());
+ EmbeddedCassandraService cassandra = new EmbeddedCassandraService();
+ cassandra.start();
+
+ // Currently the native server start method return before the server is fully binded to the socket, so we need
+ // to wait slightly before trying to connect to it. We should fix this but in the meantime using a sleep.
+ Thread.sleep(500);
+
+ cluster = Cluster.builder().addContactPoint("127.0.0.1")
+ .withPort(DatabaseDescriptor.getNativeTransportPort())
+ .build();
+ session = cluster.connect();
+
+ session.execute(dropKsStatement);
+ session.execute(createKsStatement);
+ }
+
+ @AfterClass
+ public static void tearDown()
+ {
+ cluster.close();
+ }
+
+ /**
+ * Makes sure that we don't drop any live rows when paging with DISTINCT queries
+ *
+ * * We need to have more rows than fetch_size
+ * * The node must have a token within the first page (so that the range gets split up in StorageProxy#getRestrictedRanges)
+ * - This means that the second read in the second range will read back too many rows
+ * * The extra rows are dropped (so that we only return fetch_size rows to client)
+ * * This means that the last row recorded in AbstractQueryPager#recordLast is a non-live one
+ * * For the next page, the first row returned will be the same non-live row as above
+ * * The bug in CASSANDRA-14956 caused us to drop that non-live row + the first live row in the next page
+ */
+ @Test
+ public void testPaging() throws InterruptedException
+ {
+ String table = KEYSPACE + ".paging";
+ String createTableStatement = "CREATE TABLE IF NOT EXISTS " + table + " (id int, id2 int, id3 int, val text, PRIMARY KEY ((id, id2), id3));";
+ String dropTableStatement = "DROP TABLE IF EXISTS " + table + ';';
+
+ // custom snitch to avoid merging ranges back together after StorageProxy#getRestrictedRanges splits them up
+ IEndpointSnitch snitch = new AbstractEndpointSnitch()
+ {
+ private IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch();
+ public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ {
+ return oldSnitch.compareEndpoints(target, a1, a2);
+ }
+
+ public String getRack(InetAddress endpoint)
+ {
+ return oldSnitch.getRack(endpoint);
+ }
+
+ public String getDatacenter(InetAddress endpoint)
+ {
+ return oldSnitch.getDatacenter(endpoint);
+ }
+
+ @Override
+ public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2)
+ {
+ return false;
+ }
+ };
+ DatabaseDescriptor.setEndpointSnitch(snitch);
+ StorageService.instance.getTokenMetadata().clearUnsafe();
+ StorageService.instance.getTokenMetadata().updateNormalToken(new LongToken(5097162189738624638L), FBUtilities.getBroadcastAddress());
+ session.execute(createTableStatement);
+
+ for (int i = 0; i < 110; i++)
+ {
+ // removing row with idx 10 causes the last row in the first page read to be empty
+ String ttlClause = i == 10 ? "USING TTL 1" : "";
+ session.execute(String.format("INSERT INTO %s (id, id2, id3, val) VALUES (%d, %d, %d, '%d') %s", table, i, i, i, i, ttlClause));
+ }
+ Thread.sleep(1500);
+
+ Statement stmt = new SimpleStatement(String.format("SELECT DISTINCT token(id, id2), id, id2 FROM %s", table));
+ stmt.setFetchSize(100);
+ ResultSet res = session.execute(stmt);
+ stmt.setFetchSize(200);
+ ResultSet res2 = session.execute(stmt);
+
+ Iterator<Row> iter1 = res.iterator();
+ Iterator<Row> iter2 = res2.iterator();
+
+ while (iter1.hasNext() && iter2.hasNext())
+ {
+ Row row1 = iter1.next();
+ Row row2 = iter2.next();
+ assertEquals(row1.getInt("id"), row2.getInt("id"));
+ }
+ assertFalse(iter1.hasNext());
+ assertFalse(iter2.hasNext());
+ session.execute(dropTableStatement);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org