You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2018/01/19 14:20:45 UTC
[1/3] cassandra git commit: Add Unittest for schema migration fix
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.11 d0b7566ff -> e7b9c1f50
refs/heads/trunk 1cb91eaaa -> 716d08f90
Add Unittest for schema migration fix
patch by Jay Zhuang; reviewed by jasobrown for CASSANDRA-14140
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e7b9c1f5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e7b9c1f5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e7b9c1f5
Branch: refs/heads/cassandra-3.11
Commit: e7b9c1f50a9875682b480a3ab69e662f4b097d4d
Parents: d0b7566
Author: Jay Zhuang <ja...@yahoo.com>
Authored: Wed Dec 27 15:25:36 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri Jan 19 06:17:51 2018 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/gms/VersionedValue.java | 9 +-
.../apache/cassandra/schema/SchemaKeyspace.java | 19 ++--
.../cassandra/service/StorageService.java | 25 -----
.../org/apache/cassandra/gms/GossiperTest.java | 69 ++++++++++++-
.../cassandra/schema/SchemaKeyspaceTest.java | 102 +++++++++++++++++++
6 files changed, 188 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c06ad0d..5e4c40a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.2
+ * Add Unittest for schema migration fix (CASSANDRA-14140)
* Print correct snitch info from nodetool describecluster (CASSANDRA-13528)
* Close socket on error during connect on OutboundTcpConnection (CASSANDRA-9630)
* Enable CDC unittest (CASSANDRA-14141)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index d9c8d0b..0ec1712 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import static java.nio.charset.StandardCharsets.ISO_8859_1;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import org.apache.cassandra.db.TypeSizes;
@@ -250,7 +251,13 @@ public class VersionedValue implements Comparable<VersionedValue>
public VersionedValue releaseVersion()
{
- return new VersionedValue(FBUtilities.getReleaseVersionString());
+ return releaseVersion(FBUtilities.getReleaseVersionString());
+ }
+
+ @VisibleForTesting
+ public VersionedValue releaseVersion(String version)
+ {
+ return new VersionedValue(version);
}
public VersionedValue networkVersion()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index b6add96..164e32d 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -25,6 +25,7 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.*;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
@@ -109,7 +110,7 @@ public final class SchemaKeyspace
* for digest calculations, otherwise the nodes will never agree on the schema during a rolling upgrade, see CASSANDRA-13559.
*/
public static final ImmutableList<String> ALL_FOR_DIGEST =
- ImmutableList.of(KEYSPACES, TABLES, COLUMNS, DROPPED_COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
+ ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
private static final CFMetaData Keyspaces =
compile(KEYSPACES,
@@ -317,6 +318,14 @@ public final class SchemaKeyspace
*/
public static Pair<UUID, UUID> calculateSchemaDigest()
{
+ Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
+
+ return calculateSchemaDigest(cdc);
+ }
+
+ @VisibleForTesting
+ static Pair<UUID, UUID> calculateSchemaDigest(Set<ByteBuffer> columnsToExclude)
+ {
MessageDigest digest;
MessageDigest digest30;
try
@@ -328,15 +337,9 @@ public final class SchemaKeyspace
{
throw new RuntimeException(e);
}
- Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
for (String table : ALL_FOR_DIGEST)
{
- // Due to CASSANDRA-11050 we want to exclude DROPPED_COLUMNS for schema digest computation. We can and
- // should remove that in the next major release (so C* 4.0).
- if (table.equals(DROPPED_COLUMNS))
- continue;
-
ReadCommand cmd = getReadCommandForTableSchema(table);
try (ReadExecutionController executionController = cmd.executionController();
PartitionIterator schema = cmd.executeInternal(executionController))
@@ -347,7 +350,7 @@ public final class SchemaKeyspace
{
if (!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
{
- RowIterators.digest(partition, digest, digest30, cdc);
+ RowIterators.digest(partition, digest, digest30, columnsToExclude);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c5e2912..15027b2 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -831,31 +831,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
- public void waitForSchema(int delay)
- {
- logger.debug("Waiting for schema (max {} seconds)", delay);
- // first sleep the delay to make sure we see all our peers
- for (int i = 0; i < delay; i += 1000)
- {
- // if we see schema, we can proceed to the next check directly
- if (!Schema.instance.isEmpty())
- {
- logger.debug("current schema version: {} (3.0 compatible: {})", Schema.instance.getRealVersion(), Schema.instance.getAltVersion());
- break;
- }
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- }
- // if our schema hasn't matched yet, wait until it has
- // we do this by waiting for all in-flight migration requests and responses to complete
- // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
- if (!MigrationManager.isReadyForBootstrap())
- {
- setMode(Mode.JOINING, "waiting for schema information to complete", true);
- MigrationManager.waitUntilReadyForBootstrap();
- }
- logger.info("Has schema with version {}", Schema.instance.getVersion());
- }
-
private void joinTokenRing(int delay) throws ConfigurationException
{
joined = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/test/unit/org/apache/cassandra/gms/GossiperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index 83f12d1..def0530 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -26,24 +26,37 @@ import java.util.UUID;
import com.google.common.collect.ImmutableMap;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class GossiperTest
{
- static
+ @BeforeClass
+ public static void before()
{
DatabaseDescriptor.daemonInitialization();
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace("schema_test_ks",
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD("schema_test_ks", "schema_test_cf"));
}
+
static final IPartitioner partitioner = new RandomPartitioner();
StorageService ss = StorageService.instance;
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
@@ -56,10 +69,10 @@ public class GossiperTest
public void setup()
{
tmd.clearUnsafe();
- };
+ }
@Test
- public void testLargeGenerationJump() throws UnknownHostException, InterruptedException
+ public void testLargeGenerationJump() throws UnknownHostException
{
Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
InetAddress remoteHostAddress = hosts.get(1);
@@ -90,4 +103,54 @@ public class GossiperTest
//The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future
assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
}
+
+ @Test
+ public void testSchemaVersionUpdate() throws UnknownHostException, InterruptedException
+ {
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
+ MessagingService.instance().listen();
+ Gossiper.instance.start(1);
+ InetAddress remoteHostAddress = hosts.get(1);
+
+ EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+ // Set to any 3.0 version
+ Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion("3.0.14"));
+
+ Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
+
+ // wait until the schema is set
+ VersionedValue schema = null;
+ for (int i = 0; i < 10; i++)
+ {
+ EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
+ schema = localState.getApplicationState(ApplicationState.SCHEMA);
+ if (schema != null)
+ break;
+ Thread.sleep(1000);
+ }
+
+ // schema is set and equals to "alternative" version
+ assertTrue(schema != null);
+ assertEquals(schema.value, Schema.instance.getAltVersion().toString());
+
+ // Upgrade remote host version to the latest one (3.11)
+ Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion());
+
+ Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
+
+ // wait until the schema change
+ VersionedValue newSchema = null;
+ for (int i = 0; i < 10; i++)
+ {
+ EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
+ newSchema = localState.getApplicationState(ApplicationState.SCHEMA);
+ if (!schema.value.equals(newSchema.value))
+ break;
+ Thread.sleep(1000);
+ }
+
+ // schema is changed and equals to real version
+ assertFalse(schema.value.equals(newSchema.value));
+ assertEquals(newSchema.value, Schema.instance.getRealVersion().toString());
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
index 2f83498..550deed 100644
--- a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
@@ -21,10 +21,12 @@ package org.apache.cassandra.schema;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import com.google.common.collect.ImmutableMap;
@@ -34,6 +36,7 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.QueryProcessor;
@@ -46,14 +49,19 @@ import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.ColumnDef;
import org.apache.cassandra.thrift.IndexType;
import org.apache.cassandra.thrift.ThriftConversion;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SchemaKeyspaceTest
@@ -206,4 +214,98 @@ public class SchemaKeyspaceTest
assertEquals(cfm.params, params);
assertEquals(new HashSet<>(cfm.allColumns()), columns);
}
+
+ private static boolean hasCDC(Mutation m)
+ {
+ for (PartitionUpdate p : m.getPartitionUpdates())
+ {
+ for (ColumnDefinition cd : p.columns())
+ {
+ if (cd.name.toString().equals("cdc"))
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static boolean hasSchemaTables(Mutation m)
+ {
+ for (PartitionUpdate p : m.getPartitionUpdates())
+ {
+ if (p.metadata().cfName.equals(SchemaKeyspace.TABLES))
+ return true;
+ }
+ return false;
+ }
+
+ @Test
+ public void testConvertSchemaToMutationsWithoutCDC() throws IOException
+ {
+ boolean oldCDCOption = DatabaseDescriptor.isCDCEnabled();
+ try
+ {
+ DatabaseDescriptor.setCDCEnabled(false);
+ Collection<Mutation> mutations = SchemaKeyspace.convertSchemaToMutations();
+ boolean foundTables = false;
+ for (Mutation m : mutations)
+ {
+ if (hasSchemaTables(m))
+ {
+ foundTables = true;
+ assertFalse(hasCDC(m));
+ try (DataOutputBuffer output = new DataOutputBuffer())
+ {
+ Mutation.serializer.serialize(m, output, MessagingService.current_version);
+ try (DataInputBuffer input = new DataInputBuffer(output.getData()))
+ {
+ Mutation out = Mutation.serializer.deserialize(input, MessagingService.current_version);
+ assertFalse(hasCDC(out));
+ }
+ }
+ }
+ }
+ assertTrue(foundTables);
+ }
+ finally
+ {
+ DatabaseDescriptor.setCDCEnabled(oldCDCOption);
+ }
+ }
+
+ @Test
+ public void testConvertSchemaToMutationsWithCDC()
+ {
+ boolean oldCDCOption = DatabaseDescriptor.isCDCEnabled();
+ try
+ {
+ DatabaseDescriptor.setCDCEnabled(true);
+ Collection<Mutation> mutations = SchemaKeyspace.convertSchemaToMutations();
+ boolean foundTables = false;
+ for (Mutation m : mutations)
+ {
+ if (hasSchemaTables(m))
+ {
+ foundTables = true;
+ assertTrue(hasCDC(m));
+ }
+ }
+ assertTrue(foundTables);
+ }
+ finally
+ {
+ DatabaseDescriptor.setCDCEnabled(oldCDCOption);
+ }
+ }
+
+ @Test
+ public void testSchemaDigest()
+ {
+ Set<ByteBuffer> abc = Collections.singleton(ByteBufferUtil.bytes("abc"));
+ Pair<UUID, UUID> versions = SchemaKeyspace.calculateSchemaDigest(abc);
+ assertTrue(versions.left.equals(versions.right));
+
+ Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
+ versions = SchemaKeyspace.calculateSchemaDigest(cdc);
+ assertFalse(versions.left.equals(versions.right));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[2/3] cassandra git commit: Add Unittest for schema migration fix
Posted by ja...@apache.org.
Add Unittest for schema migration fix
patch by Jay Zhuang; reviewed by jasobrown for CASSANDRA-14140
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e7b9c1f5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e7b9c1f5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e7b9c1f5
Branch: refs/heads/trunk
Commit: e7b9c1f50a9875682b480a3ab69e662f4b097d4d
Parents: d0b7566
Author: Jay Zhuang <ja...@yahoo.com>
Authored: Wed Dec 27 15:25:36 2017 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri Jan 19 06:17:51 2018 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/gms/VersionedValue.java | 9 +-
.../apache/cassandra/schema/SchemaKeyspace.java | 19 ++--
.../cassandra/service/StorageService.java | 25 -----
.../org/apache/cassandra/gms/GossiperTest.java | 69 ++++++++++++-
.../cassandra/schema/SchemaKeyspaceTest.java | 102 +++++++++++++++++++
6 files changed, 188 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c06ad0d..5e4c40a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.2
+ * Add Unittest for schema migration fix (CASSANDRA-14140)
* Print correct snitch info from nodetool describecluster (CASSANDRA-13528)
* Close socket on error during connect on OutboundTcpConnection (CASSANDRA-9630)
* Enable CDC unittest (CASSANDRA-14141)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index d9c8d0b..0ec1712 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import static java.nio.charset.StandardCharsets.ISO_8859_1;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import org.apache.cassandra.db.TypeSizes;
@@ -250,7 +251,13 @@ public class VersionedValue implements Comparable<VersionedValue>
public VersionedValue releaseVersion()
{
- return new VersionedValue(FBUtilities.getReleaseVersionString());
+ return releaseVersion(FBUtilities.getReleaseVersionString());
+ }
+
+ @VisibleForTesting
+ public VersionedValue releaseVersion(String version)
+ {
+ return new VersionedValue(version);
}
public VersionedValue networkVersion()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index b6add96..164e32d 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -25,6 +25,7 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.*;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
@@ -109,7 +110,7 @@ public final class SchemaKeyspace
* for digest calculations, otherwise the nodes will never agree on the schema during a rolling upgrade, see CASSANDRA-13559.
*/
public static final ImmutableList<String> ALL_FOR_DIGEST =
- ImmutableList.of(KEYSPACES, TABLES, COLUMNS, DROPPED_COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
+ ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
private static final CFMetaData Keyspaces =
compile(KEYSPACES,
@@ -317,6 +318,14 @@ public final class SchemaKeyspace
*/
public static Pair<UUID, UUID> calculateSchemaDigest()
{
+ Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
+
+ return calculateSchemaDigest(cdc);
+ }
+
+ @VisibleForTesting
+ static Pair<UUID, UUID> calculateSchemaDigest(Set<ByteBuffer> columnsToExclude)
+ {
MessageDigest digest;
MessageDigest digest30;
try
@@ -328,15 +337,9 @@ public final class SchemaKeyspace
{
throw new RuntimeException(e);
}
- Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
for (String table : ALL_FOR_DIGEST)
{
- // Due to CASSANDRA-11050 we want to exclude DROPPED_COLUMNS for schema digest computation. We can and
- // should remove that in the next major release (so C* 4.0).
- if (table.equals(DROPPED_COLUMNS))
- continue;
-
ReadCommand cmd = getReadCommandForTableSchema(table);
try (ReadExecutionController executionController = cmd.executionController();
PartitionIterator schema = cmd.executeInternal(executionController))
@@ -347,7 +350,7 @@ public final class SchemaKeyspace
{
if (!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
{
- RowIterators.digest(partition, digest, digest30, cdc);
+ RowIterators.digest(partition, digest, digest30, columnsToExclude);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c5e2912..15027b2 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -831,31 +831,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
- public void waitForSchema(int delay)
- {
- logger.debug("Waiting for schema (max {} seconds)", delay);
- // first sleep the delay to make sure we see all our peers
- for (int i = 0; i < delay; i += 1000)
- {
- // if we see schema, we can proceed to the next check directly
- if (!Schema.instance.isEmpty())
- {
- logger.debug("current schema version: {} (3.0 compatible: {})", Schema.instance.getRealVersion(), Schema.instance.getAltVersion());
- break;
- }
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- }
- // if our schema hasn't matched yet, wait until it has
- // we do this by waiting for all in-flight migration requests and responses to complete
- // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
- if (!MigrationManager.isReadyForBootstrap())
- {
- setMode(Mode.JOINING, "waiting for schema information to complete", true);
- MigrationManager.waitUntilReadyForBootstrap();
- }
- logger.info("Has schema with version {}", Schema.instance.getVersion());
- }
-
private void joinTokenRing(int delay) throws ConfigurationException
{
joined = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/test/unit/org/apache/cassandra/gms/GossiperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index 83f12d1..def0530 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -26,24 +26,37 @@ import java.util.UUID;
import com.google.common.collect.ImmutableMap;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class GossiperTest
{
- static
+ @BeforeClass
+ public static void before()
{
DatabaseDescriptor.daemonInitialization();
+ SchemaLoader.prepareServer();
+ SchemaLoader.createKeyspace("schema_test_ks",
+ KeyspaceParams.simple(1),
+ SchemaLoader.standardCFMD("schema_test_ks", "schema_test_cf"));
}
+
static final IPartitioner partitioner = new RandomPartitioner();
StorageService ss = StorageService.instance;
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
@@ -56,10 +69,10 @@ public class GossiperTest
public void setup()
{
tmd.clearUnsafe();
- };
+ }
@Test
- public void testLargeGenerationJump() throws UnknownHostException, InterruptedException
+ public void testLargeGenerationJump() throws UnknownHostException
{
Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
InetAddress remoteHostAddress = hosts.get(1);
@@ -90,4 +103,54 @@ public class GossiperTest
//The generation should not have been updated because it is over Gossiper.MAX_GENERATION_DIFFERENCE in the future
assertEquals(proposedRemoteHeartBeat.getGeneration(), actualRemoteHeartBeat.getGeneration());
}
+
+ @Test
+ public void testSchemaVersionUpdate() throws UnknownHostException, InterruptedException
+ {
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 2);
+ MessagingService.instance().listen();
+ Gossiper.instance.start(1);
+ InetAddress remoteHostAddress = hosts.get(1);
+
+ EndpointState initialRemoteState = Gossiper.instance.getEndpointStateForEndpoint(remoteHostAddress);
+ // Set to any 3.0 version
+ Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion("3.0.14"));
+
+ Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
+
+ // wait until the schema is set
+ VersionedValue schema = null;
+ for (int i = 0; i < 10; i++)
+ {
+ EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
+ schema = localState.getApplicationState(ApplicationState.SCHEMA);
+ if (schema != null)
+ break;
+ Thread.sleep(1000);
+ }
+
+ // schema is set and equals to "alternative" version
+ assertTrue(schema != null);
+ assertEquals(schema.value, Schema.instance.getAltVersion().toString());
+
+ // Upgrade remote host version to the latest one (3.11)
+ Gossiper.instance.injectApplicationState(remoteHostAddress, ApplicationState.RELEASE_VERSION, StorageService.instance.valueFactory.releaseVersion());
+
+ Gossiper.instance.applyStateLocally(ImmutableMap.of(remoteHostAddress, initialRemoteState));
+
+ // wait until the schema change
+ VersionedValue newSchema = null;
+ for (int i = 0; i < 10; i++)
+ {
+ EndpointState localState = Gossiper.instance.getEndpointStateForEndpoint(hosts.get(0));
+ newSchema = localState.getApplicationState(ApplicationState.SCHEMA);
+ if (!schema.value.equals(newSchema.value))
+ break;
+ Thread.sleep(1000);
+ }
+
+ // schema is changed and equals to real version
+ assertFalse(schema.value.equals(newSchema.value));
+ assertEquals(newSchema.value, Schema.instance.getRealVersion().toString());
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e7b9c1f5/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
index 2f83498..550deed 100644
--- a/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaKeyspaceTest.java
@@ -21,10 +21,12 @@ package org.apache.cassandra.schema;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import com.google.common.collect.ImmutableMap;
@@ -34,6 +36,7 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.config.SchemaConstants;
import org.apache.cassandra.cql3.QueryProcessor;
@@ -46,14 +49,19 @@ import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.ColumnDef;
import org.apache.cassandra.thrift.IndexType;
import org.apache.cassandra.thrift.ThriftConversion;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SchemaKeyspaceTest
@@ -206,4 +214,98 @@ public class SchemaKeyspaceTest
assertEquals(cfm.params, params);
assertEquals(new HashSet<>(cfm.allColumns()), columns);
}
+
+ private static boolean hasCDC(Mutation m)
+ {
+ for (PartitionUpdate p : m.getPartitionUpdates())
+ {
+ for (ColumnDefinition cd : p.columns())
+ {
+ if (cd.name.toString().equals("cdc"))
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static boolean hasSchemaTables(Mutation m)
+ {
+ for (PartitionUpdate p : m.getPartitionUpdates())
+ {
+ if (p.metadata().cfName.equals(SchemaKeyspace.TABLES))
+ return true;
+ }
+ return false;
+ }
+
+ @Test
+ public void testConvertSchemaToMutationsWithoutCDC() throws IOException
+ {
+ boolean oldCDCOption = DatabaseDescriptor.isCDCEnabled();
+ try
+ {
+ DatabaseDescriptor.setCDCEnabled(false);
+ Collection<Mutation> mutations = SchemaKeyspace.convertSchemaToMutations();
+ boolean foundTables = false;
+ for (Mutation m : mutations)
+ {
+ if (hasSchemaTables(m))
+ {
+ foundTables = true;
+ assertFalse(hasCDC(m));
+ try (DataOutputBuffer output = new DataOutputBuffer())
+ {
+ Mutation.serializer.serialize(m, output, MessagingService.current_version);
+ try (DataInputBuffer input = new DataInputBuffer(output.getData()))
+ {
+ Mutation out = Mutation.serializer.deserialize(input, MessagingService.current_version);
+ assertFalse(hasCDC(out));
+ }
+ }
+ }
+ }
+ assertTrue(foundTables);
+ }
+ finally
+ {
+ DatabaseDescriptor.setCDCEnabled(oldCDCOption);
+ }
+ }
+
+ @Test
+ public void testConvertSchemaToMutationsWithCDC()
+ {
+ boolean oldCDCOption = DatabaseDescriptor.isCDCEnabled();
+ try
+ {
+ DatabaseDescriptor.setCDCEnabled(true);
+ Collection<Mutation> mutations = SchemaKeyspace.convertSchemaToMutations();
+ boolean foundTables = false;
+ for (Mutation m : mutations)
+ {
+ if (hasSchemaTables(m))
+ {
+ foundTables = true;
+ assertTrue(hasCDC(m));
+ }
+ }
+ assertTrue(foundTables);
+ }
+ finally
+ {
+ DatabaseDescriptor.setCDCEnabled(oldCDCOption);
+ }
+ }
+
+ @Test
+ public void testSchemaDigest()
+ {
+ Set<ByteBuffer> abc = Collections.singleton(ByteBufferUtil.bytes("abc"));
+ Pair<UUID, UUID> versions = SchemaKeyspace.calculateSchemaDigest(abc);
+ assertTrue(versions.left.equals(versions.right));
+
+ Set<ByteBuffer> cdc = Collections.singleton(ByteBufferUtil.bytes("cdc"));
+ versions = SchemaKeyspace.calculateSchemaDigest(cdc);
+ assertFalse(versions.left.equals(versions.right));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
[3/3] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by ja...@apache.org.
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/716d08f9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/716d08f9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/716d08f9
Branch: refs/heads/trunk
Commit: 716d08f9090deb4c0c48096b5044186d270feb45
Parents: 1cb91ea e7b9c1f
Author: Jason Brown <ja...@gmail.com>
Authored: Fri Jan 19 06:20:11 2018 -0800
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri Jan 19 06:20:11 2018 -0800
----------------------------------------------------------------------
----------------------------------------------------------------------
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org