You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pr@cassandra.apache.org by GitBox <gi...@apache.org> on 2020/07/07 11:06:07 UTC

[GitHub] [cassandra] adelapena commented on a change in pull request #659: CASSANDRA-15907 Operational Improvements & Hardening for Replica Filtering Protection

adelapena commented on a change in pull request #659:
URL: https://github.com/apache/cassandra/pull/659#discussion_r450783936



##########
File path: test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLStatement;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.impl.RowUtil;
+import org.apache.cassandra.exceptions.TooManyCachedRowsException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Exercises the functionality of {@link org.apache.cassandra.service.ReplicaFilteringProtection}, the
+ * mechanism that ensures distributed index and filtering queries at read consistency levels > ONE/LOCAL_ONE
+ * avoid stale replica results.
+ */
+public class ReplicaFilteringProtectionTest extends TestBaseImpl
+{
+    private static final int REPLICAS = 2;
+    private static final int ROWS = 3;
+
+    private static Cluster cluster;
+
+    @BeforeClass
+    public static void setup() throws IOException
+    {
+        cluster = init(Cluster.build()
+                              .withNodes(REPLICAS)
+                              .withConfig(config -> config.set("hinted_handoff_enabled", false)
+                                                          .set("commitlog_sync", "batch")
+                                                          .set("num_tokens", 1)).start());
+    }
+
+    @AfterClass
+    public static void teardown()
+    {
+        cluster.close();
+    }
+
+    @Test
+    public void testMissedUpdatesBelowCachingWarnThreshold()
+    {
+        String tableName = "missed_updates_no_warning";
+        String fullTableName = KEYSPACE + '.' + tableName;
+        cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int PRIMARY KEY, v text)"));
+
+        // The warning threshold provided is one more than the total number of rows returned
+        // to the coordinator from all replicas and therefore should not be triggered.
+        testMissedUpdates(fullTableName, REPLICAS * ROWS + 1, Integer.MAX_VALUE, false);
+    }
+
+    @Test
+    public void testMissedUpdatesAboveCachingWarnThreshold()
+    {
+        String tableName = "missed_updates_cache_warn";
+        String fullTableName = KEYSPACE + '.' + tableName;
+        cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int PRIMARY KEY, v text)"));
+
+        // The warning threshold provided is one less than the total number of rows returned
+        // to the coordinator from all replicas and therefore should be triggered but not fail the query.
+        testMissedUpdates(fullTableName, REPLICAS * ROWS - 1, Integer.MAX_VALUE, true);
+    }
+
+    @Test
+    public void testMissedUpdatesAroundCachingFailThreshold()
+    {
+        String tableName = "missed_updates_cache_fail";
+        String fullTableName = KEYSPACE + '.' + tableName;
+        cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int PRIMARY KEY, v text)"));
+
+        // The failure threshold provided is exactly the total number of rows returned
+        // to the coordinator from all replicas and therefore should just warn.
+        testMissedUpdates(fullTableName, 1, REPLICAS * ROWS, true);
+
+        try
+        {
+            // The failure threshold provided is one less than the total number of rows returned
+            // to the coordinator from all replicas and therefore should fail the query.
+            testMissedUpdates(fullTableName, 1, REPLICAS * ROWS - 1, true);
+        }
+        catch (RuntimeException e)
+        {
+            assertEquals(e.getCause().getClass().getName(), TooManyCachedRowsException.class.getName());
+        }
+    }
+
+    private void testMissedUpdates(String fullTableName, int warnThreshold, int failThreshold, boolean shouldWarn)
+    {
+        cluster.get(1).runOnInstance(() -> StorageService.instance.setCachedReplicaRowsWarnThreshold(warnThreshold));
+        cluster.get(1).runOnInstance(() -> StorageService.instance.setCachedReplicaRowsFailThreshold(failThreshold));
+
+        for (int i = 0; i < ROWS; i++)
+        {
+            cluster.coordinator(1).execute("INSERT INTO " + fullTableName + "(k, v) VALUES (?, 'old')", ALL, i);
+        }
+
+        String query = "SELECT * FROM " + fullTableName + " WHERE v = ? LIMIT ? ALLOW FILTERING";
+
+        Object[][] initialRows = cluster.coordinator(1).execute(query, ALL, "old", ROWS);
+        assertRows(initialRows, row(1, "old"), row(0, "old"), row(2, "old"));
+
+        // Update all rows on only one replica, leaving the entire dataset of the remaining replica out-of-date:
+        for (int i = 0; i < ROWS; i++)
+        {
+            cluster.get(1).executeInternal("UPDATE " + fullTableName + " SET v = 'new' WHERE k = ?", i);
+        }
+
+        // TODO: These should be able to use ICoordinator#executeWithResult() once CASSANDRA-15920 is resolved.
+        Object[] oldResponse = cluster.get(1).callOnInstance(() -> executeInternal(query, "old", ROWS));
+        Object[][] oldRows = (Object[][]) oldResponse[0];
+        assertRows(oldRows);
+        @SuppressWarnings("unchecked") List<String> oldWarnings = (List<String>) oldResponse[1];
+        assertEquals(shouldWarn, oldWarnings.stream().anyMatch(w -> w.contains("cached_replica_rows_warn_threshold")));

Review comment:
       Sure, we can rely on the thresholds. For `testMissedUpdatesBelowCachingWarnThreshold` in isolation we don't have that implicit verification, but we know that the test scenario is properly created because the other tests work.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org