You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2017/12/13 13:03:39 UTC

[4/4] cassandra git commit: Bring code improvements of CASSANDRA-14109 into trunk (but not the change itself)

Bring code improvements of CASSANDRA-14109 into trunk (but not the change itself)

patch by Robert Stupp; reviewed by Andrés de la Peña for CASSANDRA-14109


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7a40abb6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7a40abb6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7a40abb6

Branch: refs/heads/trunk
Commit: 7a40abb6a5108688fb1b10c375bb751cbb782ea4
Parents: 3e4e7d9
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Dec 13 14:00:38 2017 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Dec 13 14:00:38 2017 +0100

----------------------------------------------------------------------
 .../org/apache/cassandra/gms/EndpointState.java | 27 +++++++++--
 src/java/org/apache/cassandra/gms/Gossiper.java | 31 ++++---------
 .../cassandra/hints/HintsDispatchTrigger.java   |  3 +-
 .../cassandra/schema/MigrationManager.java      | 48 ++++++++++++++------
 .../org/apache/cassandra/schema/Schema.java     | 30 ++++++++++++
 .../cassandra/service/StorageService.java       |  4 +-
 6 files changed, 100 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 70f2a68..847041f 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -18,18 +18,19 @@
 package org.apache.cassandra.gms;
 
 import java.io.*;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.annotation.Nullable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.CassandraVersion;
+
 /**
  * This abstraction represents both the HeartBeatState and the ApplicationState in an EndpointState
  * instance. Any state for a given endpoint can be retrieved from this instance.
@@ -154,6 +155,24 @@ public class EndpointState
         return pieces[0];
     }
 
+    @Nullable
+    public UUID getSchemaVersion()
+    {
+        VersionedValue applicationState = getApplicationState(ApplicationState.SCHEMA);
+        return applicationState != null
+               ? UUID.fromString(applicationState.value)
+               : null;
+    }
+
+    @Nullable
+    public CassandraVersion getReleaseVersion()
+    {
+        VersionedValue applicationState = getApplicationState(ApplicationState.RELEASE_VERSION);
+        return applicationState != null
+               ? new CassandraVersion(applicationState.value)
+               : null;
+    }
+
     public String toString()
     {
         return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 15dad71..e675d92 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.ReentrantLock;
 
+import javax.annotation.Nullable;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -852,20 +853,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return endpointStateMap.get(ep);
     }
 
-    public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as)
-    {
-        EndpointState state1 = getEndpointStateForEndpoint(ep1);
-        EndpointState state2 = getEndpointStateForEndpoint(ep2);
-
-        if (state1 == null || state2 == null)
-            return false;
-
-        VersionedValue value1 = state1.getApplicationState(as);
-        VersionedValue value2 = state2.getApplicationState(as);
-
-        return !(value1 == null || value2 == null) && value1.value.equals(value2.value);
-    }
-
     public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
     {
         return endpointStateMap.entrySet();
@@ -1624,16 +1611,18 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return System.currentTimeMillis() + Gossiper.aVeryLongTime;
     }
 
+    @Nullable
     public CassandraVersion getReleaseVersion(InetAddress ep)
     {
         EndpointState state = getEndpointStateForEndpoint(ep);
-        if (state != null)
-        {
-            VersionedValue applicationState = state.getApplicationState(ApplicationState.RELEASE_VERSION);
-            if (applicationState != null)
-                return new CassandraVersion(applicationState.value);
-        }
-        return null;
+        return state != null ? state.getReleaseVersion() : null;
+    }
+
+    @Nullable
+    public UUID getSchemaVersion(InetAddress ep)
+    {
+        EndpointState state = getEndpointStateForEndpoint(ep);
+        return state != null ? state.getSchemaVersion() : null;
     }
 
     public static void waitToSettle()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
index 47d986f..34d1eb2 100644
--- a/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
+++ b/src/java/org/apache/cassandra/hints/HintsDispatchTrigger.java
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.schema.Schema;
 
 import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddress;
 
@@ -64,7 +65,7 @@ final class HintsDispatchTrigger implements Runnable
                .filter(store -> !isScheduled(store))
                .filter(HintsStore::isLive)
                .filter(store -> store.isWriting() || store.hasFiles())
-               .filter(store -> Gossiper.instance.valuesEqual(getBroadcastAddress(), store.address(), ApplicationState.SCHEMA))
+               .filter(store -> Schema.instance.isSameVersion(Gossiper.instance.getSchemaVersion(store.address())))
                .forEach(this::schedule);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/schema/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MigrationManager.java b/src/java/org/apache/cassandra/schema/MigrationManager.java
index d8a3b72..50e5c28 100644
--- a/src/java/org/apache/cassandra/schema/MigrationManager.java
+++ b/src/java/org/apache/cassandra/schema/MigrationManager.java
@@ -61,10 +61,9 @@ public class MigrationManager
 
     public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
     {
-        VersionedValue value = state.getApplicationState(ApplicationState.SCHEMA);
-
-        if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && value != null)
-            maybeScheduleSchemaPull(UUID.fromString(value.value), endpoint);
+        UUID schemaVersion = state.getSchemaVersion();
+        if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null)
+            maybeScheduleSchemaPull(schemaVersion, endpoint);
     }
 
     /**
@@ -73,16 +72,34 @@ public class MigrationManager
      */
     private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint)
     {
-        if ((Schema.instance.getVersion() != null && Schema.instance.getVersion().equals(theirVersion)) || !shouldPullSchemaFrom(endpoint))
+        if (Schema.instance.getVersion() == null)
+        {
+            logger.debug("Not pulling schema from {}, because local schema version is not known yet",
+                         endpoint);
+            return;
+        }
+        if (Schema.instance.isSameVersion(theirVersion))
+        {
+            logger.debug("Not pulling schema from {}, because schema versions match ({})",
+                         endpoint,
+                         Schema.schemaVersionToString(theirVersion));
+            return;
+        }
+        if (!shouldPullSchemaFrom(endpoint))
         {
-            logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
+            logger.debug("Not pulling schema from {}, because shouldPullSchemaFrom returned false (schema versions: local={}, remote={})",
+                         endpoint, Schema.instance.getVersion(), theirVersion);
             return;
         }
 
-        if (SchemaConstants.emptyVersion.equals(Schema.instance.getVersion()) || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
+        if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
         {
             // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
-            logger.debug("Submitting migration task for {}", endpoint);
+            logger.debug("Immediately submitting migration task for {}, " +
+                         "schema versions: local={}, remote={}",
+                         endpoint,
+                         Schema.schemaVersionToString(Schema.instance.getVersion()),
+                         Schema.schemaVersionToString(theirVersion));
             submitMigrationTask(endpoint);
         }
         else
@@ -92,20 +109,21 @@ public class MigrationManager
             Runnable runnable = () ->
             {
                 // grab the latest version of the schema since it may have changed again since the initial scheduling
-                EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
-                if (epState == null)
+                UUID epSchemaVersion = Gossiper.instance.getSchemaVersion(endpoint);
+                if (epSchemaVersion == null)
                 {
                     logger.debug("epState vanished for {}, not submitting migration task", endpoint);
                     return;
                 }
-                VersionedValue value = epState.getApplicationState(ApplicationState.SCHEMA);
-                UUID currentVersion = UUID.fromString(value.value);
-                if (Schema.instance.getVersion().equals(currentVersion))
+                if (Schema.instance.isSameVersion(epSchemaVersion))
                 {
-                    logger.debug("not submitting migration task for {} because our versions match", endpoint);
+                    logger.debug("Not submitting migration task for {} because our versions match ({})", endpoint, epSchemaVersion);
                     return;
                 }
-                logger.debug("submitting migration task for {}", endpoint);
+                logger.debug("Submitting migration task for {}, schema version mismatch: local={}, remote={}",
+                             endpoint,
+                             Schema.schemaVersionToString(Schema.instance.getVersion()),
+                             Schema.schemaVersionToString(epSchemaVersion));
                 submitMigrationTask(endpoint);
             };
             ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/schema/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java
index 711724b..594b2ab 100644
--- a/src/java/org/apache/cassandra/schema/Schema.java
+++ b/src/java/org/apache/cassandra/schema/Schema.java
@@ -524,6 +524,22 @@ public final class Schema
     }
 
     /**
+     * Checks whether the given schema version equals the current local schema version ({@code null} never matches).
+     */
+    public boolean isSameVersion(UUID schemaVersion)
+    {
+        return schemaVersion != null && schemaVersion.equals(version);
+    }
+
+    /**
+     * Checks whether the current schema version equals {@link SchemaConstants#emptyVersion}, i.e. the schema is empty.
+     */
+    public boolean isEmpty()
+    {
+        return SchemaConstants.emptyVersion.equals(version);
+    }
+
+    /**
      * Read schema from system keyspace and calculate MD5 digest of every row, resulting digest
      * will be converted into UUID which would act as content-based version of the schema.
      */
@@ -819,4 +835,18 @@ public final class Schema
     {
         changeListeners.forEach(l -> l.onDropAggregate(udf.name().keyspace, udf.name().name, udf.argTypes()));
     }
+
+
+    /**
+     * Converts the given schema version to a string. Returns {@code "unknown"} if {@code version} is {@code null},
+     * {@code "(empty)"} if it equals {@link SchemaConstants#emptyVersion}, and {@code version.toString()} otherwise.
+     */
+    public static String schemaVersionToString(UUID version)
+    {
+        return version == null
+               ? "unknown"
+               : SchemaConstants.emptyVersion.equals(version)
+                 ? "(empty)"
+                 : version.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a40abb6/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 37d163f..89d5358 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -796,9 +796,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         for (int i = 0; i < delay; i += 1000)
         {
             // if we see schema, we can proceed to the next check directly
-            if (!Schema.instance.getVersion().equals(SchemaConstants.emptyVersion))
+            if (!Schema.instance.isEmpty())
             {
-                logger.debug("got schema: {}", Schema.instance.getVersion());
+                logger.debug("current schema version: {}", Schema.instance.getVersion());
                 break;
             }
             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org