You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2017/12/13 13:03:39 UTC
[4/4] cassandra git commit: Bring code improvements of
CASSANDRA-14109 into trunk (but not the change itself)
Bring code improvements of CASSANDRA-14109 into trunk (but not the change itself)
patch by Robert Stupp; reviewed by Andrés de la Peña for CASSANDRA-14109
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7a40abb6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7a40abb6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7a40abb6
Branch: refs/heads/trunk
Commit: 7a40abb6a5108688fb1b10c375bb751cbb782ea4
Parents: 3e4e7d9
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Dec 13 14:00:38 2017 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Dec 13 14:00:38 2017 +0100
----------------------------------------------------------------------
.../org/apache/cassandra/gms/EndpointState.java | 27 +++++++++--
src/java/org/apache/cassandra/gms/Gossiper.java | 31 ++++---------
.../cassandra/hints/HintsDispatchTrigger.java | 3 +-
.../cassandra/schema/MigrationManager.java | 48 ++++++++++++++------
.../org/apache/cassandra/schema/Schema.java | 30 ++++++++++++
.../cassandra/service/StorageService.java | 4 +-
6 files changed, 100 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 70f2a68..847041f 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -18,18 +18,19 @@
package org.apache.cassandra.gms;
import java.io.*;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.CassandraVersion;
+
/**
* This abstraction represents both the HeartBeatState and the ApplicationState in an EndpointState
* instance. Any state for a given endpoint can be retrieved from this instance.
@@ -154,6 +155,24 @@ public class EndpointState
return pieces[0];
}
+ @Nullable
+ public UUID getSchemaVersion()
+ {
+ VersionedValue applicationState = getApplicationState(ApplicationState.SCHEMA);
+ return applicationState != null
+ ? UUID.fromString(applicationState.value)
+ : null;
+ }
+
+ @Nullable
+ public CassandraVersion getReleaseVersion()
+ {
+ VersionedValue applicationState = getApplicationState(ApplicationState.RELEASE_VERSION);
+ return applicationState != null
+ ? new CassandraVersion(applicationState.value)
+ : null;
+ }
+
public String toString()
{
return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 15dad71..e675d92 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -852,20 +853,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return endpointStateMap.get(ep);
}
- public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as)
- {
- EndpointState state1 = getEndpointStateForEndpoint(ep1);
- EndpointState state2 = getEndpointStateForEndpoint(ep2);
-
- if (state1 == null || state2 == null)
- return false;
-
- VersionedValue value1 = state1.getApplicationState(as);
- VersionedValue value2 = state2.getApplicationState(as);
-
- return !(value1 == null || value2 == null) && value1.value.equals(value2.value);
- }
-
public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
{
return endpointStateMap.entrySet();
@@ -1624,16 +1611,18 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return System.currentTimeMillis() + Gossiper.aVeryLongTime;
}
+ @Nullable
public CassandraVersion getReleaseVersion(InetAddress ep)
{
EndpointState state = getEndpointStateForEndpoint(ep);
- if (state != null)
- {
- VersionedValue applicationState = state.getApplicationState(ApplicationState.RELEASE_VERSION);
- if (applicationState != null)
- return new CassandraVersion(applicationState.value);
- }
- return null;
+ return state != null ? state.getReleaseVersion() : null;
+ }
+
+ @Nullable
+ public UUID getSchemaVersion(InetAddress ep)
+ {
+ EndpointState state = getEndpointStateForEndpoint(ep);
+ return state != null ? state.getSchemaVersion() : null;
}
public static void waitToSettle()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
index 47d986f..34d1eb2 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.schema.Schema;
import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddress;
@@ -64,7 +65,7 @@ final class HintsDispatchTrigger implements Runnable
.filter(store -> !isScheduled(store))
.filter(HintsStore::isLive)
.filter(store -> store.isWriting() || store.hasFiles())
- .filter(store -> Gossiper.instance.valuesEqual(getBroadcastAddress(), store.address(), ApplicationState.SCHEMA))
+ .filter(store -> Schema.instance.isSameVersion(Gossiper.instance.getSchemaVersion(store.address())))
.forEach(this::schedule);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/schema/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java
index d8a3b72..50e5c28 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -61,10 +61,9 @@ public class MigrationManager
public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
{
- VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
-
- if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null)
- maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
+ UUID schemaVersion = state.getSchemaVersion();
+ if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null)
+ maybeScheduleSchemaPull(schemaVersion, endpoint);
}
/**
@@ -73,16 +72,34 @@ public class MigrationManager
*/
private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
{
- if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint))
+ if (Schema.instance.getVersion() == null)
+ {
+ logger.debug("Not pulling schema from {}, because local schama version is not known yet",
+ endpoint);
+ return;
+ }
+ if (Schema.instance.isSameVersion(theirVersion))
+ {
+ logger.debug("Not pulling schema from {}, because schema versions match ({})",
+ endpoint,
+ Schema.schemaVersionToString(theirVersion));
+ return;
+ }
+ if (!shouldPullSchemaFrom(endpoint))
{
- logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
+ logger.debug("Not pulling schema from {}, because versions match ({}/{}), or shouldPullSchemaFrom returned false",
+ endpoint, Schema.instance.getVersion(), theirVersion);
return;
}
- if (SchemaConstants.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
+ if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
{
// If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
- logger.debug("Submitting migration task for {}", endpoint);
+ logger.debug("Immediately submitting migration task for {} due to {}, " +
+ "schema versions: local={}, remote={}",
+ endpoint,
+ Schema.schemaVersionToString(Schema.instance.getVersion()),
+ Schema.schemaVersionToString(theirVersion));
submitMigrationTask(endpoint);
}
else
@@ -92,20 +109,21 @@ public class MigrationManager
Runnable runnable = () ->
{
// grab the latest version of the schema since it may have changed again since the initial scheduling
- EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
- if (epState == null)
+ UUID epSchemaVersion = Gossiper.instance.getSchemaVersion(endpoint);
+ if (epSchemaVersion == null)
{
logger.debug("epState vanished for {}, not submitting migration task", endpoint);
return;
}
- VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA);
- UUID currentVersion = UUID.fromString(value.value);
- if (Schema.instance.getVersion().equals(currentVersion))
+ if (Schema.instance.isSameVersion(epSchemaVersion))
{
- logger.debug("not submitting migration task for {} because our versions match", endpoint);
+ logger.debug("Not submitting migration task for {} because our versions match ({})", endpoint, epSchemaVersion);
return;
}
- logger.debug("submitting migration task for {}", endpoint);
+ logger.debug("Submitting migration task for {}, schema version mismatch: local={}, remote={}",
+ endpoint,
+ Schema.schemaVersionToString(Schema.instance.getVersion()),
+ Schema.schemaVersionToString(epSchemaVersion));
submitMigrationTask(endpoint);
};
ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/schema/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index 711724b..594b2ab 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -524,6 +524,22 @@ public final class Schema
}
/**
+ * Checks whether the given schema version is the same as the current local schema.
+ */
+ public boolean isSameVersion(UUID schemaVersion)
+ {
+ return schemaVersion != null && schemaVersion.equals(version);
+ }
+
+ /**
+ * Checks whether the current schema is empty.
+ */
+ public boolean isEmpty()
+ {
+ return SchemaConstants.emptyVersion.equals(version);
+ }
+
+ /**
* Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
* will be converted into UUID which would act as content-based version of the schema.
*/
@@ -819,4 +835,18 @@ public final class Schema
{
changeListeners.forEach(l -> l.onDropAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()));
}
+
+
+ /**
+ * Converts the given schema version to a string. Returns {@code unknown}, if {@code version} is {@code null}
+ * or {@code "(empty)"}, if {@code version} refers to an {@link SchemaConstants#emptyVersion empty) schema.
+ */
+ public static String schemaVersionToString(UUID version)
+ {
+ return version == null
+ ? "unknown"
+ : SchemaConstants.emptyVersion.equals(version)
+ ? "(empty)"
+ : version.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 37d163f..89d5358 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -796,9 +796,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
for (int i = 0; i < delay; i += 1000)
{
// if we see schema, we can proceed to the next check directly
- if (!Schema.instance.getVersion().equals(SchemaConstants.emptyVersion))
+ if (!Schema.instance.isEmpty())
{
- logger.debug("got schema: {}", Schema.instance.getVersion());
+ logger.debug("current schema version: {}", Schema.instance.getVersion());
break;
}
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org