You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2017/12/13 13:03:36 UTC

[1/4] cassandra git commit: Prevent continuous schema exchange between 3.0 and 3.11 nodes

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.11 f6381db8e -> e646e5032
  refs/heads/trunk 0d8199bab -> 7a40abb6a


Prevent continuous schema exchange between 3.0 and 3.11 nodes

patch by Robert Stupp; reviewed by Andrés de la Peña for CASSANDRA-14109


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e646e503
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e646e503
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e646e503

Branch: refs/heads/cassandra-3.11
Commit: e646e5032b68622f7ec1dd0c53137be08baabed9
Parents: f6381db
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Dec 13 13:58:53 2017 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Dec 13 13:58:53 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        |  9 +++
 .../org/apache/cassandra/config/Schema.java     | 79 ++++++++++++++++++--
 src/java/org/apache/cassandra/db/Columns.java   |  7 ++
 .../cassandra/db/SchemaCheckVerbHandler.java    |  9 ++-
 .../apache/cassandra/db/rows/AbstractRow.java   | 10 ++-
 .../db/rows/RangeTombstoneBoundMarker.java      |  7 ++
 .../db/rows/RangeTombstoneBoundaryMarker.java   |  7 ++
 .../apache/cassandra/db/rows/RowIterators.java  | 21 +++++-
 .../apache/cassandra/db/rows/Unfiltered.java    | 12 +++
 .../org/apache/cassandra/gms/EndpointState.java | 23 ++++++
 src/java/org/apache/cassandra/gms/Gossiper.java | 65 +++++++++-------
 .../cassandra/hints/HintsDispatchTrigger.java   |  3 +-
 .../apache/cassandra/schema/SchemaKeyspace.java | 15 +++-
 .../cassandra/service/MigrationManager.java     | 58 +++++++++-----
 .../cassandra/service/StorageService.java       | 25 +++++++
 .../cassandra/utils/CassandraVersion.java       |  5 ++
 17 files changed, 300 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index eaf312f..60794f0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.2
+ * Prevent continuous schema exchange between 3.0 and 3.11 nodes (CASSANDRA-14109)
  * Fix imbalanced disks when replacing node with same address with JBOD (CASSANDRA-14084)
  * Reload compaction strategies when disk boundaries are invalidated (CASSANDRA-13948)
  * Remove OpenJDK log warning (CASSANDRA-13916)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 0c32278..f4b15e7 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -25,6 +25,15 @@ Upgrading
     - Cassandra is not logging anymore by default an Heap histogram on OutOfMemoryError. To enable that behavior
       set the 'cassandra.printHeapHistogramOnOutOfMemoryError' System property to 'true'. See CASSANDRA-13006
       for more details.
+    - Upgrades from 3.0 might have produced unnecessary schema migrations while
+      there was at least one 3.0 node in the cluster. It is therefore highly
+      recommended to upgrade from 3.0 to at least 3.11.2. The root cause of
+      this schema mismatch was a difference in how schema digests were computed
+      in 3.0 and 3.11. To mitigate this issue, 3.11.2 and newer announce
+      3.0-compatible digests as long as there is at least one 3.0 node in the
+      cluster. Once all nodes have been upgraded, the "real" schema version will be
+      announced. Note: this fix is only necessary in 3.11.2 and therefore only applies
+      to 3.11. (CASSANDRA-14109)
 
 Materialized Views
 -------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 8fc83df..253a66b 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -22,6 +22,7 @@ import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +34,7 @@ import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.locator.LocalStrategy;
@@ -40,7 +42,6 @@ import org.apache.cassandra.schema.*;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.utils.ConcurrentBiMap;
 import org.apache.cassandra.utils.Pair;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public class Schema
 {
@@ -58,6 +59,7 @@ public class Schema
     private final ConcurrentBiMap<Pair<String, String>, UUID> cfIdMap = new ConcurrentBiMap<>();
 
     private volatile UUID version;
+    private volatile UUID altVersion;
 
     /**
      * Initialize empty schema object and load the hardcoded system tables
@@ -518,30 +520,82 @@ public class Schema
     /* Version control */
 
     /**
-     * @return current schema version
+     * The schema version to announce.
+     * This will be either the "real" schema version including the {@code cdc} column,
+     * if no node in the cluster is running at 3.0, or a 3.0 compatible
+     * schema version, with the {@code cdc} column excluded, if at least one node is
+     * running 3.0.
+     *
+     * @return "current" schema version
      */
     public UUID getVersion()
     {
+        return Gossiper.instance.isEnabled() && Gossiper.instance.isAnyNodeOn30()
+               ? altVersion
+               : version;
+    }
+
+    /**
+     * The 3.11 schema version, always includes the {@code cdc} column.
+     */
+    public UUID getRealVersion()
+    {
         return version;
     }
 
     /**
+     * The "alternative" schema version, compatible with 3.0, always excludes the
+     * {@code cdc} column.
+     */
+    public UUID getAltVersion()
+    {
+        return altVersion;
+    }
+
+    /**
+     * Checks whether the given schema version is the same as the current local schema
+     * version, either the 3.0 compatible or "real" one.
+     */
+    public boolean isSameVersion(UUID schemaVersion)
+    {
+        return schemaVersion != null
+               && (schemaVersion.equals(version) || schemaVersion.equals(altVersion));
+    }
+
+    /**
+     * Checks whether the current schema is empty.
+     */
+    public boolean isEmpty()
+    {
+        return SchemaConstants.emptyVersion.equals(version);
+    }
+
+    /**
      * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
      * will be converted into UUID which would act as content-based version of the schema.
+     *
+     * 3.11 note: we calculate the "real" schema version and the 3.0 compatible schema
+     * version here.
      */
     public void updateVersion()
     {
-        version = SchemaKeyspace.calculateSchemaDigest();
-        SystemKeyspace.updateSchemaVersion(version);
+        Pair<UUID, UUID> mixedVersions = SchemaKeyspace.calculateSchemaDigest();
+        version = mixedVersions.left;
+        altVersion = mixedVersions.right;
+        SystemKeyspace.updateSchemaVersion(getVersion());
     }
 
-    /*
+    /**
      * Like updateVersion, but also announces via gossip
+     *
+     * 3.11 note: we announce the "current" schema version, which can be either the 3.0
+     * compatible one, if at least one node is still running 3.0, or the "real" schema version.
      */
     public void updateVersionAndAnnounce()
     {
         updateVersion();
-        MigrationManager.passiveAnnounce(version);
+        UUID current = getVersion();
+        MigrationManager.passiveAnnounce(current, current == getAltVersion());
     }
 
     /**
@@ -785,4 +839,17 @@ public class Schema
 
         return transformed;
     }
+
+    /**
+     * Converts the given schema version to a string. Returns {@code unknown}, if {@code version} is {@code null}
+     * or {@code "(empty)"}, if {@code version} refers to an {@link SchemaConstants#emptyVersion empty} schema.
+     */
+    public static String schemaVersionToString(UUID version)
+    {
+        return version == null
+               ? "unknown"
+               : SchemaConstants.emptyVersion.equals(version)
+                 ? "(empty)"
+                 : version.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index 18729de..965e401 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -367,6 +367,13 @@ public class Columns extends AbstractCollection<ColumnDefinition> implements Col
             digest.update(c.name.bytes.duplicate());
     }
 
+    public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+    {
+        for (ColumnDefinition c : this)
+            if (!columnsToExclude.contains(c.name.bytes))
+                digest.update(c.name.bytes.duplicate());
+    }
+
     /**
      * Apply a function to each column definition in forwards or reversed order.
      * @param function

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
index 4270a24..be501de 100644
--- a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
@@ -36,7 +36,14 @@ public class SchemaCheckVerbHandler implements IVerbHandler
     public void doVerb(MessageIn message, int id)
     {
         logger.trace("Received schema check request.");
-        MessageOut<UUID> response = new MessageOut<UUID>(MessagingService.Verb.INTERNAL_RESPONSE, Schema.instance.getVersion(), UUIDSerializer.serializer);
+
+        /*
+        3.11 is special here: we return the 3.0 compatible version if gossip is enabled and at
+        least one node in the cluster is running 3.0; otherwise, the "real" schema version.
+        */
+        MessageOut<UUID> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE,
+                                                     Schema.instance.getVersion(),
+                                                     UUIDSerializer.serializer);
         MessagingService.instance().sendReply(response, id, message.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 847cb47..13e6502 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -19,7 +19,9 @@ package org.apache.cassandra.db.rows;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.AbstractCollection;
+import java.util.Collections;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -63,6 +65,11 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme
 
     public void digest(MessageDigest digest)
     {
+        digest(digest, Collections.emptySet());
+    }
+
+    public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+    {
         FBUtilities.updateWithByte(digest, kind().ordinal());
         clustering().digest(digest);
 
@@ -70,7 +77,8 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme
         primaryKeyLivenessInfo().digest(digest);
 
         for (ColumnData cd : this)
-            cd.digest(digest);
+            if (!columnsToExclude.contains(cd.column.name.bytes))
+                cd.digest(digest);
     }
 
     public void validateData(CFMetaData metadata)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
index a82bb64..fb94da3 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.rows;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Objects;
+import java.util.Set;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
@@ -132,6 +133,12 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker<Clus
         deletion.digest(digest);
     }
 
+    @Override
+    public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+    {
+        digest(digest);
+    }
+
     public String toString(CFMetaData metadata)
     {
         return "Marker " + bound.toString(metadata) + '@' + deletion.markedForDeleteAt();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index 70d6a9d..9190ecf 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.rows;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Objects;
+import java.util.Set;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
@@ -151,6 +152,12 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker<C
         startDeletion.digest(digest);
     }
 
+    @Override
+    public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+    {
+        digest(digest);
+    }
+
     public String toString(CFMetaData metadata)
     {
         return String.format("Marker %s@%d-%d", bound.toString(metadata), endDeletion.markedForDeleteAt(), startDeletion.markedForDeleteAt());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/RowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java
index bce6a7d..1463bf5 100644
--- a/src/java/org/apache/cassandra/db/rows/RowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java
@@ -17,7 +17,9 @@
  */
 package org.apache.cassandra.db.rows;
 
+import java.nio.ByteBuffer;
 import java.security.MessageDigest;
+import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +38,7 @@ public abstract class RowIterators
 
     private RowIterators() {}
 
-    public static void digest(RowIterator iterator, MessageDigest digest)
+    public static void digest(RowIterator iterator, MessageDigest digest, MessageDigest altDigest, Set<ByteBuffer> columnsToExclude)
     {
         // TODO: we're not computing digest the same way that old nodes. This is
         // currently ok as this is only used for schema digest and the is no exchange
@@ -48,8 +50,23 @@ public abstract class RowIterators
         FBUtilities.updateWithBoolean(digest, iterator.isReverseOrder());
         iterator.staticRow().digest(digest);
 
+        if (altDigest != null)
+        {
+            // Compute the "alternative digest" here.
+            altDigest.update(iterator.partitionKey().getKey().duplicate());
+            iterator.columns().regulars.digest(altDigest, columnsToExclude);
+            iterator.columns().statics.digest(altDigest, columnsToExclude);
+            FBUtilities.updateWithBoolean(altDigest, iterator.isReverseOrder());
+            iterator.staticRow().digest(altDigest, columnsToExclude);
+        }
+
         while (iterator.hasNext())
-            iterator.next().digest(digest);
+        {
+            Row row = iterator.next();
+            row.digest(digest);
+            if (altDigest != null)
+                row.digest(altDigest, columnsToExclude);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/Unfiltered.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
index e75c632..3d8a9b1 100644
--- a/src/java/org/apache/cassandra/db/rows/Unfiltered.java
+++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
@@ -17,7 +17,9 @@
  */
 package org.apache.cassandra.db.rows;
 
+import java.nio.ByteBuffer;
 import java.security.MessageDigest;
+import java.util.Set;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.Clusterable;
@@ -46,6 +48,16 @@ public interface Unfiltered extends Clusterable
     public void digest(MessageDigest digest);
 
     /**
+     * Digest the atom using the provided {@code MessageDigest}.
+     * This method only exists in 3.11.
+     * Same as {@link #digest(MessageDigest)}, but excludes the given columns from digest calculation.
+     */
+    public default void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+    {
+        throw new UnsupportedOperationException("no no no - don't use this one - use digest(MessageDigest) instead");
+    }
+
+    /**
      * Validate the data of this atom.
      *
      * @param metadata the metadata for the table this atom is part of.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 70f2a68..674b597 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -22,14 +22,19 @@ import java.util.Collections;
 import java.util.EnumMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.annotation.Nullable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.CassandraVersion;
+
 /**
  * This abstraction represents both the HeartBeatState and the ApplicationState in an EndpointState
  * instance. Any state for a given endpoint can be retrieved from this instance.
@@ -154,6 +159,24 @@ public class EndpointState
         return pieces[0];
     }
 
+    @Nullable
+    public UUID getSchemaVersion()
+    {
+        VersionedValue applicationState = getApplicationState(ApplicationState.SCHEMA);
+        return applicationState != null
+               ? UUID.fromString(applicationState.value)
+               : null;
+    }
+
+    @Nullable
+    public CassandraVersion getReleaseVersion()
+    {
+        VersionedValue applicationState = getApplicationState(ApplicationState.RELEASE_VERSION);
+        return applicationState != null
+               ? new CassandraVersion(applicationState.value)
+               : null;
+    }
+
     public String toString()
     {
         return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 14601d7..2dac5c2 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -24,7 +24,7 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.ReentrantLock;
-
+import javax.annotation.Nullable;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -32,9 +32,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.cassandra.utils.CassandraVersion;
-import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,14 +40,17 @@ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * This module is responsible for Gossiping information for the local endpoint. This abstraction
@@ -75,6 +75,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN,
                                                           VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);
     static ArrayList<String> SILENT_SHUTDOWN_STATES = new ArrayList<>();
+
     static
     {
         SILENT_SHUTDOWN_STATES.addAll(DEAD_STATES);
@@ -130,6 +131,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
 
+    private volatile boolean anyNodeOn30 = false; // we assume the regular case here - all nodes are on 3.11
     private volatile boolean inShadowRound = false;
     // seeds gathered during shadow round that indicated to be in the shadow round phase as well
     private final Set<InetAddress> seedsInShadowRound = new ConcurrentSkipListSet<>(inetcomparator);
@@ -852,20 +854,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return endpointStateMap.get(ep);
     }
 
-    public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as)
-    {
-        EndpointState state1 = getEndpointStateForEndpoint(ep1);
-        EndpointState state2 = getEndpointStateForEndpoint(ep2);
-
-        if (state1 == null || state2 == null)
-            return false;
-
-        VersionedValue value1 = state1.getApplicationState(as);
-        VersionedValue value2 = state2.getApplicationState(as);
-
-        return !(value1 == null || value2 == null) && value1.value.equals(value2.value);
-    }
-
     public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
     {
         return endpointStateMap.entrySet();
@@ -1198,6 +1186,26 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                 handleMajorStateChange(ep, remoteState);
             }
         }
+
+        boolean any30 = anyEndpointOn30();
+        if (any30 != anyNodeOn30)
+        {
+            logger.info(any30
+                        ? "There is at least one 3.0 node in the cluster - will store and announce compatible schema version"
+                        : "There are no 3.0 nodes in the cluster - will store and announce real schema version");
+
+            anyNodeOn30 = any30;
+            executor.submit(Schema.instance::updateVersionAndAnnounce);
+        }
+    }
+
+    private boolean anyEndpointOn30()
+    {
+        return endpointStateMap.values()
+                               .stream()
+                               .map(EndpointState::getReleaseVersion)
+                               .filter(Objects::nonNull)
+                               .anyMatch(CassandraVersion::is30);
     }
 
     private void applyNewStates(InetAddress addr, EndpointState localState, EndpointState remoteState)
@@ -1547,6 +1555,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
     }
 
+    public boolean isAnyNodeOn30()
+    {
+        return anyNodeOn30;
+    }
+
     protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound, Map<InetAddress, EndpointState> epStateMap)
     {
         if (inShadowRound)
@@ -1629,16 +1642,18 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return System.currentTimeMillis() + Gossiper.aVeryLongTime;
     }
 
+    @Nullable
     public CassandraVersion getReleaseVersion(InetAddress ep)
     {
         EndpointState state = getEndpointStateForEndpoint(ep);
-        if (state != null)
-        {
-            VersionedValue applicationState = state.getApplicationState(ApplicationState.RELEASE_VERSION);
-            if (applicationState != null)
-                return new CassandraVersion(applicationState.value);
-        }
-        return null;
+        return state != null ? state.getReleaseVersion() : null;
+    }
+
+    @Nullable
+    public UUID getSchemaVersion(InetAddress ep)
+    {
+        EndpointState state = getEndpointStateForEndpoint(ep);
+        return state != null ? state.getSchemaVersion() : null;
     }
 
     public static void waitToSettle()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
index 47d986f..cc1c221 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.hints;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 
@@ -64,7 +65,7 @@ final class HintsDispatchTrigger implements Runnable
                .filter(store -> !isScheduled(store))
                .filter(HintsStore::isLive)
                .filter(store -> store.isWriting() || store.hasFiles())
-               .filter(store -> Gossiper.instance.valuesEqual(getBroadcastAddress(), store.address(), ApplicationState.SCHEMA))
+               .filter(store -> Schema.instance.isSameVersion(Gossiper.instance.getSchemaVersion(store.address())))
                .forEach(this::schedule);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 7834b12..b6add96 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -311,18 +311,24 @@ public final class SchemaKeyspace
     /**
      * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
      * will be converted into UUID which would act as content-based version of the schema.
+     *
+     * This implementation is special cased for 3.11 as it returns the schema digests for 3.11
+     * <em>and</em> 3.0 - i.e. with and without the beloved {@code cdc} column.
      */
-    public static UUID calculateSchemaDigest()
+    public static Pair<UUID, UUID> calculateSchemaDigest()
     {
         MessageDigest digest;
+        MessageDigest digest30;
         try
         {
             digest = MessageDigest.getInstance("MD5");
+            digest30 = MessageDigest.getInstance("MD5");
         }
         catch (NoSuchAlgorithmException e)
         {
             throw new RuntimeException(e);
         }
+        Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
 
         for (String table : ALL_FOR_DIGEST)
         {
@@ -340,12 +346,15 @@ public final class SchemaKeyspace
                     try (RowIterator partition = schema.next())
                     {
                         if (!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
-                            RowIterators.digest(partition, digest);
+                        {
+                            RowIterators.digest(partition, digest, digest30, cdc);
+                        }
                     }
                 }
             }
         }
-        return UUID.nameUUIDFromBytes(digest.digest());
+
+        return Pair.create(UUID.nameUUIDFromBytes(digest.digest()), UUID.nameUUIDFromBytes(digest30.digest()));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 3332d2c..a1b3597 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.config.ViewDefinition;
 import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.cql3.functions.UDFunction;
@@ -78,10 +77,9 @@ public class MigrationManager
 
     public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
     {
-        VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
-
-        if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null)
-            maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
+        UUID schemaVersion = state.getSchemaVersion();
+        if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null)
+            maybeScheduleSchemaPull(schemaVersion, endpoint);
     }
 
     /**
@@ -90,16 +88,37 @@ public class MigrationManager
      */
     private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
     {
-        if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint))
+        if (Schema.instance.getVersion() == null)
+        {
+            logger.debug("Not pulling schema from {}, because local schama version is not known yet",
+                         endpoint);
+            return;
+        }
+        if (Schema.instance.isSameVersion(theirVersion))
+        {
+            logger.debug("Not pulling schema from {}, because schema versions match: " +
+                         "local/real={}, local/compatible={}, remote={}",
+                         endpoint,
+                         Schema.schemaVersionToString(Schema.instance.getRealVersion()),
+                         Schema.schemaVersionToString(Schema.instance.getAltVersion()),
+                         Schema.schemaVersionToString(theirVersion));
+            return;
+        }
+        if (!shouldPullSchemaFrom(endpoint))
         {
             logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
             return;
         }
 
-        if (SchemaConstants.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
+        if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
         {
             // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
-            logger.debug("Submitting migration task for {}", endpoint);
+            logger.debug("Immediately submitting migration task for {}, " +
+                         "schema versions: local/real={}, local/compatible={}, remote={}",
+                         endpoint,
+                         Schema.schemaVersionToString(Schema.instance.getRealVersion()),
+                         Schema.schemaVersionToString(Schema.instance.getAltVersion()),
+                         Schema.schemaVersionToString(theirVersion));
             submitMigrationTask(endpoint);
         }
         else
@@ -109,20 +128,22 @@ public class MigrationManager
             Runnable runnable = () ->
             {
                 // grab the latest version of the schema since it may have changed again since the initial scheduling
-                EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-                if (epState == null)
+                UUID epSchemaVersion = Gossiper.instance.getSchemaVersion(endpoint);
+                if (epSchemaVersion == null)
                 {
                     logger.debug("epState vanished for {}, not submitting migration task", endpoint);
                     return;
                 }
-                VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA);
-                UUID currentVersion = UUID.fromString(value.value);
-                if (Schema.instance.getVersion().equals(currentVersion))
+                if (Schema.instance.isSameVersion(epSchemaVersion))
                 {
-                    logger.debug("not submitting migration task for {} because our versions match", endpoint);
+                    logger.debug("Not submitting migration task for {} because our versions match ({})", endpoint, epSchemaVersion);
                     return;
                 }
-                logger.debug("submitting migration task for {}", endpoint);
+                logger.debug("submitting migration task for {}, schema version mismatch: local/real={}, local/compatible={}, remote={}",
+                             endpoint,
+                             Schema.schemaVersionToString(Schema.instance.getRealVersion()),
+                             Schema.schemaVersionToString(Schema.instance.getAltVersion()),
+                             Schema.schemaVersionToString(epSchemaVersion));
                 submitMigrationTask(endpoint);
             };
             ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
@@ -585,11 +606,14 @@ public class MigrationManager
      * Used to notify nodes as they arrive in the cluster.
      *
      * @param version The schema version to announce
+     * @param compatible flag whether {@code version} is a 3.0 compatible version
      */
-    public static void passiveAnnounce(UUID version)
+    public static void passiveAnnounce(UUID version, boolean compatible)
     {
         Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version));
-        logger.debug("Gossiping my schema version {}", version);
+        logger.debug("Gossiping my {} schema version {}",
+                     compatible ? "3.0 compatible" : "3.11",
+                     Schema.schemaVersionToString(version));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 15027b2..c5e2912 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -831,6 +831,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    public void waitForSchema(int delay)
+    {
+        logger.debug("Waiting for schema (max {} seconds)", delay);
+        // first sleep the delay to make sure we see all our peers
+        for (int i = 0; i < delay; i += 1000)
+        {
+            // if we see schema, we can proceed to the next check directly
+            if (!Schema.instance.isEmpty())
+            {
+                logger.debug("current schema version: {} (3.0 compatible: {})", Schema.instance.getRealVersion(), Schema.instance.getAltVersion());
+                break;
+            }
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+        // if our schema hasn't matched yet, wait until it has
+        // we do this by waiting for all in-flight migration requests and responses to complete
+        // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
+        if (!MigrationManager.isReadyForBootstrap())
+        {
+            setMode(Mode.JOINING, "waiting for schema information to complete", true);
+            MigrationManager.waitUntilReadyForBootstrap();
+        }
+        logger.info("Has schema with version {}", Schema.instance.getVersion());
+    }
+
     private void joinTokenRing(int delay) throws ConfigurationException
     {
         joined = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/utils/CassandraVersion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CassandraVersion.java b/src/java/org/apache/cassandra/utils/CassandraVersion.java
index aed0fe7..bf9fe6a 100644
--- a/src/java/org/apache/cassandra/utils/CassandraVersion.java
+++ b/src/java/org/apache/cassandra/utils/CassandraVersion.java
@@ -118,6 +118,11 @@ public class CassandraVersion implements Comparable<CassandraVersion>
         return compareIdentifiers(build, other.build, -1);
     }
 
+    public boolean is30()
+    {
+        return major == 3 && minor == 0;
+    }
+
     /**
      * Returns a version that is backward compatible with this version amongst a list
      * of provided version, or null if none can be found.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[4/4] cassandra git commit: Bring code improvements of CASSANDRA-14109 into trunk (but not the change itself)

Posted by sn...@apache.org.
Bring code improvements of CASSANDRA-14109 into trunk (but not the change itself)

patch by Robert Stupp; reviewed by Andrés de la Peña for CASSANDRA-14109


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7a40abb6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7a40abb6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7a40abb6

Branch: refs/heads/trunk
Commit: 7a40abb6a5108688fb1b10c375bb751cbb782ea4
Parents: 3e4e7d9
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Dec 13 14:00:38 2017 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Dec 13 14:00:38 2017 +0100

----------------------------------------------------------------------
 .../org/apache/cassandra/gms/EndpointState.java | 27 +++++++++--
 src/java/org/apache/cassandra/gms/Gossiper.java | 31 ++++---------
 .../cassandra/hints/HintsDispatchTrigger.java   |  3 +-
 .../cassandra/schema/MigrationManager.java      | 48 ++++++++++++++------
 .../org/apache/cassandra/schema/Schema.java     | 30 ++++++++++++
 .../cassandra/service/StorageService.java       |  4 +-
 6 files changed, 100 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 70f2a68..847041f 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -18,18 +18,19 @@
 package org.apache.cassandra.gms;
 
 import java.io.*;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.annotation.Nullable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.CassandraVersion;
+
 /**
  * This abstraction represents both the HeartBeatState and the ApplicationState in an EndpointState
  * instance. Any state for a given endpoint can be retrieved from this instance.
@@ -154,6 +155,24 @@ public class EndpointState
         return pieces[0];
     }
 
+    @Nullable
+    public UUID getSchemaVersion()
+    {
+        VersionedValue applicationState = getApplicationState(ApplicationState.SCHEMA);
+        return applicationState != null
+               ? UUID.fromString(applicationState.value)
+               : null;
+    }
+
+    @Nullable
+    public CassandraVersion getReleaseVersion()
+    {
+        VersionedValue applicationState = getApplicationState(ApplicationState.RELEASE_VERSION);
+        return applicationState != null
+               ? new CassandraVersion(applicationState.value)
+               : null;
+    }
+
     public String toString()
     {
         return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 15dad71..e675d92 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.ReentrantLock;
 
+import javax.annotation.Nullable;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -852,20 +853,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return endpointStateMap.get(ep);
     }
 
-    public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as)
-    {
-        EndpointState state1 = getEndpointStateForEndpoint(ep1);
-        EndpointState state2 = getEndpointStateForEndpoint(ep2);
-
-        if (state1 == null || state2 == null)
-            return false;
-
-        VersionedValue value1 = state1.getApplicationState(as);
-        VersionedValue value2 = state2.getApplicationState(as);
-
-        return !(value1 == null || value2 == null) && value1.value.equals(value2.value);
-    }
-
     public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
     {
         return endpointStateMap.entrySet();
@@ -1624,16 +1611,18 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return System.currentTimeMillis() + Gossiper.aVeryLongTime;
     }
 
+    @Nullable
     public CassandraVersion getReleaseVersion(InetAddress ep)
     {
         EndpointState state = getEndpointStateForEndpoint(ep);
-        if (state != null)
-        {
-            VersionedValue applicationState = state.getApplicationState(ApplicationState.RELEASE_VERSION);
-            if (applicationState != null)
-                return new CassandraVersion(applicationState.value);
-        }
-        return null;
+        return state != null ? state.getReleaseVersion() : null;
+    }
+
+    @Nullable
+    public UUID getSchemaVersion(InetAddress ep)
+    {
+        EndpointState state = getEndpointStateForEndpoint(ep);
+        return state != null ? state.getSchemaVersion() : null;
     }
 
     public static void waitToSettle()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
index 47d986f..34d1eb2 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.schema.Schema;
 
 import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddress;
 
@@ -64,7 +65,7 @@ final class HintsDispatchTrigger implements Runnable
                .filter(store -> !isScheduled(store))
                .filter(HintsStore::isLive)
                .filter(store -> store.isWriting() || store.hasFiles())
-               .filter(store -> Gossiper.instance.valuesEqual(getBroadcastAddress(), store.address(), ApplicationState.SCHEMA))
+               .filter(store -> Schema.instance.isSameVersion(Gossiper.instance.getSchemaVersion(store.address())))
                .forEach(this::schedule);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/schema/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java
index d8a3b72..50e5c28 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -61,10 +61,9 @@ public class MigrationManager
 
     public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
     {
-        VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
-
-        if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null)
-            maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
+        UUID schemaVersion = state.getSchemaVersion();
+        if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null)
+            maybeScheduleSchemaPull(schemaVersion, endpoint);
     }
 
     /**
@@ -73,16 +72,34 @@ public class MigrationManager
      */
     private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
     {
-        if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint))
+        if (Schema.instance.getVersion() == null)
+        {
+            logger.debug("Not pulling schema from {}, because local schama version is not known yet",
+                         endpoint);
+            return;
+        }
+        if (Schema.instance.isSameVersion(theirVersion))
+        {
+            logger.debug("Not pulling schema from {}, because schema versions match ({})",
+                         endpoint,
+                         Schema.schemaVersionToString(theirVersion));
+            return;
+        }
+        if (!shouldPullSchemaFrom(endpoint))
         {
-            logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
+            logger.debug("Not pulling schema from {}, because versions match ({}/{}), or shouldPullSchemaFrom returned false",
+                         endpoint, Schema.instance.getVersion(), theirVersion);
             return;
         }
 
-        if (SchemaConstants.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
+        if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
         {
             // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
-            logger.debug("Submitting migration task for {}", endpoint);
+            logger.debug("Immediately submitting migration task for {} due to {}, " +
+                         "schema versions: local={}, remote={}",
+                         endpoint,
+                         Schema.schemaVersionToString(Schema.instance.getVersion()),
+                         Schema.schemaVersionToString(theirVersion));
             submitMigrationTask(endpoint);
         }
         else
@@ -92,20 +109,21 @@ public class MigrationManager
             Runnable runnable = () ->
             {
                 // grab the latest version of the schema since it may have changed again since the initial scheduling
-                EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-                if (epState == null)
+                UUID epSchemaVersion = Gossiper.instance.getSchemaVersion(endpoint);
+                if (epSchemaVersion == null)
                 {
                     logger.debug("epState vanished for {}, not submitting migration task", endpoint);
                     return;
                 }
-                VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA);
-                UUID currentVersion = UUID.fromString(value.value);
-                if (Schema.instance.getVersion().equals(currentVersion))
+                if (Schema.instance.isSameVersion(epSchemaVersion))
                 {
-                    logger.debug("not submitting migration task for {} because our versions match", endpoint);
+                    logger.debug("Not submitting migration task for {} because our versions match ({})", endpoint, epSchemaVersion);
                     return;
                 }
-                logger.debug("submitting migration task for {}", endpoint);
+                logger.debug("Submitting migration task for {}, schema version mismatch: local={}, remote={}",
+                             endpoint,
+                             Schema.schemaVersionToString(Schema.instance.getVersion()),
+                             Schema.schemaVersionToString(epSchemaVersion));
                 submitMigrationTask(endpoint);
             };
             ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/schema/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index 711724b..594b2ab 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -524,6 +524,22 @@ public final class Schema
     }
 
     /**
+     * Checks whether the given schema version is the same as the current local schema.
+     */
+    public boolean isSameVersion(UUID schemaVersion)
+    {
+        return schemaVersion != null && schemaVersion.equals(version);
+    }
+
+    /**
+     * Checks whether the current schema is empty.
+     */
+    public boolean isEmpty()
+    {
+        return SchemaConstants.emptyVersion.equals(version);
+    }
+
+    /**
      * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
      * will be converted into UUID which would act as content-based version of the schema.
      */
@@ -819,4 +835,18 @@ public final class Schema
     {
         changeListeners.forEach(l -> l.onDropAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()));
     }
+
+
+    /**
+     * Converts the given schema version to a string. Returns {@code unknown}, if {@code version} is {@code null}
+     * or {@code "(empty)"}, if {@code version} refers to an {@link SchemaConstants#emptyVersion empty} schema.
+     */
+    public static String schemaVersionToString(UUID version)
+    {
+        return version == null
+               ? "unknown"
+               : SchemaConstants.emptyVersion.equals(version)
+                 ? "(empty)"
+                 : version.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 37d163f..89d5358 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -796,9 +796,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         for (int i = 0; i < delay; i += 1000)
         {
             // if we see schema, we can proceed to the next check directly
-            if (!Schema.instance.getVersion().equals(SchemaConstants.emptyVersion))
+            if (!Schema.instance.isEmpty())
             {
-                logger.debug("got schema: {}", Schema.instance.getVersion());
+                logger.debug("current schema version: {}", Schema.instance.getVersion());
                 break;
             }
             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/4] cassandra git commit: Prevent continuous schema exchange between 3.0 to 3.11 nodes

Posted by sn...@apache.org.
Prevent continuous schema exchange between 3.0 to 3.11 nodes

patch by Robert Stupp; reviewed by Andrés de la Peña for CASSANDRA-14109


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e646e503
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e646e503
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e646e503

Branch: refs/heads/trunk
Commit: e646e5032b68622f7ec1dd0c53137be08baabed9
Parents: f6381db
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Dec 13 13:58:53 2017 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Dec 13 13:58:53 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        |  9 +++
 .../org/apache/cassandra/config/Schema.java     | 79 ++++++++++++++++++--
 src/java/org/apache/cassandra/db/Columns.java   |  7 ++
 .../cassandra/db/SchemaCheckVerbHandler.java    |  9 ++-
 .../apache/cassandra/db/rows/AbstractRow.java   | 10 ++-
 .../db/rows/RangeTombstoneBoundMarker.java      |  7 ++
 .../db/rows/RangeTombstoneBoundaryMarker.java   |  7 ++
 .../apache/cassandra/db/rows/RowIterators.java  | 21 +++++-
 .../apache/cassandra/db/rows/Unfiltered.java    | 12 +++
 .../org/apache/cassandra/gms/EndpointState.java | 23 ++++++
 src/java/org/apache/cassandra/gms/Gossiper.java | 65 +++++++++-------
 .../cassandra/hints/HintsDispatchTrigger.java   |  3 +-
 .../apache/cassandra/schema/SchemaKeyspace.java | 15 +++-
 .../cassandra/service/MigrationManager.java     | 58 +++++++++-----
 .../cassandra/service/StorageService.java       | 25 +++++++
 .../cassandra/utils/CassandraVersion.java       |  5 ++
 17 files changed, 300 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index eaf312f..60794f0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.2
+ * Prevent continuous schema exchange between 3.0 and 3.11 nodes (CASSANDRA-14109)
  * Fix imbalanced disks when replacing node with same address with JBOD (CASSANDRA-14084)
  * Reload compaction strategies when disk boundaries are invalidated (CASSANDRA-13948)
  * Remove OpenJDK log warning (CASSANDRA-13916)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 0c32278..f4b15e7 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -25,6 +25,15 @@ Upgrading
     - Cassandra is not logging anymore by default an Heap histogram on OutOfMemoryError. To enable that behavior
       set the 'cassandra.printHeapHistogramOnOutOfMemoryError' System property to 'true'. See CASSANDRA-13006
       for more details.
+    - Upgrades from 3.0 might have produced unnecessary schema migrations while
+      there was at least one 3.0 node in the cluster. It is therefore highly
+      recommended to upgrade from 3.0 to at least 3.11.2. The root cause of
+      this schema mismatch was a difference in how schema digests were computed
+      in 3.0 and 3.11.2. To mitigate this issue, 3.11.2 and newer announce
+      3.0 compatible digests as long as there is at least one 3.0 node in the
+      cluster. Once all nodes have been upgraded, the "real" schema version will be
+      announced. Note: this fix is only necessary in 3.11.2 and therefore only applies
+      to 3.11. (CASSANDRA-14109)
 
 Materialized Views
 -------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 8fc83df..253a66b 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -22,6 +22,7 @@ import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +34,7 @@ import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.locator.LocalStrategy;
@@ -40,7 +42,6 @@ import org.apache.cassandra.schema.*;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.utils.ConcurrentBiMap;
 import org.apache.cassandra.utils.Pair;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public class Schema
 {
@@ -58,6 +59,7 @@ public class Schema
     private final ConcurrentBiMap<Pair<String, String>, UUID> cfIdMap = new ConcurrentBiMap<>();
 
     private volatile UUID version;
+    private volatile UUID altVersion;
 
     /**
      * Initialize empty schema object and load the hardcoded system tables
@@ -518,30 +520,82 @@ public class Schema
     /* Version control */
 
     /**
-     * @return current schema version
+     * The schema version to announce.
+     * This will be either the "real" schema version including the {@code cdc} column,
+     * if no node in the cluster is running at 3.0, or a 3.0 compatible
+     * schema version, with the {@code cdc} column excluded, if at least one node is
+     * running 3.0.
+     *
+     * @return "current" schema version
      */
     public UUID getVersion()
     {
+        return Gossiper.instance.isEnabled() && Gossiper.instance.isAnyNodeOn30()
+               ? altVersion
+               : version;
+    }
+
+    /**
+     * The 3.11 schema version, always includes the {@code cdc} column.
+     */
+    public UUID getRealVersion()
+    {
         return version;
     }
 
     /**
+     * The "alternative" schema version, compatible to 3.0, always excludes the
+     * {@code cdc} column.
+     */
+    public UUID getAltVersion()
+    {
+        return altVersion;
+    }
+
+    /**
+     * Checks whether the given schema version is the same as the current local schema
+     * version, either the 3.0 compatible or "real" one.
+     */
+    public boolean isSameVersion(UUID schemaVersion)
+    {
+        return schemaVersion != null
+               && (schemaVersion.equals(version) || schemaVersion.equals(altVersion));
+    }
+
+    /**
+     * Checks whether the current schema is empty.
+     */
+    public boolean isEmpty()
+    {
+        return SchemaConstants.emptyVersion.equals(version);
+    }
+
+    /**
      * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
      * will be converted into UUID which would act as content-based version of the schema.
+     *
+     * 3.11 note: we calculate the "real" schema version and the 3.0 compatible schema
+     * version here.
      */
     public void updateVersion()
     {
-        version = SchemaKeyspace.calculateSchemaDigest();
-        SystemKeyspace.updateSchemaVersion(version);
+        Pair<UUID, UUID> mixedVersions = SchemaKeyspace.calculateSchemaDigest();
+        version = mixedVersions.left;
+        altVersion = mixedVersions.right;
+        SystemKeyspace.updateSchemaVersion(getVersion());
     }
 
-    /*
+    /**
      * Like updateVersion, but also announces via gossip
+     *
+     * 3.11 note: we announce the "current" schema version, which can be either the 3.0
+     * compatible one, if at least one node is still running 3.0, or the "real" schema version.
      */
     public void updateVersionAndAnnounce()
     {
         updateVersion();
-        MigrationManager.passiveAnnounce(version);
+        UUID current = getVersion();
+        MigrationManager.passiveAnnounce(current, current == getAltVersion());
     }
 
     /**
@@ -785,4 +839,17 @@ public class Schema
 
         return transformed;
     }
+
+    /**
+     * Converts the given schema version to a string: {@code "unknown"} if {@code version} is {@code null},
+     * {@code "(empty)"} if it is the {@link SchemaConstants#emptyVersion empty} schema version, else {@code version.toString()}.
+     */
+    public static String schemaVersionToString(UUID version)
+    {
+        return version == null
+               ? "unknown"
+               : SchemaConstants.emptyVersion.equals(version)
+                 ? "(empty)"
+                 : version.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index 18729de..965e401 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -367,6 +367,17 @@ public class Columns extends AbstractCollection<ColumnDefinition> implements Col
             digest.update(c.name.bytes.duplicate());
     }
 
+    /**
+     * Updates {@code digest} with the name bytes of each column of this set, skipping the
+     * columns whose name bytes are contained in {@code columnsToExclude}.
+     */
+    public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+    {
+        for (ColumnDefinition c : this)
+            if (!columnsToExclude.contains(c.name.bytes))
+                digest.update(c.name.bytes.duplicate());
+    }
+
     /**
      * Apply a function to each column definition in forwards or reversed order.
      * @param function

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
index 4270a24..be501de 100644
--- a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
@@ -36,7 +36,14 @@ public class SchemaCheckVerbHandler implements IVerbHandler
     public void doVerb(MessageIn message, int id)
     {
         logger.trace("Received schema check request.");
-        MessageOut<UUID> response = new MessageOut<UUID>(MessagingService.Verb.INTERNAL_RESPONSE, Schema.instance.getVersion(), UUIDSerializer.serializer);
+
+        /*
+        3.11 is special here: we always reply with Schema.instance.getVersion(), whatever the requester
+        runs - presumably the 3.0 compatible version while any 3.0 node is known, else the "real" one.
+        */
+        MessageOut<UUID> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE,
+                                                     Schema.instance.getVersion(),
+                                                     UUIDSerializer.serializer);
         MessagingService.instance().sendReply(response, id, message.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 847cb47..13e6502 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -19,7 +19,9 @@ package org.apache.cassandra.db.rows;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.AbstractCollection;
+import java.util.Collections;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -63,6 +65,16 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme
 
     public void digest(MessageDigest digest)
     {
+        digest(digest, Collections.emptySet());
+    }
+
+    /**
+     * Digests this row, skipping the data of every column whose name bytes are contained in
+     * {@code columnsToExclude}; row-level information such as kind, clustering and primary key
+     * liveness info is always digested. {@link #digest(MessageDigest)} excludes nothing.
+     */
+    public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+    {
         FBUtilities.updateWithByte(digest, kind().ordinal());
         clustering().digest(digest);
 
@@ -70,7 +82,8 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme
         primaryKeyLivenessInfo().digest(digest);
 
         for (ColumnData cd : this)
-            cd.digest(digest);
+            if (!columnsToExclude.contains(cd.column.name.bytes))
+                cd.digest(digest);
     }
 
     public void validateData(CFMetaData metadata)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
index a82bb64..fb94da3 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.rows;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Objects;
+import java.util.Set;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
@@ -132,6 +133,15 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker<Clus
         deletion.digest(digest);
     }
 
+    /**
+     * {@code columnsToExclude} is ignored: this simply delegates to {@link #digest(MessageDigest)}.
+     */
+    @Override
+    public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+    {
+        digest(digest);
+    }
+
     public String toString(CFMetaData metadata)
     {
         return "Marker " + bound.toString(metadata) + '@' + deletion.markedForDeleteAt();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index 70d6a9d..9190ecf 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.rows;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Objects;
+import java.util.Set;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
@@ -151,6 +152,15 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker<C
         startDeletion.digest(digest);
     }
 
+    /**
+     * {@code columnsToExclude} is ignored: this simply delegates to {@link #digest(MessageDigest)}.
+     */
+    @Override
+    public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+    {
+        digest(digest);
+    }
+
     public String toString(CFMetaData metadata)
     {
         return String.format("Marker %s@%d-%d", bound.toString(metadata), endDeletion.markedForDeleteAt(), startDeletion.markedForDeleteAt());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/RowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java
index bce6a7d..1463bf5 100644
--- a/src/java/org/apache/cassandra/db/rows/RowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java
@@ -17,7 +17,9 @@
  */
 package org.apache.cassandra.db.rows;
 
+import java.nio.ByteBuffer;
 import java.security.MessageDigest;
+import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +38,13 @@ public abstract class RowIterators
 
     private RowIterators() {}
 
-    public static void digest(RowIterator iterator, MessageDigest digest)
+    /**
+     * Digests the partition-level data and every row of {@code iterator} into {@code digest}.
+     * If {@code altDigest} is not {@code null}, the partition key, regular and static column names,
+     * reverse-order flag, static row and rows are also digested into it, skipping the data of the
+     * columns whose name bytes are contained in {@code columnsToExclude}.
+     */
+    public static void digest(RowIterator iterator, MessageDigest digest, MessageDigest altDigest, Set<ByteBuffer> columnsToExclude)
     {
         // TODO: we're not computing digest the same way that old nodes. This is
         // currently ok as this is only used for schema digest and the is no exchange
@@ -48,8 +56,23 @@ public abstract class RowIterators
         FBUtilities.updateWithBoolean(digest, iterator.isReverseOrder());
         iterator.staticRow().digest(digest);
 
+        if (altDigest != null)
+        {
+            // Partition-level data for the "alternative" digest, with columnsToExclude left out.
+            altDigest.update(iterator.partitionKey().getKey().duplicate());
+            iterator.columns().regulars.digest(altDigest, columnsToExclude);
+            iterator.columns().statics.digest(altDigest, columnsToExclude);
+            FBUtilities.updateWithBoolean(altDigest, iterator.isReverseOrder());
+            iterator.staticRow().digest(altDigest, columnsToExclude);
+        }
+
         while (iterator.hasNext())
-            iterator.next().digest(digest);
+        {
+            Row row = iterator.next();
+            row.digest(digest);
+            if (altDigest != null)
+                row.digest(altDigest, columnsToExclude);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/Unfiltered.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
index e75c632..3d8a9b1 100644
--- a/src/java/org/apache/cassandra/db/rows/Unfiltered.java
+++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
@@ -17,7 +17,9 @@
  */
 package org.apache.cassandra.db.rows;
 
+import java.nio.ByteBuffer;
 import java.security.MessageDigest;
+import java.util.Set;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.Clusterable;
@@ -46,6 +48,18 @@ public interface Unfiltered extends Clusterable
     public void digest(MessageDigest digest);
 
     /**
+     * Digest the atom using the provided {@code MessageDigest}, like {@link #digest(MessageDigest)},
+     * but excluding the data of the columns whose name bytes are contained in {@code columnsToExclude}.
+     * This method only exists in 3.11, where it is used to compute the 3.0 compatible schema digest.
+     *
+     * @throws UnsupportedOperationException always, unless overridden by the implementation
+     */
+    public default void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+    {
+        throw new UnsupportedOperationException("no no no - don't use this one - use digest(MessageDigest) instead");
+    }
+
+    /**
      * Validate the data of this atom.
      *
      * @param metadata the metadata for the table this atom is part of.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 70f2a68..674b597 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -22,14 +22,19 @@ import java.util.Collections;
 import java.util.EnumMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.annotation.Nullable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.CassandraVersion;
+
 /**
  * This abstraction represents both the HeartBeatState and the ApplicationState in an EndpointState
  * instance. Any state for a given endpoint can be retrieved from this instance.
@@ -154,6 +159,32 @@ public class EndpointState
         return pieces[0];
     }
 
+    /**
+     * Returns the schema version from this endpoint's SCHEMA application state, or {@code null} if that
+     * state is absent. {@link UUID#fromString} throws IllegalArgumentException on a malformed value.
+     */
+    @Nullable
+    public UUID getSchemaVersion()
+    {
+        VersionedValue applicationState = getApplicationState(ApplicationState.SCHEMA);
+        return applicationState != null
+               ? UUID.fromString(applicationState.value)
+               : null;
+    }
+
+    /**
+     * Returns the release version from this endpoint's RELEASE_VERSION application state, or
+     * {@code null} if that state is absent.
+     */
+    @Nullable
+    public CassandraVersion getReleaseVersion()
+    {
+        VersionedValue applicationState = getApplicationState(ApplicationState.RELEASE_VERSION);
+        return applicationState != null
+               ? new CassandraVersion(applicationState.value)
+               : null;
+    }
+
     public String toString()
     {
         return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 14601d7..2dac5c2 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -24,7 +24,7 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.ReentrantLock;
-
+import javax.annotation.Nullable;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -32,9 +32,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.cassandra.utils.CassandraVersion;
-import org.apache.cassandra.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,14 +40,17 @@ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * This module is responsible for Gossiping information for the local endpoint. This abstraction
@@ -75,6 +75,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN,
                                                           VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);
     static ArrayList<String> SILENT_SHUTDOWN_STATES = new ArrayList<>();
+
     static
     {
         SILENT_SHUTDOWN_STATES.addAll(DEAD_STATES);
@@ -130,6 +131,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
 
+    private volatile boolean anyNodeOn30 = false; // we assume the regular case here - all nodes are on 3.11
     private volatile boolean inShadowRound = false;
     // seeds gathered during shadow round that indicated to be in the shadow round phase as well
     private final Set<InetAddress> seedsInShadowRound = new ConcurrentSkipListSet<>(inetcomparator);
@@ -852,20 +854,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return endpointStateMap.get(ep);
     }
 
-    public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as)
-    {
-        EndpointState state1 = getEndpointStateForEndpoint(ep1);
-        EndpointState state2 = getEndpointStateForEndpoint(ep2);
-
-        if (state1 == null || state2 == null)
-            return false;
-
-        VersionedValue value1 = state1.getApplicationState(as);
-        VersionedValue value2 = state2.getApplicationState(as);
-
-        return !(value1 == null || value2 == null) && value1.value.equals(value2.value);
-    }
-
     public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
     {
         return endpointStateMap.entrySet();
@@ -1198,6 +1186,33 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                 handleMajorStateChange(ep, remoteState);
             }
         }
+
+        // If the presence of 3.0 nodes changed, switch between storing/announcing the 3.0 compatible
+        // and the "real" schema version; the recompute and announce runs asynchronously on the executor.
+        // NOTE(review): check-then-set on the volatile flag is not atomic - presumably single-threaded here; confirm.
+        boolean any30 = anyEndpointOn30();
+        if (any30 != anyNodeOn30)
+        {
+            logger.info(any30
+                        ? "There is at least one 3.0 node in the cluster - will store and announce compatible schema version"
+                        : "There are no 3.0 nodes in the cluster - will store and announce real schema version");
+
+            anyNodeOn30 = any30;
+            executor.submit(Schema.instance::updateVersionAndAnnounce);
+        }
+    }
+
+    /**
+     * Returns whether any endpoint in the endpoint state map advertises a 3.0 release version.
+     * Endpoints without a RELEASE_VERSION application state are ignored.
+     */
+    private boolean anyEndpointOn30()
+    {
+        return endpointStateMap.values()
+                               .stream()
+                               .map(EndpointState::getReleaseVersion)
+                               .filter(Objects::nonNull)
+                               .anyMatch(CassandraVersion::is30);
     }
 
     private void applyNewStates(InetAddress addr, EndpointState localState, EndpointState remoteState)
@@ -1547,6 +1555,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
     }
 
+    /**
+     * Returns the cached flag telling whether at least one known endpoint advertised a 3.0 release
+     * version as of its last re-check after applying remote endpoint states; {@code false} initially.
+     */
+    public boolean isAnyNodeOn30()
+    {
+        return anyNodeOn30;
+    }
+
     protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound, Map<InetAddress, EndpointState> epStateMap)
     {
         if (inShadowRound)
@@ -1629,16 +1642,26 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return System.currentTimeMillis() + Gossiper.aVeryLongTime;
     }
 
+    /**
+     * Returns the release version gossiped by {@code ep}, or {@code null} if there is no endpoint
+     * state for {@code ep} or it has no RELEASE_VERSION application state.
+     */
+    @Nullable
     public CassandraVersion getReleaseVersion(InetAddress ep)
     {
         EndpointState state = getEndpointStateForEndpoint(ep);
-        if (state != null)
-        {
-            VersionedValue applicationState = state.getApplicationState(ApplicationState.RELEASE_VERSION);
-            if (applicationState != null)
-                return new CassandraVersion(applicationState.value);
-        }
-        return null;
+        return state != null ? state.getReleaseVersion() : null;
+    }
+
+    /**
+     * Returns the schema version gossiped by {@code ep}, or {@code null} if there is no endpoint
+     * state for {@code ep} or it has no SCHEMA application state.
+     */
+    @Nullable
+    public UUID getSchemaVersion(InetAddress ep)
+    {
+        EndpointState state = getEndpointStateForEndpoint(ep);
+        return state != null ? state.getSchemaVersion() : null;
     }
 
     public static void waitToSettle()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
index 47d986f..cc1c221 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.hints;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 
@@ -64,7 +65,7 @@ final class HintsDispatchTrigger implements Runnable
                .filter(store -> !isScheduled(store))
                .filter(HintsStore::isLive)
                .filter(store -> store.isWriting() || store.hasFiles())
-               .filter(store -> Gossiper.instance.valuesEqual(getBroadcastAddress(), store.address(), ApplicationState.SCHEMA))
+               .filter(store -> Schema.instance.isSameVersion(Gossiper.instance.getSchemaVersion(store.address())))
                .forEach(this::schedule);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 7834b12..b6add96 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -311,18 +311,27 @@ public final class SchemaKeyspace
     /**
      * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
      * will be converted into UUID which would act as content-based version of the schema.
+     *
+     * This implementation is special cased for 3.11: it returns the schema digests for 3.11
+     * <em>and</em> 3.0, i.e. with and without the {@code cdc} column.
+     *
+     * @return a pair of (3.11 "real" schema version, 3.0 compatible schema version)
      */
-    public static UUID calculateSchemaDigest()
+    public static Pair<UUID, UUID> calculateSchemaDigest()
     {
         MessageDigest digest;
+        MessageDigest digest30;
         try
         {
             digest = MessageDigest.getInstance("MD5");
+            digest30 = MessageDigest.getInstance("MD5");
         }
         catch (NoSuchAlgorithmException e)
         {
             throw new RuntimeException(e);
         }
+        // Columns left out of the 3.0 compatible digest.
+        Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
 
         for (String table : ALL_FOR_DIGEST)
         {
@@ -340,12 +349,15 @@ public final class SchemaKeyspace
                     try (RowIterator partition = schema.next())
                     {
                         if (!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
-                            RowIterators.digest(partition, digest);
+                        {
+                            RowIterators.digest(partition, digest, digest30, cdc);
+                        }
                     }
                 }
             }
         }
-        return UUID.nameUUIDFromBytes(digest.digest());
+
+        return Pair.create(UUID.nameUUIDFromBytes(digest.digest()), UUID.nameUUIDFromBytes(digest30.digest()));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 3332d2c..a1b3597 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.config.ViewDefinition;
 import org.apache.cassandra.cql3.functions.UDAggregate;
 import org.apache.cassandra.cql3.functions.UDFunction;
@@ -78,10 +77,13 @@ public class MigrationManager
 
+    /**
+     * Hands {@code endpoint}'s gossiped schema version to maybeScheduleSchemaPull, which decides whether
+     * to pull; does nothing if {@code endpoint} is this node or {@code state} carries no schema version.
+     */
     public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
     {
-        VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
-
-        if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null)
-            maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
+        UUID schemaVersion = state.getSchemaVersion();
+        if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null)
+            maybeScheduleSchemaPull(schemaVersion, endpoint);
     }
 
     /**
@@ -90,16 +88,38 @@ public class MigrationManager
      */
     private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
     {
-        if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint))
+        if (Schema.instance.getVersion() == null)
+        {
+            logger.debug("Not pulling schema from {}, because local schema version is not known yet",
+                         endpoint);
+            return;
+        }
+        // Matching either our "real" or our 3.0 compatible schema version counts as a match.
+        if (Schema.instance.isSameVersion(theirVersion))
+        {
+            logger.debug("Not pulling schema from {}, because schema versions match: " +
+                         "local/real={}, local/compatible={}, remote={}",
+                         endpoint,
+                         Schema.schemaVersionToString(Schema.instance.getRealVersion()),
+                         Schema.schemaVersionToString(Schema.instance.getAltVersion()),
+                         Schema.schemaVersionToString(theirVersion));
+            return;
+        }
+        if (!shouldPullSchemaFrom(endpoint))
         {
-            logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
+            logger.debug("Not pulling schema from {}, because shouldPullSchemaFrom returned false", endpoint);
             return;
         }
 
-        if (SchemaConstants.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
+        if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
         {
             // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
-            logger.debug("Submitting migration task for {}", endpoint);
+            logger.debug("Immediately submitting migration task for {}, " +
+                         "schema versions: local/real={}, local/compatible={}, remote={}",
+                         endpoint,
+                         Schema.schemaVersionToString(Schema.instance.getRealVersion()),
+                         Schema.schemaVersionToString(Schema.instance.getAltVersion()),
+                         Schema.schemaVersionToString(theirVersion));
             submitMigrationTask(endpoint);
         }
         else
@@ -109,20 +128,23 @@ public class MigrationManager
             Runnable runnable = () ->
             {
                 // grab the latest version of the schema since it may have changed again since the initial scheduling
-                EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-                if (epState == null)
+                // null if the endpoint state or its SCHEMA application state is no longer known
+                UUID epSchemaVersion = Gossiper.instance.getSchemaVersion(endpoint);
+                if (epSchemaVersion == null)
                 {
-                    logger.debug("epState vanished for {}, not submitting migration task", endpoint);
+                    logger.debug("epState or schema version vanished for {}, not submitting migration task", endpoint);
                     return;
                 }
-                VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA);
-                UUID currentVersion = UUID.fromString(value.value);
-                if (Schema.instance.getVersion().equals(currentVersion))
+                if (Schema.instance.isSameVersion(epSchemaVersion))
                 {
-                    logger.debug("not submitting migration task for {} because our versions match", endpoint);
+                    logger.debug("Not submitting migration task for {} because our versions match ({})", endpoint, epSchemaVersion);
                     return;
                 }
-                logger.debug("submitting migration task for {}", endpoint);
+                logger.debug("submitting migration task for {}, schema version mismatch: local/real={}, local/compatible={}, remote={}",
+                             endpoint,
+                             Schema.schemaVersionToString(Schema.instance.getRealVersion()),
+                             Schema.schemaVersionToString(Schema.instance.getAltVersion()),
+                             Schema.schemaVersionToString(epSchemaVersion));
                 submitMigrationTask(endpoint);
             };
             ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
@@ -585,11 +606,14 @@ public class MigrationManager
      * Used to notify nodes as they arrive in the cluster.
      *
      * @param version The schema version to announce
+     * @param compatible whether {@code version} is the 3.0 compatible schema version; only used for logging
      */
-    public static void passiveAnnounce(UUID version)
+    public static void passiveAnnounce(UUID version, boolean compatible)
     {
         Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version));
-        logger.debug("Gossiping my schema version {}", version);
+        logger.debug("Gossiping my {} schema version {}",
+                     compatible ? "3.0 compatible" : "3.11",
+                     Schema.schemaVersionToString(version));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 15027b2..c5e2912 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -831,6 +831,37 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
+    /**
+     * Waits until the local schema is no longer empty (polling once per second for up to
+     * {@code delay} milliseconds), then, if needed, for in-flight schema migrations to complete.
+     *
+     * @param delay maximum time, in milliseconds, to poll for a non-empty local schema
+     */
+    public void waitForSchema(int delay)
+    {
+        logger.debug("Waiting for schema (max {} ms)", delay);
+        // poll once per second, for up to delay ms, until the local schema is no longer empty
+        for (int i = 0; i < delay; i += 1000)
+        {
+            // if we see schema, we can proceed to the next check directly
+            if (!Schema.instance.isEmpty())
+            {
+                logger.debug("current schema version: {} (3.0 compatible: {})", Schema.instance.getRealVersion(), Schema.instance.getAltVersion());
+                break;
+            }
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+        // if our schema hasn't matched yet, wait until it has
+        // we do this by waiting for all in-flight migration requests and responses to complete
+        // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
+        if (!MigrationManager.isReadyForBootstrap())
+        {
+            setMode(Mode.JOINING, "waiting for schema information to complete", true);
+            MigrationManager.waitUntilReadyForBootstrap();
+        }
+        logger.info("Has schema with version {}", Schema.instance.getVersion());
+    }
+
     private void joinTokenRing(int delay) throws ConfigurationException
     {
         joined = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/utils/CassandraVersion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CassandraVersion.java b/src/java/org/apache/cassandra/utils/CassandraVersion.java
index aed0fe7..bf9fe6a 100644
--- a/src/java/org/apache/cassandra/utils/CassandraVersion.java
+++ b/src/java/org/apache/cassandra/utils/CassandraVersion.java
@@ -118,6 +118,14 @@ public class CassandraVersion implements Comparable<CassandraVersion>
         return compareIdentifiers(build, other.build, -1);
     }
 
+    /**
+     * Returns whether this is a 3.0.x version, i.e. major version 3 and minor version 0.
+     */
+    public boolean is30()
+    {
+        return major == 3 && minor == 0;
+    }
+
     /**
      * Returns a version that is backward compatible with this version amongst a list
      * of provided version, or null if none can be found.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/4] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by sn...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e4e7d95
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e4e7d95
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e4e7d95

Branch: refs/heads/trunk
Commit: 3e4e7d9546d69a0e1c807b3e2d585d3a659500bc
Parents: 0d8199b e646e50
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Dec 13 14:00:11 2017 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Dec 13 14:00:11 2017 +0100

----------------------------------------------------------------------

----------------------------------------------------------------------



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org