You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ad...@apache.org on 2021/04/23 18:42:00 UTC
[cassandra] branch cassandra-3.0 updated: Fix materialized view builders inserting truncated data
This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new ca40e77 Fix materialized view builders inserting truncated data
ca40e77 is described below
commit ca40e770a5bbdeed69700ae498866f940782864b
Author: Andrés de la Peña <a....@gmail.com>
AuthorDate: Fri Apr 23 19:10:12 2021 +0100
Fix materialized view builders inserting truncated data
patch by Andrés de la Peña; reviewed by Ekaterina Dimitrova for CASSANDRA-16567
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 5 ++
.../org/apache/cassandra/db/view/TableViews.java | 10 +++
src/java/org/apache/cassandra/db/view/View.java | 19 +++--
.../org/apache/cassandra/db/view/ViewBuilder.java | 23 ++++--
test/unit/org/apache/cassandra/cql3/ViewTest.java | 84 +++++++++++++++++++---
6 files changed, 119 insertions(+), 23 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 12fd27a..4cba62d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.25:
+ * Fix materialized view builders inserting truncated data (CASSANDRA-16567)
* Don't wait for schema migrations from removed nodes (CASSANDRA-16577)
* Scheduled (delayed) schema pull tasks should not run after MIGRATION stage shutdown during decommission (CASSANDRA-16495)
* Ignore trailing zeros in hint files (CASSANDRA-16523)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 47623e8..405aec7 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2054,6 +2054,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
// position in the System keyspace.
logger.trace("truncating {}", name);
+ viewManager.stopBuild();
+
final long truncatedAt;
final ReplayPosition replayAfter;
@@ -2101,6 +2103,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
};
runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true);
+
+ viewManager.build();
+
logger.trace("truncate complete");
}
diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java
index d2d4a45..34d112f 100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@ -83,6 +83,16 @@ public class TableViews extends AbstractCollection<View>
return Iterables.transform(views, view -> keyspace.getColumnFamilyStore(view.getDefinition().viewName));
}
+ public void build()
+ {
+ views.forEach(View::build);
+ }
+
+ public void stopBuild()
+ {
+ views.forEach(View::stopBuild);
+ }
+
public void forceBlockingFlush()
{
for (ColumnFamilyStore viewCfs : allViewsCfs())
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 9716dc4..b5cd387 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -206,15 +206,20 @@ public class View
public synchronized void build()
{
- if (this.builder != null)
+ stopBuild();
+ builder = new ViewBuilder(baseCfs, this);
+ CompactionManager.instance.submitViewBuilder(builder);
+ }
+
+ synchronized void stopBuild()
+ {
+ if (builder != null)
{
- logger.debug("Stopping current view builder due to schema change");
- this.builder.stop();
- this.builder = null;
+ logger.debug("Stopping current view builder due to schema change or truncate");
+ builder.stop();
+ builder.waitForCompletion();
+ builder = null;
}
-
- this.builder = new ViewBuilder(baseCfs, this);
- CompactionManager.instance.submitViewBuilder(builder);
}
@Nullable
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 57bba29..ca60da3 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -22,15 +22,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
-import javax.annotation.Nullable;
import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,8 +45,6 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.pager.QueryPager;
-import org.apache.cassandra.transport.Server;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
@@ -61,6 +55,7 @@ public class ViewBuilder extends CompactionInfo.Holder
private final ColumnFamilyStore baseCfs;
private final View view;
private final UUID compactionId;
+ private final CountDownLatch completed = new CountDownLatch(1);
private volatile Token prevToken = null;
private static final Logger logger = LoggerFactory.getLogger(ViewBuilder.class);
@@ -109,6 +104,7 @@ public class ViewBuilder extends CompactionInfo.Holder
if (SystemKeyspace.isViewBuilt(ksname, viewName))
{
logger.debug("View already marked built for {}.{}", baseCfs.metadata.ksName, view.name);
+ completed.countDown();
return;
}
Iterable<Range<Token>> ranges = StorageService.instance.getLocalRanges(baseCfs.metadata.ksName);
@@ -184,6 +180,7 @@ public class ViewBuilder extends CompactionInfo.Holder
TimeUnit.MINUTES);
logger.warn("Materialized View failed to complete, sleeping 5 minutes before restarting", e);
}
+ completed.countDown();
}
public CompactionInfo getCompactionInfo()
@@ -209,4 +206,16 @@ public class ViewBuilder extends CompactionInfo.Holder
{
return false;
}
+
+ void waitForCompletion()
+ {
+ try
+ {
+ completed.await();
+ }
+ catch (InterruptedException ie)
+ {
+ throw new AssertionError(ie);
+ }
+ }
}
diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java
index 0d49e4b..877218e 100644
--- a/test/unit/org/apache/cassandra/cql3/ViewTest.java
+++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java
@@ -18,12 +18,10 @@
package org.apache.cassandra.cql3;
-import static org.junit.Assert.*;
-
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
-import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -33,11 +31,13 @@ import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
import org.apache.cassandra.concurrent.SEPExecutor;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
@@ -48,16 +48,26 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.FBUtilities;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import static junit.framework.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+@RunWith(BMUnitRunner.class)
public class ViewTest extends CQLTester
{
+ /** Latch used by {@link #testTruncateWhileBuilding()} Byteman injections. */
+ @SuppressWarnings("unused")
+ private static final CountDownLatch blockViewBuild = new CountDownLatch(1);
+
int protocolVersion = 4;
private final List<String> views = new ArrayList<>();
@@ -90,11 +100,13 @@ public class ViewTest extends CQLTester
private void updateView(String query, Object... params) throws Throwable
{
executeNet(protocolVersion, query, params);
- while (!(((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getPendingTasks() == 0
- && ((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getActiveCount() == 0))
- {
- Thread.sleep(1);
- }
+ waitForViewMutations();
+ }
+
+ private void waitForViewMutations()
+ {
+ SEPExecutor executor = (SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION);
+ Util.spinAssertEquals(0L, () -> executor.getPendingTasks() + executor.getActiveCount(), 60);
}
@Test
@@ -1408,4 +1420,58 @@ public class ViewTest extends CQLTester
}
}
+ /**
+ * Tests that truncating a table stops the ongoing builds of its materialized views,
+ * so they don't write into the MV data that has been truncated in the base table.
+ *
+ * See CASSANDRA-16567 for further details.
+ */
+ @Test
+ @BMRules(rules = {
+ @BMRule(name = "Block view builder",
+ targetClass = "ViewBuilder",
+ targetMethod = "buildKey",
+ action = "com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" +
+ "(org.apache.cassandra.cql3.ViewTest.blockViewBuild);"),
+ @BMRule(name = "Unblock view builder",
+ targetClass = "ColumnFamilyStore",
+ targetMethod = "truncateBlocking",
+ action = "org.apache.cassandra.cql3.ViewTest.blockViewBuild.countDown();")
+ })
+ public void testTruncateWhileBuilding() throws Throwable
+ {
+ createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))");
+ execute("USE " + keyspace());
+ executeNet(protocolVersion, "USE " + keyspace());
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 0, 0, 0);
+ createView("mv",
+ "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s " +
+ "WHERE k IS NOT NULL AND c IS NOT NULL AND v IS NOT NULL " +
+ "PRIMARY KEY (v, c, k)");
+
+ // check that the delayed view builder is either running or pending,
+ // and that it hasn't written anything yet
+ assertThat(runningCompactions()).isPositive();
+ assertFalse(SystemKeyspace.isViewBuilt(KEYSPACE, "mv"));
+ waitForViewMutations();
+ assertRows(execute("SELECT * FROM mv"));
+
+ // truncate the view, this should unblock the view builder, wait for its cancellation,
+ // drop the sstables and, finally, start a new view build
+ updateView("TRUNCATE %s");
+
+ // check that there aren't any rows after truncating
+ assertRows(execute("SELECT * FROM mv"));
+
+ // check that the view builder finishes and that the view is still empty after that
+ Util.spinAssertEquals(0, ViewTest::runningCompactions, 60);
+ assertTrue(SystemKeyspace.isViewBuilt(KEYSPACE, "mv"));
+ waitForViewMutations();
+ assertRows(execute("SELECT * FROM mv"));
+ }
+
+ private static int runningCompactions()
+ {
+ return CompactionManager.instance.getPendingTasks() + CompactionManager.instance.getActiveCompactions();
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org