You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/04/23 18:42:00 UTC

[cassandra] branch cassandra-3.0 updated: Fix materialized view builders inserting truncated data

This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new ca40e77  Fix materialized view builders inserting truncated data
ca40e77 is described below

commit ca40e770a5bbdeed69700ae498866f940782864b
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Fri Apr 23 19:10:12 2021 +0100

    Fix materialized view builders inserting truncated data
    
    patch by Andrés de la Peña; reviewed by Ekaterina Dimitrova for CASSANDRA-16567
---
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  5 ++
 .../org/apache/cassandra/db/view/TableViews.java   | 10 +++
 src/java/org/apache/cassandra/db/view/View.java    | 19 +++--
 .../org/apache/cassandra/db/view/ViewBuilder.java  | 23 ++++--
 test/unit/org/apache/cassandra/cql3/ViewTest.java  | 84 +++++++++++++++++++---
 6 files changed, 119 insertions(+), 23 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 12fd27a..4cba62d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.25:
+ * Fix materialized view builders inserting truncated data (CASSANDRA-16567)
  * Don't wait for schema migrations from removed nodes (CASSANDRA-16577)
  * Scheduled (delayed) schema pull tasks should not run after MIGRATION stage shutdown during decommission (CASSANDRA-16495)
  * Ignore trailing zeros in hint files (CASSANDRA-16523)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 47623e8..405aec7 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2054,6 +2054,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         // position in the System keyspace.
         logger.trace("truncating {}", name);
 
+        viewManager.stopBuild();
+
         final long truncatedAt;
         final ReplayPosition replayAfter;
 
@@ -2101,6 +2103,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         };
 
         runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true);
+
+        viewManager.build();
+
         logger.trace("truncate complete");
     }
 
diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java
index d2d4a45..34d112f 100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@ -83,6 +83,16 @@ public class TableViews extends AbstractCollection<View>
         return Iterables.transform(views, view -> keyspace.getColumnFamilyStore(view.getDefinition().viewName));
     }
 
+    public void build()
+    {
+        views.forEach(View::build);
+    }
+
+    public void stopBuild()
+    {
+        views.forEach(View::stopBuild);
+    }
+
     public void forceBlockingFlush()
     {
         for (ColumnFamilyStore viewCfs : allViewsCfs())
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 9716dc4..b5cd387 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -206,15 +206,20 @@ public class View
 
     public synchronized void build()
     {
-        if (this.builder != null)
+        stopBuild();
+        builder = new ViewBuilder(baseCfs, this);
+        CompactionManager.instance.submitViewBuilder(builder);
+    }
+
+    synchronized void stopBuild()
+    {
+        if (builder != null)
         {
-            logger.debug("Stopping current view builder due to schema change");
-            this.builder.stop();
-            this.builder = null;
+            logger.debug("Stopping current view builder due to schema change or truncate");
+            builder.stop();
+            builder.waitForCompletion();
+            builder = null;
         }
-
-        this.builder = new ViewBuilder(baseCfs, this);
-        CompactionManager.instance.submitViewBuilder(builder);
     }
 
     @Nullable
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 57bba29..ca60da3 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -22,15 +22,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
-import javax.annotation.Nullable;
 
 import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,8 +45,6 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.pager.QueryPager;
-import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
@@ -61,6 +55,7 @@ public class ViewBuilder extends CompactionInfo.Holder
     private final ColumnFamilyStore baseCfs;
     private final View view;
     private final UUID compactionId;
+    private final CountDownLatch completed = new CountDownLatch(1);
     private volatile Token prevToken = null;
 
     private static final Logger logger = LoggerFactory.getLogger(ViewBuilder.class);
@@ -109,6 +104,7 @@ public class ViewBuilder extends CompactionInfo.Holder
         if (SystemKeyspace.isViewBuilt(ksname, viewName))
         {
             logger.debug("View already marked built for {}.{}", baseCfs.metadata.ksName, view.name);
+            completed.countDown();
             return;
         }
         Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
@@ -184,6 +180,7 @@ public class ViewBuilder extends CompactionInfo.Holder
                                                          TimeUnit.MINUTES);
             logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e);
         }
+        completed.countDown();
     }
 
     public CompactionInfo getCompactionInfo()
@@ -209,4 +206,16 @@ public class ViewBuilder extends CompactionInfo.Holder
     {
         return false;
     }
+
+    void waitForCompletion()
+    {
+        try
+        {
+            completed.await();
+        }
+        catch (InterruptedException ie)
+        {
+            throw new AssertionError(ie);
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index 0d49e4b..877218e 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -18,12 +18,10 @@
 
 package org.apache.cassandra.cql3;
 
-import static org.junit.Assert.*;
-
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
-import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -33,11 +31,13 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
 import org.apache.cassandra.concurrent.SEPExecutor;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
@@ -48,16 +48,26 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.FBUtilities;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
 
+import static junit.framework.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+@RunWith(BMUnitRunner.class)
 public class ViewTest extends CQLTester
 {
+    /** Latch used by {@link #testTruncateWhileBuilding()} Byteman injections. */
+    @SuppressWarnings("unused")
+    private static final CountDownLatch blockViewBuild = new CountDownLatch(1);
+
     int protocolVersion = 4;
     private final List<String> views = new ArrayList<>();
 
@@ -90,11 +100,13 @@ public class ViewTest extends CQLTester
     private void updateView(String query, Object... params) throws Throwable
     {
         executeNet(protocolVersion, query, params);
-        while (!(((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getPendingTasks() == 0
-                && ((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getActiveCount() == 0))
-        {
-            Thread.sleep(1);
-        }
+        waitForViewMutations();
+    }
+
+    private void waitForViewMutations()
+    {
+        SEPExecutor executor = (SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION);
+        Util.spinAssertEquals(0L, () -> executor.getPendingTasks() + executor.getActiveCount(), 60);
     }
 
     @Test
@@ -1408,4 +1420,58 @@ public class ViewTest extends CQLTester
         }
     }
 
+    /**
+     * Tests that truncating a table stops the ongoing builds of its materialized views,
+     * so they don't write into the MV data that has been truncated in the base table.
+     *
+     * See CASSANDRA-16567 for further details.
+     */
+    @Test
+    @BMRules(rules = {
+    @BMRule(name = "Block view builder",
+    targetClass = "ViewBuilder",
+    targetMethod = "buildKey",
+    action = "com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" +
+             "(org.apache.cassandra.cql3.ViewTest.blockViewBuild);"),
+    @BMRule(name = "Unblock view builder",
+    targetClass = "ColumnFamilyStore",
+    targetMethod = "truncateBlocking",
+    action = "org.apache.cassandra.cql3.ViewTest.blockViewBuild.countDown();")
+    })
+    public void testTruncateWhileBuilding() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))");
+        execute("USE " + keyspace());
+        executeNet(protocolVersion, "USE " + keyspace());
+        execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 0, 0, 0);
+        createView("mv",
+                   "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s " +
+                   "WHERE k IS NOT NULL AND c IS NOT NULL AND v IS NOT NULL " +
+                   "PRIMARY KEY (v, c, k)");
+
+        // check that the delayed view builder is either running or pending,
+        // and that it hasn't written anything yet
+        assertThat(runningCompactions()).isPositive();
+        assertFalse(SystemKeyspace.isViewBuilt(KEYSPACE, "mv"));
+        waitForViewMutations();
+        assertRows(execute("SELECT * FROM mv"));
+
+        // truncate the view, this should unblock the view builder, wait for its cancellation,
+        // drop the sstables and, finally, start a new view build
+        updateView("TRUNCATE %s");
+
+        // check that there aren't any rows after truncating
+        assertRows(execute("SELECT * FROM mv"));
+
+        // check that the view builder finishes and that the view is still empty after that
+        Util.spinAssertEquals(0, ViewTest::runningCompactions, 60);
+        assertTrue(SystemKeyspace.isViewBuilt(KEYSPACE, "mv"));
+        waitForViewMutations();
+        assertRows(execute("SELECT * FROM mv"));
+    }
+
+    private static int runningCompactions()
+    {
+        return CompactionManager.instance.getPendingTasks() + CompactionManager.instance.getActiveCompactions();
+    }
 }
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org