You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2018/07/20 08:54:23 UTC

cassandra git commit: Wait for schema agreement prior to building MVs

Repository: cassandra
Updated Branches:
  refs/heads/trunk 8bc2fba3e -> 13150b001


Wait for schema agreement prior to building MVs

patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for CASSANDRA-14571


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/13150b00
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/13150b00
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/13150b00

Branch: refs/heads/trunk
Commit: 13150b001a8ddf82a77ac9525c446b7e9e32325c
Parents: 8bc2fba
Author: Aleksey Yeshchenko <al...@apple.com>
Authored: Wed Jul 18 22:55:23 2018 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Fri Jul 20 09:52:51 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/view/ViewBuilderTask.java      | 11 +++++
 src/java/org/apache/cassandra/gms/Gossiper.java | 47 ++++++++++++++++++++
 3 files changed, 59 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/13150b00/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 63831d7..fe4087b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Wait for schema agreement prior to building MVs (CASSANDRA-14571)
  * Make all DDL statements idempotent and not dependent on global state (CASSANDRA-13426)
  * Bump the hints messaging version to match the current one (CASSANDRA-14536)
  * OffsetAwareConfigurationLoader doesn't set ssl storage port causing bind errors in CircleCI (CASSANDRA-14546)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/13150b00/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
index 1b16f18..453cb62 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Function;
@@ -52,6 +53,7 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.service.StorageProxy;
@@ -122,6 +124,15 @@ public class ViewBuilderTask extends CompactionInfo.Holder implements Callable<L
         else
             logger.debug("Resuming view build for range {} from token {} with {} covered keys", range, prevToken, keysBuilt);
 
+        /*
+         * It's possible for view building to start before MV creation got propagated to other nodes. For this reason
+         * we should wait for schema to converge before attempting to send any view mutations to other nodes, or else
+         * face UnknownTableException upon Mutation deserialization on the nodes that haven't processed the schema change.
+         */
+        boolean schemaConverged = Gossiper.instance.waitForSchemaAgreement(10, TimeUnit.SECONDS, () -> this.isStopped);
+        if (!schemaConverged)
+            logger.warn("Failed to get schema to converge before building view {}.{}", baseCfs.keyspace.getName(), view.name);
+
         Function<org.apache.cassandra.db.lifecycle.View, Iterable<SSTableReader>> function;
         function = org.apache.cassandra.db.lifecycle.View.select(SSTableSet.CANONICAL, s -> range.intersects(s.getBounds()));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/13150b00/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 3975187..7bb2583 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -23,6 +23,7 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
@@ -1833,4 +1834,50 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
             logger.info("No gossip backlog; proceeding");
     }
 
+    /**
+     * Blocks the calling thread until all of the endpoints returned by {@code getLiveTokenOwners()} agree on
+     * the schema version, the wait budget is used up, or the caller asks to abort.
+     *
+     * The set of endpoints is captured once on entry and is not refreshed while waiting. Sleeps between checks
+     * start at 50ms and double up to a cap of 1000ms; each sleep is trimmed to the remaining budget, so the total
+     * time slept never exceeds the requested maximum. Only time spent sleeping is counted against the budget.
+     *
+     * @param maxWait maximum time to wait for schema agreement
+     * @param unit TimeUnit of maxWait
+     * @param abortCondition polled after every unsuccessful check; once it returns true the wait ends immediately
+     * @return true if agreement was reached, false if the wait timed out or was aborted
+     */
+    public boolean waitForSchemaAgreement(long maxWait, TimeUnit unit, BooleanSupplier abortCondition)
+    {
+        long maxWaitMillis = unit.toMillis(maxWait);
+        long waited = 0;
+        long toWait = 50;
+
+        Set<InetAddressAndPort> members = getLiveTokenOwners();
+
+        while (true)
+        {
+            if (nodesAgreeOnSchema(members))
+                return true;
+
+            if (waited >= maxWaitMillis || abortCondition.getAsBoolean())
+                return false;
+
+            // waited < maxWaitMillis here, so the remaining budget is strictly positive; never sleep past it
+            long sleepFor = Math.min(toWait, maxWaitMillis - waited);
+            Uninterruptibles.sleepUninterruptibly(sleepFor, TimeUnit.MILLISECONDS);
+            waited += sleepFor;
+            toWait = Math.min(1000, toWait * 2);
+        }
+    }
+
+    /**
+     * Checks whether every one of the given nodes currently reports the same, non-null schema version
+     * in its endpoint state.
+     *
+     * A node for which no endpoint state is available, or whose endpoint state carries no schema version,
+     * cannot be confirmed to agree and makes the check fail. An empty collection trivially agrees.
+     *
+     * @param nodes the endpoints whose schema versions should be compared
+     * @return true if all nodes report an identical schema version, false otherwise
+     */
+    private boolean nodesAgreeOnSchema(Collection<InetAddressAndPort> nodes)
+    {
+        UUID expectedVersion = null;
+
+        for (InetAddressAndPort node : nodes)
+        {
+            EndpointState state = getEndpointStateForEndpoint(node);
+            // guard against a missing state instead of failing with a NullPointerException below
+            if (null == state)
+                return false;
+
+            UUID remoteVersion = state.getSchemaVersion();
+            if (null == remoteVersion)
+                return false;
+
+            // the first node seen defines the version all remaining nodes must match
+            if (null == expectedVersion)
+                expectedVersion = remoteVersion;
+            else if (!expectedVersion.equals(remoteVersion))
+                return false;
+        }
+
+        return true;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org