You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2018/01/19 14:20:45 UTC

[1/3] cassandra git commit: Add Unittest for schema migration fix

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.11 d0b7566ff -> e7b9c1f50
  refs/heads/trunk 1cb91eaaa -> 716d08f90


Add Unittest for schema migration fix

patch by Jay Zhuang; reviewed by jasobrown for CASSANDRA-14140


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e7b9c1f5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e7b9c1f5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e7b9c1f5

Branch: refs/heads/cassandra-3.11
Commit: e7b9c1f50a9875682b480a3ab69e662f4b097d4d
Parents: d0b7566
Author: Jay Zhuang <ja...@yahoo.com>
Authored: Wed Dec 27 15:25:36 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri Jan 19 06:17:51 2018 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/gms/VersionedValue.java    |   9 +-
 .../apache/cassandra/schema/SchemaKeyspace.java |  19 ++--
 .../cassandra/service/StorageService.java       |  25 -----
 .../org/apache/cassandra/gms/GossiperTest.java  |  69 ++++++++++++-
 .../cassandra/schema/SchemaKeyspaceTest.java    | 102 +++++++++++++++++++
 6 files changed, 188 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c06ad0d..5e4c40a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.2
+ * Add Unittest for schema migration fix (CASSANDRA-14140)
  * Print correct snitch info from nodetool describecluster (CASSANDRA-13528)
  * Close socket on error during connect on OutboundTcpConnection (CASSANDRA-9630)
  * Enable CDC unittest (CASSANDRA-14141)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index d9c8d0b..0ec1712 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 
 import static java.nio.charset.StandardCharsets.ISO_8859_1;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.db.TypeSizes;
@@ -250,7 +251,13 @@ public class VersionedValue implements Comparable<VersionedValue>
 
         public VersionedValue releaseVersion()
         {
-            return new VersionedValue(FBUtilities.getReleaseVersionString());
+            return releaseVersion(FBUtilities.getReleaseVersionString());
+        }
+
+        @VisibleForTesting
+        public VersionedValue releaseVersion(String version)
+        {
+            return new VersionedValue(version);
         }
 
         public VersionedValue networkVersion()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index b6add96..164e32d 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -25,6 +25,7 @@ import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
 import com.google.common.collect.Maps;
 import org.slf4j.Logger;
@@ -109,7 +110,7 @@ public final class SchemaKeyspace
      * for digest calculations, otherwise the nodes will never agree on the schema during a rolling upgrade, see CASSANDRA-13559.
      */
     public static final ImmutableList<String> ALL_FOR_DIGEST =
-        ImmutableList.of(KEYSPACES, TABLES, COLUMNS, DROPPED_COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
+        ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
 
     private static final CFMetaData Keyspaces =
         compile(KEYSPACES,
@@ -317,6 +318,14 @@ public final class SchemaKeyspace
      */
     public static Pair<UUID, UUID> calculateSchemaDigest()
     {
+        Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
+
+        return calculateSchemaDigest(cdc);
+    }
+
+    @VisibleForTesting
+    static Pair<UUID, UUID> calculateSchemaDigest(Set<ByteBuffer> columnsToExclude)
+    {
         MessageDigest digest;
         MessageDigest digest30;
         try
@@ -328,15 +337,9 @@ public final class SchemaKeyspace
         {
             throw new RuntimeException(e);
         }
-        Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
 
         for (String table : ALL_FOR_DIGEST)
         {
-            // Due to CASSANDRA-11050 we want to exclude DROPPED_COLUMNS for schema digest computation. We can and
-            // should remove that in the next major release (so C* 4.0).
-            if (table.equals(DROPPED_COLUMNS))
-                continue;
-
             ReadCommand cmd = getReadCommandForTableSchema(table);
             try (ReadExecutionController executionController = cmd.executionController();
                  PartitionIterator schema = cmd.executeInternal(executionController))
@@ -347,7 +350,7 @@ public final class SchemaKeyspace
                     {
                         if (!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
                         {
-                            RowIterators.digest(partition, digest, digest30, cdc);
+                            RowIterators.digest(partition, digest, digest30, columnsToExclude);
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c5e2912..15027b2 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -831,31 +831,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
-    public void waitForSchema(int delay)
-    {
-        logger.debug("Waiting for schema (max {} seconds)", delay);
-        // first sleep the delay to make sure we see all our peers
-        for (int i = 0; i < delay; i += 1000)
-        {
-            // if we see schema, we can proceed to the next check directly
-            if (!Schema.instance.isEmpty())
-            {
-                logger.debug("current schema version: {} (3.0 compatible: {})", Schema.instance.getRealVersion(), Schema.instance.getAltVersion());
-                break;
-            }
-            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-        }
-        // if our schema hasn't matched yet, wait until it has
-        // we do this by waiting for all in-flight migration requests and responses to complete
-        // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
-        if (!MigrationManager.isReadyForBootstrap())
-        {
-            setMode(Mode.JOINING, "waiting for schema information to complete", true);
-            MigrationManager.waitUntilReadyForBootstrap();
-        }
-        logger.info("Has schema with version {}", Schema.instance.getVersion());
-    }
-
     private void joinTokenRing(int delay) throws ConfigurationException
     {
         joined = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/test/unit/org/apache/cassandra/gms/GossiperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index 83f12d1..def0530 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -26,24 +26,37 @@ import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class GossiperTest
 {
-    static
+    @BeforeClass
+    public static void before()
     {
         DatabaseDescriptor.daemonInitialization();
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace("schema_test_ks",
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD("schema_test_ks", "schema_test_cf"));
     }
+
     static final IPartitioner partitioner = new RandomPartitioner();
     StorageService ss = StorageService.instance;
     TokenMetadata tmd = StorageService.instance.getTokenMetadata();
@@ -56,10 +69,10 @@ public class GossiperTest
     public void setup()
     {
         tmd.clearUnsafe();
-    };
+    }
 
     @Test
-    public void testLargeGenerationJump() throws UnknownHostException, InterruptedException
+    public void testLargeGenerationJump() throws UnknownHostException
     {
         Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
         InetAddress remoteHostAddress = hosts.get(1);
@@ -90,4 +103,54 @@ public class GossiperTest
         //The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future
         assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
     }
+
+    @Test
+    public void testSchemaVersionUpdate() throws UnknownHostException, InterruptedException
+    {
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
+        MessagingService.instance().listen();
+        Gossiper.instance.start(1);
+        InetAddress remoteHostAddress = hosts.get(1);
+
+        EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+        // Set to any 3.0 version
+        Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion("3.0.14"));
+
+        Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
+
+        // wait until the schema is set
+        VersionedValue schema = null;
+        for (int i = 0; i < 10; i++)
+        {
+            EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
+            schema = localState.getApplicationState(ApplicationState.SCHEMA);
+            if (schema != null)
+                break;
+            Thread.sleep(1000);
+        }
+
+        // schema is set and equals to "alternative" version
+        assertTrue(schema != null);
+        assertEquals(schema.value, Schema.instance.getAltVersion().toString());
+
+        // Upgrade remote host version to the latest one (3.11)
+        Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion());
+
+        Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
+
+        // wait until the schema change
+        VersionedValue newSchema = null;
+        for (int i = 0; i < 10; i++)
+        {
+            EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
+            newSchema = localState.getApplicationState(ApplicationState.SCHEMA);
+            if (!schema.value.equals(newSchema.value))
+                break;
+            Thread.sleep(1000);
+        }
+
+        // schema is changed and equals to real version
+        assertFalse(schema.value.equals(newSchema.value));
+        assertEquals(newSchema.value, Schema.instance.getRealVersion().toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
index 2f83498..550deed 100644
--- a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
@@ -21,10 +21,12 @@ package org.apache.cassandra.schema;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
 
@@ -34,6 +36,7 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -46,14 +49,19 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.thrift.ColumnDef;
 import org.apache.cassandra.thrift.IndexType;
 import org.apache.cassandra.thrift.ThriftConversion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class SchemaKeyspaceTest
@@ -206,4 +214,98 @@ public class SchemaKeyspaceTest
         assertEquals(cfm.params, params);
         assertEquals(new HashSet<>(cfm.allColumns()), columns);
     }
+
+    private static boolean hasCDC(Mutation m)
+    {
+        for (PartitionUpdate p : m.getPartitionUpdates())
+        {
+            for (ColumnDefinition cd : p.columns())
+            {
+                if (cd.name.toString().equals("cdc"))
+                    return true;
+            }
+        }
+        return false;
+    }
+
+    private static boolean hasSchemaTables(Mutation m)
+    {
+        for (PartitionUpdate p : m.getPartitionUpdates())
+        {
+            if (p.metadata().cfName.equals(SchemaKeyspace.TABLES))
+                return true;
+        }
+        return false;
+    }
+
+    @Test
+    public void testConvertSchemaToMutationsWithoutCDC() throws IOException
+    {
+        boolean oldCDCOption = DatabaseDescriptor.isCDCEnabled();
+        try
+        {
+            DatabaseDescriptor.setCDCEnabled(false);
+            Collection<Mutation> mutations = SchemaKeyspace.convertSchemaToMutations();
+            boolean foundTables = false;
+            for (Mutation m : mutations)
+            {
+                if (hasSchemaTables(m))
+                {
+                    foundTables = true;
+                    assertFalse(hasCDC(m));
+                    try (DataOutputBuffer output = new DataOutputBuffer())
+                    {
+                        Mutation.serializer.serialize(m, output, MessagingService.current_version);
+                        try (DataInputBuffer input = new DataInputBuffer(output.getData()))
+                        {
+                            Mutation out = Mutation.serializer.deserialize(input, MessagingService.current_version);
+                            assertFalse(hasCDC(out));
+                        }
+                    }
+                }
+            }
+            assertTrue(foundTables);
+        }
+        finally
+        {
+            DatabaseDescriptor.setCDCEnabled(oldCDCOption);
+        }
+    }
+
+    @Test
+    public void testConvertSchemaToMutationsWithCDC()
+    {
+        boolean oldCDCOption = DatabaseDescriptor.isCDCEnabled();
+        try
+        {
+            DatabaseDescriptor.setCDCEnabled(true);
+            Collection<Mutation> mutations = SchemaKeyspace.convertSchemaToMutations();
+            boolean foundTables = false;
+            for (Mutation m : mutations)
+            {
+                if (hasSchemaTables(m))
+                {
+                    foundTables = true;
+                    assertTrue(hasCDC(m));
+                }
+            }
+            assertTrue(foundTables);
+        }
+        finally
+        {
+            DatabaseDescriptor.setCDCEnabled(oldCDCOption);
+        }
+    }
+
+    @Test
+    public void testSchemaDigest()
+    {
+        Set<ByteBuffer> abc = Collections.singleton(ByteBufferUtil.bytes("abc"));
+        Pair<UUID, UUID> versions = SchemaKeyspace.calculateSchemaDigest(abc);
+        assertTrue(versions.left.equals(versions.right));
+
+        Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
+        versions = SchemaKeyspace.calculateSchemaDigest(cdc);
+        assertFalse(versions.left.equals(versions.right));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/3] cassandra git commit: Add Unittest for schema migration fix

Posted by ja...@apache.org.
Add Unittest for schema migration fix

patch by Jay Zhuang; reviewed by jasobrown for CASSANDRA-14140


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e7b9c1f5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e7b9c1f5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e7b9c1f5

Branch: refs/heads/trunk
Commit: e7b9c1f50a9875682b480a3ab69e662f4b097d4d
Parents: d0b7566
Author: Jay Zhuang <ja...@yahoo.com>
Authored: Wed Dec 27 15:25:36 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri Jan 19 06:17:51 2018 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/gms/VersionedValue.java    |   9 +-
 .../apache/cassandra/schema/SchemaKeyspace.java |  19 ++--
 .../cassandra/service/StorageService.java       |  25 -----
 .../org/apache/cassandra/gms/GossiperTest.java  |  69 ++++++++++++-
 .../cassandra/schema/SchemaKeyspaceTest.java    | 102 +++++++++++++++++++
 6 files changed, 188 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c06ad0d..5e4c40a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.2
+ * Add Unittest for schema migration fix (CASSANDRA-14140)
  * Print correct snitch info from nodetool describecluster (CASSANDRA-13528)
  * Close socket on error during connect on OutboundTcpConnection (CASSANDRA-9630)
  * Enable CDC unittest (CASSANDRA-14141)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index d9c8d0b..0ec1712 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 
 import static java.nio.charset.StandardCharsets.ISO_8859_1;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.db.TypeSizes;
@@ -250,7 +251,13 @@ public class VersionedValue implements Comparable<VersionedValue>
 
         public VersionedValue releaseVersion()
         {
-            return new VersionedValue(FBUtilities.getReleaseVersionString());
+            return releaseVersion(FBUtilities.getReleaseVersionString());
+        }
+
+        @VisibleForTesting
+        public VersionedValue releaseVersion(String version)
+        {
+            return new VersionedValue(version);
         }
 
         public VersionedValue networkVersion()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index b6add96..164e32d 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -25,6 +25,7 @@ import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
 import com.google.common.collect.Maps;
 import org.slf4j.Logger;
@@ -109,7 +110,7 @@ public final class SchemaKeyspace
      * for digest calculations, otherwise the nodes will never agree on the schema during a rolling upgrade, see CASSANDRA-13559.
      */
     public static final ImmutableList<String> ALL_FOR_DIGEST =
-        ImmutableList.of(KEYSPACES, TABLES, COLUMNS, DROPPED_COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
+        ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
 
     private static final CFMetaData Keyspaces =
         compile(KEYSPACES,
@@ -317,6 +318,14 @@ public final class SchemaKeyspace
      */
     public static Pair<UUID, UUID> calculateSchemaDigest()
     {
+        Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
+
+        return calculateSchemaDigest(cdc);
+    }
+
+    @VisibleForTesting
+    static Pair<UUID, UUID> calculateSchemaDigest(Set<ByteBuffer> columnsToExclude)
+    {
         MessageDigest digest;
         MessageDigest digest30;
         try
@@ -328,15 +337,9 @@ public final class SchemaKeyspace
         {
             throw new RuntimeException(e);
         }
-        Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
 
         for (String table : ALL_FOR_DIGEST)
         {
-            // Due to CASSANDRA-11050 we want to exclude DROPPED_COLUMNS for schema digest computation. We can and
-            // should remove that in the next major release (so C* 4.0).
-            if (table.equals(DROPPED_COLUMNS))
-                continue;
-
             ReadCommand cmd = getReadCommandForTableSchema(table);
             try (ReadExecutionController executionController = cmd.executionController();
                  PartitionIterator schema = cmd.executeInternal(executionController))
@@ -347,7 +350,7 @@ public final class SchemaKeyspace
                     {
                         if (!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
                         {
-                            RowIterators.digest(partition, digest, digest30, cdc);
+                            RowIterators.digest(partition, digest, digest30, columnsToExclude);
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c5e2912..15027b2 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -831,31 +831,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
-    public void waitForSchema(int delay)
-    {
-        logger.debug("Waiting for schema (max {} seconds)", delay);
-        // first sleep the delay to make sure we see all our peers
-        for (int i = 0; i < delay; i += 1000)
-        {
-            // if we see schema, we can proceed to the next check directly
-            if (!Schema.instance.isEmpty())
-            {
-                logger.debug("current schema version: {} (3.0 compatible: {})", Schema.instance.getRealVersion(), Schema.instance.getAltVersion());
-                break;
-            }
-            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-        }
-        // if our schema hasn't matched yet, wait until it has
-        // we do this by waiting for all in-flight migration requests and responses to complete
-        // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
-        if (!MigrationManager.isReadyForBootstrap())
-        {
-            setMode(Mode.JOINING, "waiting for schema information to complete", true);
-            MigrationManager.waitUntilReadyForBootstrap();
-        }
-        logger.info("Has schema with version {}", Schema.instance.getVersion());
-    }
-
     private void joinTokenRing(int delay) throws ConfigurationException
     {
         joined = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/test/unit/org/apache/cassandra/gms/GossiperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index 83f12d1..def0530 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -26,24 +26,37 @@ import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class GossiperTest
 {
-    static
+    @BeforeClass
+    public static void before()
     {
         DatabaseDescriptor.daemonInitialization();
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace("schema_test_ks",
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD("schema_test_ks", "schema_test_cf"));
     }
+
     static final IPartitioner partitioner = new RandomPartitioner();
     StorageService ss = StorageService.instance;
     TokenMetadata tmd = StorageService.instance.getTokenMetadata();
@@ -56,10 +69,10 @@ public class GossiperTest
     public void setup()
     {
         tmd.clearUnsafe();
-    };
+    }
 
     @Test
-    public void testLargeGenerationJump() throws UnknownHostException, InterruptedException
+    public void testLargeGenerationJump() throws UnknownHostException
     {
         Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
         InetAddress remoteHostAddress = hosts.get(1);
@@ -90,4 +103,54 @@ public class GossiperTest
         //The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future
         assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
     }
+
+    @Test
+    public void testSchemaVersionUpdate() throws UnknownHostException, InterruptedException
+    {
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
+        MessagingService.instance().listen();
+        Gossiper.instance.start(1);
+        InetAddress remoteHostAddress = hosts.get(1);
+
+        EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+        // Set to any 3.0 version
+        Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion("3.0.14"));
+
+        Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
+
+        // wait until the schema is set
+        VersionedValue schema = null;
+        for (int i = 0; i < 10; i++)
+        {
+            EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
+            schema = localState.getApplicationState(ApplicationState.SCHEMA);
+            if (schema != null)
+                break;
+            Thread.sleep(1000);
+        }
+
+        // schema is set and equals to "alternative" version
+        assertTrue(schema != null);
+        assertEquals(schema.value, Schema.instance.getAltVersion().toString());
+
+        // Upgrade remote host version to the latest one (3.11)
+        Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion());
+
+        Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
+
+        // wait until the schema change
+        VersionedValue newSchema = null;
+        for (int i = 0; i < 10; i++)
+        {
+            EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
+            newSchema = localState.getApplicationState(ApplicationState.SCHEMA);
+            if (!schema.value.equals(newSchema.value))
+                break;
+            Thread.sleep(1000);
+        }
+
+        // schema is changed and equals to real version
+        assertFalse(schema.value.equals(newSchema.value));
+        assertEquals(newSchema.value, Schema.instance.getRealVersion().toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
index 2f83498..550deed 100644
--- a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
@@ -21,10 +21,12 @@ package org.apache.cassandra.schema;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
 
@@ -34,6 +36,7 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -46,14 +49,19 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.thrift.ColumnDef;
 import org.apache.cassandra.thrift.IndexType;
 import org.apache.cassandra.thrift.ThriftConversion;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class SchemaKeyspaceTest
@@ -206,4 +214,98 @@ public class SchemaKeyspaceTest
         assertEquals(cfm.params, params);
         assertEquals(new HashSet<>(cfm.allColumns()), columns);
     }
+
+    private static boolean hasCDC(Mutation m)
+    {
+        for (PartitionUpdate p : m.getPartitionUpdates())
+        {
+            for (ColumnDefinition cd : p.columns())
+            {
+                if (cd.name.toString().equals("cdc"))
+                    return true;
+            }
+        }
+        return false;
+    }
+
+    private static boolean hasSchemaTables(Mutation m)
+    {
+        for (PartitionUpdate p : m.getPartitionUpdates())
+        {
+            if (p.metadata().cfName.equals(SchemaKeyspace.TABLES))
+                return true;
+        }
+        return false;
+    }
+
+    @Test
+    public void testConvertSchemaToMutationsWithoutCDC() throws IOException
+    {
+        boolean oldCDCOption = DatabaseDescriptor.isCDCEnabled();
+        try
+        {
+            DatabaseDescriptor.setCDCEnabled(false);
+            Collection<Mutation> mutations = SchemaKeyspace.convertSchemaToMutations();
+            boolean foundTables = false;
+            for (Mutation m : mutations)
+            {
+                if (hasSchemaTables(m))
+                {
+                    foundTables = true;
+                    assertFalse(hasCDC(m));
+                    try (DataOutputBuffer output = new DataOutputBuffer())
+                    {
+                        Mutation.serializer.serialize(m, output, MessagingService.current_version);
+                        try (DataInputBuffer input = new DataInputBuffer(output.getData()))
+                        {
+                            Mutation out = Mutation.serializer.deserialize(input, MessagingService.current_version);
+                            assertFalse(hasCDC(out));
+                        }
+                    }
+                }
+            }
+            assertTrue(foundTables);
+        }
+        finally
+        {
+            DatabaseDescriptor.setCDCEnabled(oldCDCOption);
+        }
+    }
+
+    @Test
+    public void testConvertSchemaToMutationsWithCDC()
+    {
+        boolean oldCDCOption = DatabaseDescriptor.isCDCEnabled();
+        try
+        {
+            DatabaseDescriptor.setCDCEnabled(true);
+            Collection<Mutation> mutations = SchemaKeyspace.convertSchemaToMutations();
+            boolean foundTables = false;
+            for (Mutation m : mutations)
+            {
+                if (hasSchemaTables(m))
+                {
+                    foundTables = true;
+                    assertTrue(hasCDC(m));
+                }
+            }
+            assertTrue(foundTables);
+        }
+        finally
+        {
+            DatabaseDescriptor.setCDCEnabled(oldCDCOption);
+        }
+    }
+
+    @Test
+    public void testSchemaDigest()
+    {
+        Set<ByteBuffer> abc = Collections.singleton(ByteBufferUtil.bytes("abc"));
+        Pair<UUID, UUID> versions = SchemaKeyspace.calculateSchemaDigest(abc);
+        assertTrue(versions.left.equals(versions.right));
+
+        Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
+        versions = SchemaKeyspace.calculateSchemaDigest(cdc);
+        assertFalse(versions.left.equals(versions.right));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/3] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by ja...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/716d08f9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/716d08f9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/716d08f9

Branch: refs/heads/trunk
Commit: 716d08f9090deb4c0c48096b5044186d270feb45
Parents: 1cb91ea e7b9c1f
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Jan 19 06:20:11 2018 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri Jan 19 06:20:11 2018 -0800

----------------------------------------------------------------------

----------------------------------------------------------------------



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org