You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2017/12/13 13:03:36 UTC
[1/4] cassandra git commit: Prevent continuous schema exchange
between 3.0 and 3.11 nodes
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.11 f6381db8e -> e646e5032
refs/heads/trunk 0d8199bab -> 7a40abb6a
Prevent continuous schema exchange between 3.0 and 3.11 nodes
patch by Robert Stupp; reviewed by Andrés de la Peña for CASSANDRA-14109
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e646e503
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e646e503
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e646e503
Branch: refs/heads/cassandra-3.11
Commit: e646e5032b68622f7ec1dd0c53137be08baabed9
Parents: f6381db
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Dec 13 13:58:53 2017 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Dec 13 13:58:53 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 9 +++
.../org/apache/cassandra/config/Schema.java | 79 ++++++++++++++++++--
src/java/org/apache/cassandra/db/Columns.java | 7 ++
.../cassandra/db/SchemaCheckVerbHandler.java | 9 ++-
.../apache/cassandra/db/rows/AbstractRow.java | 10 ++-
.../db/rows/RangeTombstoneBoundMarker.java | 7 ++
.../db/rows/RangeTombstoneBoundaryMarker.java | 7 ++
.../apache/cassandra/db/rows/RowIterators.java | 21 +++++-
.../apache/cassandra/db/rows/Unfiltered.java | 12 +++
.../org/apache/cassandra/gms/EndpointState.java | 23 ++++++
src/java/org/apache/cassandra/gms/Gossiper.java | 65 +++++++++-------
.../cassandra/hints/HintsDispatchTrigger.java | 3 +-
.../apache/cassandra/schema/SchemaKeyspace.java | 15 +++-
.../cassandra/service/MigrationManager.java | 58 +++++++++-----
.../cassandra/service/StorageService.java | 25 +++++++
.../cassandra/utils/CassandraVersion.java | 5 ++
17 files changed, 300 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index eaf312f..60794f0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.2
+ * Prevent continuous schema exchange between 3.0 and 3.11 nodes (CASSANDRA-14109)
* Fix imbalanced disks when replacing node with same address with JBOD (CASSANDRA-14084)
* Reload compaction strategies when disk boundaries are invalidated (CASSANDRA-13948)
* Remove OpenJDK log warning (CASSANDRA-13916)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 0c32278..f4b15e7 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -25,6 +25,15 @@ Upgrading
- Cassandra is not logging anymore by default an Heap histogram on OutOfMemoryError. To enable that behavior
set the 'cassandra.printHeapHistogramOnOutOfMemoryError' System property to 'true'. See CASSANDRA-13006
for more details.
+ - Upgrades from 3.0 might have produced unnecessary schema migrations while
+ there was at least one 3.0 node in the cluster. It is therefore highly
+ recommended to upgrade from 3.0 to at least 3.11.2. The root cause of
+ this schema mismatch was a difference in how schema digests were computed
+ in 3.0 and 3.11.2. To mitigate this issue, 3.11.2 and newer announce
+ 3.0 compatible digests as long as there is at least one 3.0 node in the
+ cluster. Once all nodes have been upgraded, the "real" schema version will be
+ announced. Note: this fix is only necessary in 3.11.2 and therefore only applies
+ to 3.11. (CASSANDRA-14109)
Materialized Views
-------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 8fc83df..253a66b 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -22,6 +22,7 @@ import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +34,7 @@ import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.locator.LocalStrategy;
@@ -40,7 +42,6 @@ import org.apache.cassandra.schema.*;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.utils.ConcurrentBiMap;
import org.apache.cassandra.utils.Pair;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
public class Schema
{
@@ -58,6 +59,7 @@ public class Schema
private final ConcurrentBiMap<Pair<String, String>, UUID> cfIdMap = new ConcurrentBiMap<>();
private volatile UUID version;
+ private volatile UUID altVersion;
/**
* Initialize empty schema object and load the hardcoded system tables
@@ -518,30 +520,82 @@ public class Schema
/* Version control */
/**
- * @return current schema version
+ * The schema version to announce.
+ * This will be either the "real" schema version including the {@code cdc} column,
+ * if no node in the cluster is running at 3.0, or a 3.0 compatible
+ * schema version, with the {@code cdc} column excluded, if at least one node is
+ * running 3.0.
+ *
+ * @return "current" schema version
*/
public UUID getVersion()
{
+ return Gossiper.instance.isEnabled() && Gossiper.instance.isAnyNodeOn30()
+ ? altVersion
+ : version;
+ }
+
+ /**
+ * The 3.11 schema version, always includes the {@code cdc} column.
+ */
+ public UUID getRealVersion()
+ {
return version;
}
/**
+ * The "alternative" schema version, compatible with 3.0, always excludes the
+ * {@code cdc} column.
+ */
+ public UUID getAltVersion()
+ {
+ return altVersion;
+ }
+
+ /**
+ * Checks whether the given schema version is the same as the current local schema
+ * version, either the 3.0 compatible or "real" one.
+ */
+ public boolean isSameVersion(UUID schemaVersion)
+ {
+ return schemaVersion != null
+ && (schemaVersion.equals(version) || schemaVersion.equals(altVersion));
+ }
+
+ /**
+ * Checks whether the current schema is empty.
+ */
+ public boolean isEmpty()
+ {
+ return SchemaConstants.emptyVersion.equals(version);
+ }
+
+ /**
* Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
* will be converted into UUID which would act as content-based version of the schema.
+ *
+ * 3.11 note: we calculate the "real" schema version and the 3.0 compatible schema
+ * version here.
*/
public void updateVersion()
{
- version = SchemaKeyspace.calculateSchemaDigest();
- SystemKeyspace.updateSchemaVersion(version);
+ Pair<UUID, UUID> mixedVersions = SchemaKeyspace.calculateSchemaDigest();
+ version = mixedVersions.left;
+ altVersion = mixedVersions.right;
+ SystemKeyspace.updateSchemaVersion(getVersion());
}
- /*
+ /**
* Like updateVersion, but also announces via gossip
+ *
+ * 3.11 note: we announce the "current" schema version, which can be either the 3.0
+ * compatible one, if at least one node is still running 3.0, or the "real" schema version.
*/
public void updateVersionAndAnnounce()
{
updateVersion();
- MigrationManager.passiveAnnounce(version);
+ UUID current = getVersion();
+ MigrationManager.passiveAnnounce(current, current == getAltVersion());
}
/**
@@ -785,4 +839,17 @@ public class Schema
return transformed;
}
+
+ /**
+ * Converts the given schema version to a string. Returns {@code "unknown"}, if {@code version} is {@code null},
+ * or {@code "(empty)"}, if {@code version} refers to an {@link SchemaConstants#emptyVersion empty} schema.
+ */
+ public static String schemaVersionToString(UUID version)
+ {
+ return version == null
+ ? "unknown"
+ : SchemaConstants.emptyVersion.equals(version)
+ ? "(empty)"
+ : version.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index 18729de..965e401 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -367,6 +367,13 @@ public class Columns extends AbstractCollection<ColumnDefinition> implements Col
digest.update(c.name.bytes.duplicate());
}
+ public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+ {
+ for (ColumnDefinition c : this)
+ if (!columnsToExclude.contains(c.name.bytes))
+ digest.update(c.name.bytes.duplicate());
+ }
+
/**
* Apply a function to each column definition in forwards or reversed order.
* @param function
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
index 4270a24..be501de 100644
--- a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
@@ -36,7 +36,14 @@ public class SchemaCheckVerbHandler implements IVerbHandler
public void doVerb(MessageIn message, int id)
{
logger.trace("Received schema check request.");
- MessageOut<UUID> response = new MessageOut<UUID>(MessagingService.Verb.INTERNAL_RESPONSE, Schema.instance.getVersion(), UUIDSerializer.serializer);
+
+ /*
+ 3.11 is special here: We return the 3.0 compatible version, if at least one node
+ in the cluster is still running 3.0 (see Schema#getVersion()). Otherwise the "real" schema version.
+ */
+ MessageOut<UUID> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE,
+ Schema.instance.getVersion(),
+ UUIDSerializer.serializer);
MessagingService.instance().sendReply(response, id, message.from);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 847cb47..13e6502 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -19,7 +19,9 @@ package org.apache.cassandra.db.rows;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.AbstractCollection;
+import java.util.Collections;
import java.util.Objects;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -63,6 +65,11 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme
public void digest(MessageDigest digest)
{
+ digest(digest, Collections.emptySet());
+ }
+
+ public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+ {
FBUtilities.updateWithByte(digest, kind().ordinal());
clustering().digest(digest);
@@ -70,7 +77,8 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme
primaryKeyLivenessInfo().digest(digest);
for (ColumnData cd : this)
- cd.digest(digest);
+ if (!columnsToExclude.contains(cd.column.name.bytes))
+ cd.digest(digest);
}
public void validateData(CFMetaData metadata)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
index a82bb64..fb94da3 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.rows;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.Objects;
+import java.util.Set;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
@@ -132,6 +133,12 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker<Clus
deletion.digest(digest);
}
+ @Override
+ public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+ {
+ digest(digest);
+ }
+
public String toString(CFMetaData metadata)
{
return "Marker " + bound.toString(metadata) + '@' + deletion.markedForDeleteAt();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index 70d6a9d..9190ecf 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.rows;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.Objects;
+import java.util.Set;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
@@ -151,6 +152,12 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker<C
startDeletion.digest(digest);
}
+ @Override
+ public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+ {
+ digest(digest);
+ }
+
public String toString(CFMetaData metadata)
{
return String.format("Marker %s@%d-%d", bound.toString(metadata), endDeletion.markedForDeleteAt(), startDeletion.markedForDeleteAt());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/RowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java
index bce6a7d..1463bf5 100644
--- a/src/java/org/apache/cassandra/db/rows/RowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java
@@ -17,7 +17,9 @@
*/
package org.apache.cassandra.db.rows;
+import java.nio.ByteBuffer;
import java.security.MessageDigest;
+import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +38,7 @@ public abstract class RowIterators
private RowIterators() {}
- public static void digest(RowIterator iterator, MessageDigest digest)
+ public static void digest(RowIterator iterator, MessageDigest digest, MessageDigest altDigest, Set<ByteBuffer> columnsToExclude)
{
// TODO: we're not computing digest the same way that old nodes. This is
// currently ok as this is only used for schema digest and the is no exchange
@@ -48,8 +50,23 @@ public abstract class RowIterators
FBUtilities.updateWithBoolean(digest, iterator.isReverseOrder());
iterator.staticRow().digest(digest);
+ if (altDigest != null)
+ {
+ // Compute the "alternative digest" here.
+ altDigest.update(iterator.partitionKey().getKey().duplicate());
+ iterator.columns().regulars.digest(altDigest, columnsToExclude);
+ iterator.columns().statics.digest(altDigest, columnsToExclude);
+ FBUtilities.updateWithBoolean(altDigest, iterator.isReverseOrder());
+ iterator.staticRow().digest(altDigest, columnsToExclude);
+ }
+
while (iterator.hasNext())
- iterator.next().digest(digest);
+ {
+ Row row = iterator.next();
+ row.digest(digest);
+ if (altDigest != null)
+ row.digest(altDigest, columnsToExclude);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/Unfiltered.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
index e75c632..3d8a9b1 100644
--- a/src/java/org/apache/cassandra/db/rows/Unfiltered.java
+++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
@@ -17,7 +17,9 @@
*/
package org.apache.cassandra.db.rows;
+import java.nio.ByteBuffer;
import java.security.MessageDigest;
+import java.util.Set;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.Clusterable;
@@ -46,6 +48,16 @@ public interface Unfiltered extends Clusterable
public void digest(MessageDigest digest);
/**
+ * Digest the atom using the provided {@code MessageDigest}.
+ * This method only exists in 3.11.
+ * Same as {@link #digest(MessageDigest)}, but excludes the given columns from digest calculation.
+ */
+ public default void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+ {
+ throw new UnsupportedOperationException("no no no - don't use this one - use digest(MessageDigest) instead");
+ }
+
+ /**
* Validate the data of this atom.
*
* @param metadata the metadata for the table this atom is part of.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 70f2a68..674b597 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -22,14 +22,19 @@ import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.CassandraVersion;
+
/**
* This abstraction represents both the HeartBeatState and the ApplicationState in an EndpointState
* instance. Any state for a given endpoint can be retrieved from this instance.
@@ -154,6 +159,24 @@ public class EndpointState
return pieces[0];
}
+ @Nullable
+ public UUID getSchemaVersion()
+ {
+ VersionedValue applicationState = getApplicationState(ApplicationState.SCHEMA);
+ return applicationState != null
+ ? UUID.fromString(applicationState.value)
+ : null;
+ }
+
+ @Nullable
+ public CassandraVersion getReleaseVersion()
+ {
+ VersionedValue applicationState = getApplicationState(ApplicationState.RELEASE_VERSION);
+ return applicationState != null
+ ? new CassandraVersion(applicationState.value)
+ : null;
+ }
+
public String toString()
{
return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 14601d7..2dac5c2 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -24,7 +24,7 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
-
+import javax.annotation.Nullable;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -32,9 +32,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.cassandra.utils.CassandraVersion;
-import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,14 +40,17 @@ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Pair;
/**
* This module is responsible for Gossiping information for the local endpoint. This abstraction
@@ -75,6 +75,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN,
VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);
static ArrayList<String> SILENT_SHUTDOWN_STATES = new ArrayList<>();
+
static
{
SILENT_SHUTDOWN_STATES.addAll(DEAD_STATES);
@@ -130,6 +131,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
+ private volatile boolean anyNodeOn30 = false; // we assume the regular case here - all nodes are on 3.11
private volatile boolean inShadowRound = false;
// seeds gathered during shadow round that indicated to be in the shadow round phase as well
private final Set<InetAddress> seedsInShadowRound = new ConcurrentSkipListSet<>(inetcomparator);
@@ -852,20 +854,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return endpointStateMap.get(ep);
}
- public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as)
- {
- EndpointState state1 = getEndpointStateForEndpoint(ep1);
- EndpointState state2 = getEndpointStateForEndpoint(ep2);
-
- if (state1 == null || state2 == null)
- return false;
-
- VersionedValue value1 = state1.getApplicationState(as);
- VersionedValue value2 = state2.getApplicationState(as);
-
- return !(value1 == null || value2 == null) && value1.value.equals(value2.value);
- }
-
public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
{
return endpointStateMap.entrySet();
@@ -1198,6 +1186,26 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
handleMajorStateChange(ep, remoteState);
}
}
+
+ boolean any30 = anyEndpointOn30();
+ if (any30 != anyNodeOn30)
+ {
+ logger.info(any30
+ ? "There is at least one 3.0 node in the cluster - will store and announce compatible schema version"
+ : "There are no 3.0 nodes in the cluster - will store and announce real schema version");
+
+ anyNodeOn30 = any30;
+ executor.submit(Schema.instance::updateVersionAndAnnounce);
+ }
+ }
+
+ private boolean anyEndpointOn30()
+ {
+ return endpointStateMap.values()
+ .stream()
+ .map(EndpointState::getReleaseVersion)
+ .filter(Objects::nonNull)
+ .anyMatch(CassandraVersion::is30);
}
private void applyNewStates(InetAddress addr, EndpointState localState, EndpointState remoteState)
@@ -1547,6 +1555,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
}
+ public boolean isAnyNodeOn30()
+ {
+ return anyNodeOn30;
+ }
+
protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound, Map<InetAddress, EndpointState> epStateMap)
{
if (inShadowRound)
@@ -1629,16 +1642,18 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return System.currentTimeMillis() + Gossiper.aVeryLongTime;
}
+ @Nullable
public CassandraVersion getReleaseVersion(InetAddress ep)
{
EndpointState state = getEndpointStateForEndpoint(ep);
- if (state != null)
- {
- VersionedValue applicationState = state.getApplicationState(ApplicationState.RELEASE_VERSION);
- if (applicationState != null)
- return new CassandraVersion(applicationState.value);
- }
- return null;
+ return state != null ? state.getReleaseVersion() : null;
+ }
+
+ @Nullable
+ public UUID getSchemaVersion(InetAddress ep)
+ {
+ EndpointState state = getEndpointStateForEndpoint(ep);
+ return state != null ? state.getSchemaVersion() : null;
}
public static void waitToSettle()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
index 47d986f..cc1c221 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.hints;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
@@ -64,7 +65,7 @@ final class HintsDispatchTrigger implements Runnable
.filter(store -> !isScheduled(store))
.filter(HintsStore::isLive)
.filter(store -> store.isWriting() || store.hasFiles())
- .filter(store -> Gossiper.instance.valuesEqual(getBroadcastAddress(), store.address(), ApplicationState.SCHEMA))
+ .filter(store -> Schema.instance.isSameVersion(Gossiper.instance.getSchemaVersion(store.address())))
.forEach(this::schedule);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 7834b12..b6add96 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -311,18 +311,24 @@ public final class SchemaKeyspace
/**
* Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
* will be converted into UUID which would act as content-based version of the schema.
+ *
+ * This implementation is special cased for 3.11 as it returns the schema digests for 3.11
+ * <em>and</em> 3.0 - i.e. with and without the beloved {@code cdc} column.
*/
- public static UUID calculateSchemaDigest()
+ public static Pair<UUID, UUID> calculateSchemaDigest()
{
MessageDigest digest;
+ MessageDigest digest30;
try
{
digest = MessageDigest.getInstance("MD5");
+ digest30 = MessageDigest.getInstance("MD5");
}
catch (NoSuchAlgorithmException e)
{
throw new RuntimeException(e);
}
+ Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
for (String table : ALL_FOR_DIGEST)
{
@@ -340,12 +346,15 @@ public final class SchemaKeyspace
try (RowIterator partition = schema.next())
{
if (!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
- RowIterators.digest(partition, digest);
+ {
+ RowIterators.digest(partition, digest, digest30, cdc);
+ }
}
}
}
}
- return UUID.nameUUIDFromBytes(digest.digest());
+
+ return Pair.create(UUID.nameUUIDFromBytes(digest.digest()), UUID.nameUUIDFromBytes(digest30.digest()));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 3332d2c..a1b3597 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.config.ViewDefinition;
import org.apache.cassandra.cql3.functions.UDAggregate;
import org.apache.cassandra.cql3.functions.UDFunction;
@@ -78,10 +77,9 @@ public class MigrationManager
public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
{
- VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
-
- if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null)
- maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
+ UUID schemaVersion = state.getSchemaVersion();
+ if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null)
+ maybeScheduleSchemaPull(schemaVersion, endpoint);
}
/**
@@ -90,16 +88,37 @@ public class MigrationManager
*/
private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
{
- if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint))
+ if (Schema.instance.getVersion() == null)
+ {
+ logger.debug("Not pulling schema from {}, because local schama version is not known yet",
+ endpoint);
+ return;
+ }
+ if (Schema.instance.isSameVersion(theirVersion))
+ {
+ logger.debug("Not pulling schema from {}, because schema versions match: " +
+ "local/real={}, local/compatible={}, remote={}",
+ endpoint,
+ Schema.schemaVersionToString(Schema.instance.getRealVersion()),
+ Schema.schemaVersionToString(Schema.instance.getAltVersion()),
+ Schema.schemaVersionToString(theirVersion));
+ return;
+ }
+ if (!shouldPullSchemaFrom(endpoint))
{
logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
return;
}
- if (SchemaConstants.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
+ if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
{
// If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
- logger.debug("Submitting migration task for {}", endpoint);
+ logger.debug("Immediately submitting migration task for {}, " +
+ "schema versions: local/real={}, local/compatible={}, remote={}",
+ endpoint,
+ Schema.schemaVersionToString(Schema.instance.getRealVersion()),
+ Schema.schemaVersionToString(Schema.instance.getAltVersion()),
+ Schema.schemaVersionToString(theirVersion));
submitMigrationTask(endpoint);
}
else
@@ -109,20 +128,22 @@ public class MigrationManager
Runnable runnable = () ->
{
// grab the latest version of the schema since it may have changed again since the initial scheduling
- EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
- if (epState == null)
+ UUID epSchemaVersion = Gossiper.instance.getSchemaVersion(endpoint);
+ if (epSchemaVersion == null)
{
logger.debug("epState vanished for {}, not submitting migration task", endpoint);
return;
}
- VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA);
- UUID currentVersion = UUID.fromString(value.value);
- if (Schema.instance.getVersion().equals(currentVersion))
+ if (Schema.instance.isSameVersion(epSchemaVersion))
{
- logger.debug("not submitting migration task for {} because our versions match", endpoint);
+ logger.debug("Not submitting migration task for {} because our versions match ({})", endpoint, epSchemaVersion);
return;
}
- logger.debug("submitting migration task for {}", endpoint);
+ logger.debug("submitting migration task for {}, schema version mismatch: local/real={}, local/compatible={}, remote={}",
+ endpoint,
+ Schema.schemaVersionToString(Schema.instance.getRealVersion()),
+ Schema.schemaVersionToString(Schema.instance.getAltVersion()),
+ Schema.schemaVersionToString(epSchemaVersion));
submitMigrationTask(endpoint);
};
ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
@@ -585,11 +606,14 @@ public class MigrationManager
* Used to notify nodes as they arrive in the cluster.
*
* @param version The schema version to announce
+ * @param compatible flag whether {@code version} is a 3.0 compatible version
*/
- public static void passiveAnnounce(UUID version)
+ public static void passiveAnnounce(UUID version, boolean compatible)
{
Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version));
- logger.debug("Gossiping my schema version {}", version);
+ logger.debug("Gossiping my {} schema version {}",
+ compatible ? "3.0 compatible" : "3.11",
+ Schema.schemaVersionToString(version));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 15027b2..c5e2912 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -831,6 +831,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
+ public void waitForSchema(int delay)
+ {
+ logger.debug("Waiting for schema (max {} seconds)", delay);
+ // first sleep the delay to make sure we see all our peers
+ for (int i = 0; i < delay; i += 1000)
+ {
+ // if we see schema, we can proceed to the next check directly
+ if (!Schema.instance.isEmpty())
+ {
+ logger.debug("current schema version: {} (3.0 compatible: {})", Schema.instance.getRealVersion(), Schema.instance.getAltVersion());
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ }
+ // if our schema hasn't matched yet, wait until it has
+ // we do this by waiting for all in-flight migration requests and responses to complete
+ // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
+ if (!MigrationManager.isReadyForBootstrap())
+ {
+ setMode(Mode.JOINING, "waiting for schema information to complete", true);
+ MigrationManager.waitUntilReadyForBootstrap();
+ }
+ logger.info("Has schema with version {}", Schema.instance.getVersion());
+ }
+
private void joinTokenRing(int delay) throws ConfigurationException
{
joined = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/utils/CassandraVersion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CassandraVersion.java b/src/java/org/apache/cassandra/utils/CassandraVersion.java
index aed0fe7..bf9fe6a 100644
--- a/src/java/org/apache/cassandra/utils/CassandraVersion.java
+++ b/src/java/org/apache/cassandra/utils/CassandraVersion.java
@@ -118,6 +118,11 @@ public class CassandraVersion implements Comparable<CassandraVersion>
return compareIdentifiers(build, other.build, -1);
}
+ public boolean is30()
+ {
+ return major == 3 && minor == 0;
+ }
+
/**
* Returns a version that is backward compatible with this version amongst a list
* of provided version, or null if none can be found.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[4/4] cassandra git commit: Bring code improvements of
CASSANDRA-14109 into trunk (but not the change itself)
Posted by sn...@apache.org.
Bring code improvements of CASSANDRA-14109 into trunk (but not the change itself)
patch by Robert Stupp; reviewed by Andrés de la Peña for CASSANDRA-14109
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7a40abb6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7a40abb6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7a40abb6
Branch: refs/heads/trunk
Commit: 7a40abb6a5108688fb1b10c375bb751cbb782ea4
Parents: 3e4e7d9
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Dec 13 14:00:38 2017 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Dec 13 14:00:38 2017 +0100
----------------------------------------------------------------------
.../org/apache/cassandra/gms/EndpointState.java | 27 +++++++++--
src/java/org/apache/cassandra/gms/Gossiper.java | 31 ++++---------
.../cassandra/hints/HintsDispatchTrigger.java | 3 +-
.../cassandra/schema/MigrationManager.java | 48 ++++++++++++++------
.../org/apache/cassandra/schema/Schema.java | 30 ++++++++++++
.../cassandra/service/StorageService.java | 4 +-
6 files changed, 100 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 70f2a68..847041f 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -18,18 +18,19 @@
package org.apache.cassandra.gms;
import java.io.*;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.CassandraVersion;
+
/**
* This abstraction represents both the HeartBeatState and the ApplicationState in an EndpointState
* instance. Any state for a given endpoint can be retrieved from this instance.
@@ -154,6 +155,24 @@ public class EndpointState
return pieces[0];
}
+ @Nullable
+ public UUID getSchemaVersion()
+ {
+ VersionedValue applicationState = getApplicationState(ApplicationState.SCHEMA);
+ return applicationState != null
+ ? UUID.fromString(applicationState.value)
+ : null;
+ }
+
+ @Nullable
+ public CassandraVersion getReleaseVersion()
+ {
+ VersionedValue applicationState = getApplicationState(ApplicationState.RELEASE_VERSION);
+ return applicationState != null
+ ? new CassandraVersion(applicationState.value)
+ : null;
+ }
+
public String toString()
{
return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 15dad71..e675d92 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -852,20 +853,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return endpointStateMap.get(ep);
}
- public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as)
- {
- EndpointState state1 = getEndpointStateForEndpoint(ep1);
- EndpointState state2 = getEndpointStateForEndpoint(ep2);
-
- if (state1 == null || state2 == null)
- return false;
-
- VersionedValue value1 = state1.getApplicationState(as);
- VersionedValue value2 = state2.getApplicationState(as);
-
- return !(value1 == null || value2 == null) && value1.value.equals(value2.value);
- }
-
public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
{
return endpointStateMap.entrySet();
@@ -1624,16 +1611,18 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return System.currentTimeMillis() + Gossiper.aVeryLongTime;
}
+ @Nullable
public CassandraVersion getReleaseVersion(InetAddress ep)
{
EndpointState state = getEndpointStateForEndpoint(ep);
- if (state != null)
- {
- VersionedValue applicationState = state.getApplicationState(ApplicationState.RELEASE_VERSION);
- if (applicationState != null)
- return new CassandraVersion(applicationState.value);
- }
- return null;
+ return state != null ? state.getReleaseVersion() : null;
+ }
+
+ @Nullable
+ public UUID getSchemaVersion(InetAddress ep)
+ {
+ EndpointState state = getEndpointStateForEndpoint(ep);
+ return state != null ? state.getSchemaVersion() : null;
}
public static void waitToSettle()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
index 47d986f..34d1eb2 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.schema.Schema;
import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddress;
@@ -64,7 +65,7 @@ final class HintsDispatchTrigger implements Runnable
.filter(store -> !isScheduled(store))
.filter(HintsStore::isLive)
.filter(store -> store.isWriting() || store.hasFiles())
- .filter(store -> Gossiper.instance.valuesEqual(getBroadcastAddress(), store.address(), ApplicationState.SCHEMA))
+ .filter(store -> Schema.instance.isSameVersion(Gossiper.instance.getSchemaVersion(store.address())))
.forEach(this::schedule);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/schema/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java
index d8a3b72..50e5c28 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -61,10 +61,9 @@ public class MigrationManager
public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
{
- VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
-
- if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null)
- maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
+ UUID schemaVersion = state.getSchemaVersion();
+ if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null)
+ maybeScheduleSchemaPull(schemaVersion, endpoint);
}
/**
@@ -73,16 +72,34 @@ public class MigrationManager
*/
private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
{
- if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint))
+ if (Schema.instance.getVersion() == null)
+ {
+ logger.debug("Not pulling schema from {}, because local schama version is not known yet",
+ endpoint);
+ return;
+ }
+ if (Schema.instance.isSameVersion(theirVersion))
+ {
+ logger.debug("Not pulling schema from {}, because schema versions match ({})",
+ endpoint,
+ Schema.schemaVersionToString(theirVersion));
+ return;
+ }
+ if (!shouldPullSchemaFrom(endpoint))
{
- logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
+ logger.debug("Not pulling schema from {}, because versions match ({}/{}), or shouldPullSchemaFrom returned false",
+ endpoint, Schema.instance.getVersion(), theirVersion);
return;
}
- if (SchemaConstants.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
+ if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
{
// If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
- logger.debug("Submitting migration task for {}", endpoint);
+ logger.debug("Immediately submitting migration task for {} due to {}, " +
+ "schema versions: local={}, remote={}",
+ endpoint,
+ Schema.schemaVersionToString(Schema.instance.getVersion()),
+ Schema.schemaVersionToString(theirVersion));
submitMigrationTask(endpoint);
}
else
@@ -92,20 +109,21 @@ public class MigrationManager
Runnable runnable = () ->
{
// grab the latest version of the schema since it may have changed again since the initial scheduling
- EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
- if (epState == null)
+ UUID epSchemaVersion = Gossiper.instance.getSchemaVersion(endpoint);
+ if (epSchemaVersion == null)
{
logger.debug("epState vanished for {}, not submitting migration task", endpoint);
return;
}
- VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA);
- UUID currentVersion = UUID.fromString(value.value);
- if (Schema.instance.getVersion().equals(currentVersion))
+ if (Schema.instance.isSameVersion(epSchemaVersion))
{
- logger.debug("not submitting migration task for {} because our versions match", endpoint);
+ logger.debug("Not submitting migration task for {} because our versions match ({})", endpoint, epSchemaVersion);
return;
}
- logger.debug("submitting migration task for {}", endpoint);
+ logger.debug("Submitting migration task for {}, schema version mismatch: local={}, remote={}",
+ endpoint,
+ Schema.schemaVersionToString(Schema.instance.getVersion()),
+ Schema.schemaVersionToString(epSchemaVersion));
submitMigrationTask(endpoint);
};
ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/schema/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index 711724b..594b2ab 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -524,6 +524,22 @@ public final class Schema
}
/**
+ * Checks whether the given schema version is the same as the current local schema.
+ */
+ public boolean isSameVersion(UUID schemaVersion)
+ {
+ return schemaVersion != null && schemaVersion.equals(version);
+ }
+
+ /**
+ * Checks whether the current schema is empty.
+ */
+ public boolean isEmpty()
+ {
+ return SchemaConstants.emptyVersion.equals(version);
+ }
+
+ /**
* Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
* will be converted into UUID which would act as content-based version of the schema.
*/
@@ -819,4 +835,18 @@ public final class Schema
{
changeListeners.forEach(l -> l.onDropAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()));
}
+
+
+ /**
+ * Converts the given schema version to a string. Returns {@code unknown}, if {@code version} is {@code null}
+ * or {@code "(empty)"}, if {@code version} refers to an {@link SchemaConstants#emptyVersion empty} schema.
+ */
+ public static String schemaVersionToString(UUID version)
+ {
+ return version == null
+ ? "unknown"
+ : SchemaConstants.emptyVersion.equals(version)
+ ? "(empty)"
+ : version.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 37d163f..89d5358 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -796,9 +796,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
for (int i = 0; i < delay; i += 1000)
{
// if we see schema, we can proceed to the next check directly
- if (!Schema.instance.getVersion().equals(SchemaConstants.emptyVersion))
+ if (!Schema.instance.isEmpty())
{
- logger.debug("got schema: {}", Schema.instance.getVersion());
+ logger.debug("current schema version: {}", Schema.instance.getVersion());
break;
}
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/4] cassandra git commit: Prevent continuous schema exchange
between 3.0 to 3.11 nodes
Posted by sn...@apache.org.
Prevent continuous schema exchange between 3.0 to 3.11 nodes
patch by Robert Stupp; reviewed by Andrés de la Peña for CASSANDRA-14109
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e646e503
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e646e503
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e646e503
Branch: refs/heads/trunk
Commit: e646e5032b68622f7ec1dd0c53137be08baabed9
Parents: f6381db
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Dec 13 13:58:53 2017 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Dec 13 13:58:53 2017 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
NEWS.txt | 9 +++
.../org/apache/cassandra/config/Schema.java | 79 ++++++++++++++++++--
src/java/org/apache/cassandra/db/Columns.java | 7 ++
.../cassandra/db/SchemaCheckVerbHandler.java | 9 ++-
.../apache/cassandra/db/rows/AbstractRow.java | 10 ++-
.../db/rows/RangeTombstoneBoundMarker.java | 7 ++
.../db/rows/RangeTombstoneBoundaryMarker.java | 7 ++
.../apache/cassandra/db/rows/RowIterators.java | 21 +++++-
.../apache/cassandra/db/rows/Unfiltered.java | 12 +++
.../org/apache/cassandra/gms/EndpointState.java | 23 ++++++
src/java/org/apache/cassandra/gms/Gossiper.java | 65 +++++++++-------
.../cassandra/hints/HintsDispatchTrigger.java | 3 +-
.../apache/cassandra/schema/SchemaKeyspace.java | 15 +++-
.../cassandra/service/MigrationManager.java | 58 +++++++++-----
.../cassandra/service/StorageService.java | 25 +++++++
.../cassandra/utils/CassandraVersion.java | 5 ++
17 files changed, 300 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index eaf312f..60794f0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.2
+ * Prevent continuous schema exchange between 3.0 and 3.11 nodes (CASSANDRA-14109)
* Fix imbalanced disks when replacing node with same address with JBOD (CASSANDRA-14084)
* Reload compaction strategies when disk boundaries are invalidated (CASSANDRA-13948)
* Remove OpenJDK log warning (CASSANDRA-13916)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 0c32278..f4b15e7 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -25,6 +25,15 @@ Upgrading
- Cassandra is not logging anymore by default an Heap histogram on OutOfMemoryError. To enable that behavior
set the 'cassandra.printHeapHistogramOnOutOfMemoryError' System property to 'true'. See CASSANDRA-13006
for more details.
+ - Upgrades from 3.0 might have produced unnecessary schema migrations while
+ there was at least one 3.0 node in the cluster. It is therefore highly
+ recommended to upgrade from 3.0 to at least 3.11.2. The root cause of
+ this schema mismatch was a difference in the way schema digests were computed
+ in 3.0 and 3.11.2. To mitigate this issue, 3.11.2 and newer announce
+ 3.0 compatible digests as long as there is at least one 3.0 node in the
+ cluster. Once all nodes have been upgraded, the "real" schema version will be
+ announced. Note: this fix is only necessary in 3.11.2 and therefore only applies
+ to 3.11. (CASSANDRA-14109)
Materialized Views
-------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index 8fc83df..253a66b 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -22,6 +22,7 @@ import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +34,7 @@ import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.locator.LocalStrategy;
@@ -40,7 +42,6 @@ import org.apache.cassandra.schema.*;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.utils.ConcurrentBiMap;
import org.apache.cassandra.utils.Pair;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
public class Schema
{
@@ -58,6 +59,7 @@ public class Schema
private final ConcurrentBiMap<Pair<String, String>, UUID> cfIdMap = new ConcurrentBiMap<>();
private volatile UUID version;
+ private volatile UUID altVersion;
/**
* Initialize empty schema object and load the hardcoded system tables
@@ -518,30 +520,82 @@ public class Schema
/* Version control */
/**
- * @return current schema version
+ * The schema version to announce.
+ * This will be either the "real" schema version including the {@code cdc} column,
+ * if no node in the cluster is running at 3.0, or a 3.0 compatible
+ * schema version, with the {@code cdc} column excluded, if at least one node is
+ * running 3.0.
+ *
+ * @return "current" schema version
*/
public UUID getVersion()
{
+ return Gossiper.instance.isEnabled() && Gossiper.instance.isAnyNodeOn30()
+ ? altVersion
+ : version;
+ }
+
+ /**
+ * The 3.11 schema version, always includes the {@code cdc} column.
+ */
+ public UUID getRealVersion()
+ {
return version;
}
/**
+ * The "alternative" schema version, compatible to 3.0, always excludes the
+ * {@code cdc} column.
+ */
+ public UUID getAltVersion()
+ {
+ return altVersion;
+ }
+
+ /**
+ * Checks whether the given schema version is the same as the current local schema
+ * version, either the 3.0 compatible or "real" one.
+ */
+ public boolean isSameVersion(UUID schemaVersion)
+ {
+ return schemaVersion != null
+ && (schemaVersion.equals(version) || schemaVersion.equals(altVersion));
+ }
+
+ /**
+ * Checks whether the current schema is empty.
+ */
+ public boolean isEmpty()
+ {
+ return SchemaConstants.emptyVersion.equals(version);
+ }
+
+ /**
* Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
* will be converted into UUID which would act as content-based version of the schema.
+ *
+ * 3.11 note: we calculate the "real" schema version and the 3.0 compatible schema
+ * version here.
*/
public void updateVersion()
{
- version = SchemaKeyspace.calculateSchemaDigest();
- SystemKeyspace.updateSchemaVersion(version);
+ Pair<UUID, UUID> mixedVersions = SchemaKeyspace.calculateSchemaDigest();
+ version = mixedVersions.left;
+ altVersion = mixedVersions.right;
+ SystemKeyspace.updateSchemaVersion(getVersion());
}
- /*
+ /**
* Like updateVersion, but also announces via gossip
+ *
+ * 3.11 note: we announce the "current" schema version, which can be either the 3.0
+ * compatible one, if at least one node is still running 3.0, or the "real" schema version.
*/
public void updateVersionAndAnnounce()
{
updateVersion();
- MigrationManager.passiveAnnounce(version);
+ UUID current = getVersion();
+ MigrationManager.passiveAnnounce(current, current == getAltVersion());
}
/**
@@ -785,4 +839,17 @@ public class Schema
return transformed;
}
+
+ /**
+ * Converts the given schema version to a string. Returns {@code unknown}, if {@code version} is {@code null}
+ * or {@code "(empty)"}, if {@code version} refers to an {@link SchemaConstants#emptyVersion empty} schema.
+ */
+ public static String schemaVersionToString(UUID version)
+ {
+ return version == null
+ ? "unknown"
+ : SchemaConstants.emptyVersion.equals(version)
+ ? "(empty)"
+ : version.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index 18729de..965e401 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -367,6 +367,13 @@ public class Columns extends AbstractCollection<ColumnDefinition> implements Col
digest.update(c.name.bytes.duplicate());
}
+ public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+ {
+ for (ColumnDefinition c : this)
+ if (!columnsToExclude.contains(c.name.bytes))
+ digest.update(c.name.bytes.duplicate());
+ }
+
/**
* Apply a function to each column definition in forwards or reversed order.
* @param function
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
index 4270a24..be501de 100644
--- a/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
@@ -36,7 +36,14 @@ public class SchemaCheckVerbHandler implements IVerbHandler
public void doVerb(MessageIn message, int id)
{
logger.trace("Received schema check request.");
- MessageOut<UUID> response = new MessageOut<UUID>(MessagingService.Verb.INTERNAL_RESPONSE, Schema.instance.getVersion(), UUIDSerializer.serializer);
+
+ /*
+ 3.11 is special here: We return the 3.0 compatible version, if gossip is enabled and at least
+ one node in the cluster is running 3.0. Otherwise the "real" schema version.
+ */
+ MessageOut<UUID> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE,
+ Schema.instance.getVersion(),
+ UUIDSerializer.serializer);
MessagingService.instance().sendReply(response, id, message.from);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 847cb47..13e6502 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -19,7 +19,9 @@ package org.apache.cassandra.db.rows;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.AbstractCollection;
+import java.util.Collections;
import java.util.Objects;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -63,6 +65,11 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme
public void digest(MessageDigest digest)
{
+ digest(digest, Collections.emptySet());
+ }
+
+ public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+ {
FBUtilities.updateWithByte(digest, kind().ordinal());
clustering().digest(digest);
@@ -70,7 +77,8 @@ public abstract class AbstractRow extends AbstractCollection<ColumnData> impleme
primaryKeyLivenessInfo().digest(digest);
for (ColumnData cd : this)
- cd.digest(digest);
+ if (!columnsToExclude.contains(cd.column.name.bytes))
+ cd.digest(digest);
}
public void validateData(CFMetaData metadata)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
index a82bb64..fb94da3 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.rows;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.Objects;
+import java.util.Set;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
@@ -132,6 +133,12 @@ public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker<Clus
deletion.digest(digest);
}
+ @Override
+ public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+ {
+ digest(digest);
+ }
+
public String toString(CFMetaData metadata)
{
return "Marker " + bound.toString(metadata) + '@' + deletion.markedForDeleteAt();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
index 70d6a9d..9190ecf 100644
--- a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
+++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.rows;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.Objects;
+import java.util.Set;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
@@ -151,6 +152,12 @@ public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker<C
startDeletion.digest(digest);
}
+ @Override
+ public void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+ {
+ digest(digest);
+ }
+
public String toString(CFMetaData metadata)
{
return String.format("Marker %s@%d-%d", bound.toString(metadata), endDeletion.markedForDeleteAt(), startDeletion.markedForDeleteAt());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/RowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java
index bce6a7d..1463bf5 100644
--- a/src/java/org/apache/cassandra/db/rows/RowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java
@@ -17,7 +17,9 @@
*/
package org.apache.cassandra.db.rows;
+import java.nio.ByteBuffer;
import java.security.MessageDigest;
+import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +38,7 @@ public abstract class RowIterators
private RowIterators() {}
- public static void digest(RowIterator iterator, MessageDigest digest)
+ public static void digest(RowIterator iterator, MessageDigest digest, MessageDigest altDigest, Set<ByteBuffer> columnsToExclude)
{
// TODO: we're not computing digest the same way that old nodes do. This is
// currently ok as this is only used for schema digest and there is no exchange
@@ -48,8 +50,23 @@ public abstract class RowIterators
FBUtilities.updateWithBoolean(digest, iterator.isReverseOrder());
iterator.staticRow().digest(digest);
+ if (altDigest != null)
+ {
+ // Compute the "alternative digest" here.
+ altDigest.update(iterator.partitionKey().getKey().duplicate());
+ iterator.columns().regulars.digest(altDigest, columnsToExclude);
+ iterator.columns().statics.digest(altDigest, columnsToExclude);
+ FBUtilities.updateWithBoolean(altDigest, iterator.isReverseOrder());
+ iterator.staticRow().digest(altDigest, columnsToExclude);
+ }
+
while (iterator.hasNext())
- iterator.next().digest(digest);
+ {
+ Row row = iterator.next();
+ row.digest(digest);
+ if (altDigest != null)
+ row.digest(altDigest, columnsToExclude);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/db/rows/Unfiltered.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
index e75c632..3d8a9b1 100644
--- a/src/java/org/apache/cassandra/db/rows/Unfiltered.java
+++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java
@@ -17,7 +17,9 @@
*/
package org.apache.cassandra.db.rows;
+import java.nio.ByteBuffer;
import java.security.MessageDigest;
+import java.util.Set;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.Clusterable;
@@ -46,6 +48,16 @@ public interface Unfiltered extends Clusterable
public void digest(MessageDigest digest);
/**
+ * Digest the atom using the provided {@code MessageDigest}.
+ * This method only exists in 3.11.
+ * Same as {@link #digest(MessageDigest)}, but excludes the given columns from digest calculation.
+ */
+ public default void digest(MessageDigest digest, Set<ByteBuffer> columnsToExclude)
+ {
+ throw new UnsupportedOperationException("no no no - don't use this one - use digest(MessageDigest) instead");
+ }
+
+ /**
* Validate the data of this atom.
*
* @param metadata the metadata for the table this atom is part of.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 70f2a68..674b597 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -22,14 +22,19 @@ import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.CassandraVersion;
+
/**
* This abstraction represents both the HeartBeatState and the ApplicationState in an EndpointState
* instance. Any state for a given endpoint can be retrieved from this instance.
@@ -154,6 +159,24 @@ public class EndpointState
return pieces[0];
}
+ @Nullable
+ public UUID getSchemaVersion()
+ {
+ VersionedValue applicationState = getApplicationState(ApplicationState.SCHEMA);
+ return applicationState != null
+ ? UUID.fromString(applicationState.value)
+ : null;
+ }
+
+ @Nullable
+ public CassandraVersion getReleaseVersion()
+ {
+ VersionedValue applicationState = getApplicationState(ApplicationState.RELEASE_VERSION);
+ return applicationState != null
+ ? new CassandraVersion(applicationState.value)
+ : null;
+ }
+
public String toString()
{
return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 14601d7..2dac5c2 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -24,7 +24,7 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
-
+import javax.annotation.Nullable;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -32,9 +32,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.cassandra.utils.CassandraVersion;
-import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,14 +40,17 @@ import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.Pair;
/**
* This module is responsible for Gossiping information for the local endpoint. This abstraction
@@ -75,6 +75,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN,
VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);
static ArrayList<String> SILENT_SHUTDOWN_STATES = new ArrayList<>();
+
static
{
SILENT_SHUTDOWN_STATES.addAll(DEAD_STATES);
@@ -130,6 +131,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress, Long>();
+ private volatile boolean anyNodeOn30 = false; // we assume the regular case here - all nodes are on 3.11
private volatile boolean inShadowRound = false;
// seeds gathered during shadow round that indicated to be in the shadow round phase as well
private final Set<InetAddress> seedsInShadowRound = new ConcurrentSkipListSet<>(inetcomparator);
@@ -852,20 +854,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return endpointStateMap.get(ep);
}
- public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as)
- {
- EndpointState state1 = getEndpointStateForEndpoint(ep1);
- EndpointState state2 = getEndpointStateForEndpoint(ep2);
-
- if (state1 == null || state2 == null)
- return false;
-
- VersionedValue value1 = state1.getApplicationState(as);
- VersionedValue value2 = state2.getApplicationState(as);
-
- return !(value1 == null || value2 == null) && value1.value.equals(value2.value);
- }
-
public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
{
return endpointStateMap.entrySet();
@@ -1198,6 +1186,26 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
handleMajorStateChange(ep, remoteState);
}
}
+
+ boolean any30 = anyEndpointOn30();
+ if (any30 != anyNodeOn30)
+ {
+ logger.info(any30
+ ? "There is at least one 3.0 node in the cluster - will store and announce compatible schema version"
+ : "There are no 3.0 nodes in the cluster - will store and announce real schema version");
+
+ anyNodeOn30 = any30;
+ executor.submit(Schema.instance::updateVersionAndAnnounce);
+ }
+ }
+
+ private boolean anyEndpointOn30()
+ {
+ return endpointStateMap.values()
+ .stream()
+ .map(EndpointState::getReleaseVersion)
+ .filter(Objects::nonNull)
+ .anyMatch(CassandraVersion::is30);
}
private void applyNewStates(InetAddress addr, EndpointState localState, EndpointState remoteState)
@@ -1547,6 +1555,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
}
+ public boolean isAnyNodeOn30()
+ {
+ return anyNodeOn30;
+ }
+
protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound, Map<InetAddress, EndpointState> epStateMap)
{
if (inShadowRound)
@@ -1629,16 +1642,18 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return System.currentTimeMillis() + Gossiper.aVeryLongTime;
}
+ @Nullable
public CassandraVersion getReleaseVersion(InetAddress ep)
{
EndpointState state = getEndpointStateForEndpoint(ep);
- if (state != null)
- {
- VersionedValue applicationState = state.getApplicationState(ApplicationState.RELEASE_VERSION);
- if (applicationState != null)
- return new CassandraVersion(applicationState.value);
- }
- return null;
+ return state != null ? state.getReleaseVersion() : null;
+ }
+
+ @Nullable
+ public UUID getSchemaVersion(InetAddress ep)
+ {
+ EndpointState state = getEndpointStateForEndpoint(ep);
+ return state != null ? state.getSchemaVersion() : null;
}
public static void waitToSettle()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
index 47d986f..cc1c221 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.hints;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
@@ -64,7 +65,7 @@ final class HintsDispatchTrigger implements Runnable
.filter(store -> !isScheduled(store))
.filter(HintsStore::isLive)
.filter(store -> store.isWriting() || store.hasFiles())
- .filter(store -> Gossiper.instance.valuesEqual(getBroadcastAddress(), store.address(), ApplicationState.SCHEMA))
+ .filter(store -> Schema.instance.isSameVersion(Gossiper.instance.getSchemaVersion(store.address())))
.forEach(this::schedule);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 7834b12..b6add96 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -311,18 +311,24 @@ public final class SchemaKeyspace
/**
* Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
* will be converted into UUID which would act as content-based version of the schema.
+ *
+ * This implementation is special cased for 3.11 as it returns the schema digests for 3.11
+ * <em>and</em> 3.0 - i.e. with and without the beloved {@code cdc} column.
*/
- public static UUID calculateSchemaDigest()
+ public static Pair<UUID, UUID> calculateSchemaDigest()
{
MessageDigest digest;
+ MessageDigest digest30;
try
{
digest = MessageDigest.getInstance("MD5");
+ digest30 = MessageDigest.getInstance("MD5");
}
catch (NoSuchAlgorithmException e)
{
throw new RuntimeException(e);
}
+ Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
for (String table : ALL_FOR_DIGEST)
{
@@ -340,12 +346,15 @@ public final class SchemaKeyspace
try (RowIterator partition = schema.next())
{
if (!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
- RowIterators.digest(partition, digest);
+ {
+ RowIterators.digest(partition, digest, digest30, cdc);
+ }
}
}
}
}
- return UUID.nameUUIDFromBytes(digest.digest());
+
+ return Pair.create(UUID.nameUUIDFromBytes(digest.digest()), UUID.nameUUIDFromBytes(digest30.digest()));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 3332d2c..a1b3597 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.config.ViewDefinition;
import org.apache.cassandra.cql3.functions.UDAggregate;
import org.apache.cassandra.cql3.functions.UDFunction;
@@ -78,10 +77,9 @@ public class MigrationManager
public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
{
- VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
-
- if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null)
- maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
+ UUID schemaVersion = state.getSchemaVersion();
+ if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null)
+ maybeScheduleSchemaPull(schemaVersion, endpoint);
}
/**
@@ -90,16 +88,37 @@ public class MigrationManager
*/
private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
{
- if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint))
+ if (Schema.instance.getVersion() == null)
+ {
+ logger.debug("Not pulling schema from {}, because local schama version is not known yet",
+ endpoint);
+ return;
+ }
+ if (Schema.instance.isSameVersion(theirVersion))
+ {
+ logger.debug("Not pulling schema from {}, because schema versions match: " +
+ "local/real={}, local/compatible={}, remote={}",
+ endpoint,
+ Schema.schemaVersionToString(Schema.instance.getRealVersion()),
+ Schema.schemaVersionToString(Schema.instance.getAltVersion()),
+ Schema.schemaVersionToString(theirVersion));
+ return;
+ }
+ if (!shouldPullSchemaFrom(endpoint))
{
logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
return;
}
- if (SchemaConstants.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
+ if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
{
// If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
- logger.debug("Submitting migration task for {}", endpoint);
+ logger.debug("Immediately submitting migration task for {}, " +
+ "schema versions: local/real={}, local/compatible={}, remote={}",
+ endpoint,
+ Schema.schemaVersionToString(Schema.instance.getRealVersion()),
+ Schema.schemaVersionToString(Schema.instance.getAltVersion()),
+ Schema.schemaVersionToString(theirVersion));
submitMigrationTask(endpoint);
}
else
@@ -109,20 +128,22 @@ public class MigrationManager
Runnable runnable = () ->
{
// grab the latest version of the schema since it may have changed again since the initial scheduling
- EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
- if (epState == null)
+ UUID epSchemaVersion = Gossiper.instance.getSchemaVersion(endpoint);
+ if (epSchemaVersion == null)
{
logger.debug("epState vanished for {}, not submitting migration task", endpoint);
return;
}
- VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA);
- UUID currentVersion = UUID.fromString(value.value);
- if (Schema.instance.getVersion().equals(currentVersion))
+ if (Schema.instance.isSameVersion(epSchemaVersion))
{
- logger.debug("not submitting migration task for {} because our versions match", endpoint);
+ logger.debug("Not submitting migration task for {} because our versions match ({})", endpoint, epSchemaVersion);
return;
}
- logger.debug("submitting migration task for {}", endpoint);
+ logger.debug("submitting migration task for {}, schema version mismatch: local/real={}, local/compatible={}, remote={}",
+ endpoint,
+ Schema.schemaVersionToString(Schema.instance.getRealVersion()),
+ Schema.schemaVersionToString(Schema.instance.getAltVersion()),
+ Schema.schemaVersionToString(epSchemaVersion));
submitMigrationTask(endpoint);
};
ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
@@ -585,11 +606,14 @@ public class MigrationManager
* Used to notify nodes as they arrive in the cluster.
*
* @param version The schema version to announce
+ * @param compatible flag whether {@code version} is a 3.0 compatible version
*/
- public static void passiveAnnounce(UUID version)
+ public static void passiveAnnounce(UUID version, boolean compatible)
{
Gossiper.instance.addLocalApplicationState(ApplicationState.SCHEMA, StorageService.instance.valueFactory.schema(version));
- logger.debug("Gossiping my schema version {}", version);
+ logger.debug("Gossiping my {} schema version {}",
+ compatible ? "3.0 compatible" : "3.11",
+ Schema.schemaVersionToString(version));
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 15027b2..c5e2912 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -831,6 +831,31 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
+ public void waitForSchema(int delay)
+ {
+ logger.debug("Waiting for schema (max {} seconds)", delay);
+ // first sleep the delay to make sure we see all our peers
+ for (int i = 0; i < delay; i += 1000)
+ {
+ // if we see schema, we can proceed to the next check directly
+ if (!Schema.instance.isEmpty())
+ {
+ logger.debug("current schema version: {} (3.0 compatible: {})", Schema.instance.getRealVersion(), Schema.instance.getAltVersion());
+ break;
+ }
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ }
+ // if our schema hasn't matched yet, wait until it has
+ // we do this by waiting for all in-flight migration requests and responses to complete
+ // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
+ if (!MigrationManager.isReadyForBootstrap())
+ {
+ setMode(Mode.JOINING, "waiting for schema information to complete", true);
+ MigrationManager.waitUntilReadyForBootstrap();
+ }
+ logger.info("Has schema with version {}", Schema.instance.getVersion());
+ }
+
private void joinTokenRing(int delay) throws ConfigurationException
{
joined = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e646e503/src/java/org/apache/cassandra/utils/CassandraVersion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CassandraVersion.java b/src/java/org/apache/cassandra/utils/CassandraVersion.java
index aed0fe7..bf9fe6a 100644
--- a/src/java/org/apache/cassandra/utils/CassandraVersion.java
+++ b/src/java/org/apache/cassandra/utils/CassandraVersion.java
@@ -118,6 +118,11 @@ public class CassandraVersion implements Comparable<CassandraVersion>
return compareIdentifiers(build, other.build, -1);
}
+ public boolean is30()
+ {
+ return major == 3 && minor == 0;
+ }
+
/**
* Returns a version that is backward compatible with this version amongst a list
* of provided version, or null if none can be found.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[3/4] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by sn...@apache.org.
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e4e7d95
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e4e7d95
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e4e7d95
Branch: refs/heads/trunk
Commit: 3e4e7d9546d69a0e1c807b3e2d585d3a659500bc
Parents: 0d8199b e646e50
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Dec 13 14:00:11 2017 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Dec 13 14:00:11 2017 +0100
----------------------------------------------------------------------
----------------------------------------------------------------------
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org