You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2019/04/24 18:28:19 UTC
[cassandra] branch cassandra-2.2 updated: Support cross version messaging in in-jvm upgrade dtests
This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
new 7d2c3c2 Support cross version messaging in in-jvm upgrade dtests
7d2c3c2 is described below
commit 7d2c3c215f65ee41f86886304257647fc24b1f70
Author: Blake Eggleston <bd...@gmail.com>
AuthorDate: Thu Apr 4 14:39:57 2019 -0700
Support cross version messaging in in-jvm upgrade dtests
Patch by Blake Eggleston; Reviewed by Alex Petrov for CASSANDRA-15078
---
CHANGES.txt | 1 +
.../cassandra/distributed/api/IInstance.java | 3 +++
.../distributed/impl/AbstractCluster.java | 18 ++++++++++++++++-
.../impl/DelegatingInvokableInstance.java | 12 +++++++++++
.../cassandra/distributed/impl/Instance.java | 23 ++++++++++++++++------
.../distributed/impl/InstanceClassLoader.java | 11 +++++++++++
.../distributed/upgrade/UpgradeTestBase.java | 11 ++++-------
7 files changed, 65 insertions(+), 14 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index e887733..1cc4153 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.15
+ * Support cross version messaging in in-jvm upgrade dtests (CASSANDRA-15078)
* Fix index summary redistribution cancellation (CASSANDRA-15045)
* Refactor Circle CI configuration (CASSANDRA-14806)
* Fixing invalid CQL in security documentation (CASSANDRA-15020)
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
index 8c9f962..3834093 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
@@ -42,4 +42,7 @@ public interface IInstance extends IIsolatedExecutor
// these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
void startup(ICluster cluster);
void receiveMessage(IMessage message);
+
+ int getMessagingVersion();
+ void setMessagingVersion(InetAddressAndPort endpoint, int version);
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 2e759f5..1dc7a65 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -141,8 +141,9 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
{
if (!isShutdown)
throw new IllegalStateException();
- delegate().startup(AbstractCluster.this);
+ delegate.startup(AbstractCluster.this);
isShutdown = false;
+ updateMessagingVersions();
}
@Override
@@ -252,6 +253,21 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
}).run();
}
+ private void updateMessagingVersions()
+ {
+ for (IInstance reportTo: instances)
+ {
+ for (IInstance reportFrom: instances)
+ {
+ if (reportFrom == reportTo)
+ continue;
+
+ int minVersion = Math.min(reportFrom.getMessagingVersion(), reportTo.getMessagingVersion());
+ reportTo.setMessagingVersion(reportFrom.broadcastAddressAndPort(), minVersion);
+ }
+ }
+ }
+
/**
* Will wait for a schema change AND agreement that occurs after it is created
* (and precedes the invocation to waitForAgreement)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
index 27e2c04..e9e6844 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
@@ -74,6 +74,18 @@ public abstract class DelegatingInvokableInstance implements IInvokableInstance
}
@Override
+ public int getMessagingVersion()
+ {
+ return delegate().getMessagingVersion();
+ }
+
+ @Override
+ public void setMessagingVersion(InetAddressAndPort endpoint, int version)
+ {
+ delegate().setMessagingVersion(endpoint, version);
+ }
+
+ @Override
public IInstanceConfig config()
{
return delegate().config();
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index e37d60f..dce03ca 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -30,7 +30,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
@@ -57,18 +56,17 @@ import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IListen;
import org.apache.cassandra.distributed.api.IMessage;
-import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.net.IMessageSink;
import org.apache.cassandra.net.MessageDeliveryTask;
import org.apache.cassandra.net.MessageIn;
@@ -201,8 +199,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
InetAddressAndPort from = broadcastAddressAndPort();
assert from.equals(lookupAddressAndPort.apply(messageOut.from));
InetAddressAndPort toFull = lookupAddressAndPort.apply(to);
- messageOut.serialize(out, MessagingService.current_version);
- deliver.accept(toFull, new Message(messageOut.verb.ordinal(), out.toByteArray(), id, MessagingService.current_version, from));
+ int version = MessagingService.instance().getVersion(to);
+ messageOut.serialize(out, version);
+ deliver.accept(toFull, new Message(messageOut.verb.ordinal(), out.toByteArray(), id, version, from));
}
catch (IOException e)
{
@@ -234,6 +233,16 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
}).run();
}
+ public int getMessagingVersion()
+ {
+ return callsOnInstance(() -> MessagingService.current_version).call();
+ }
+
+ public void setMessagingVersion(InetAddressAndPort endpoint, int version)
+ {
+ runOnInstance(() -> MessagingService.instance().setVersion(endpoint.address, version));
+ }
+
@Override
public void startup(ICluster cluster)
{
@@ -340,7 +349,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
ApplicationState.STATUS,
new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i))));
Gossiper.instance.realMarkAlive(ep.address, Gossiper.instance.getEndpointStateForEndpoint(ep.address));
- MessagingService.instance().setVersion(ep.address, MessagingService.current_version);
+
+ int version = Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion());
+ MessagingService.instance().setVersion(ep.address, version);
}
// check that all nodes are in token metadata
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
index 1722515..6fd5c7e 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
@@ -63,11 +63,15 @@ public class InstanceClassLoader extends URLClassLoader
InstanceClassLoader create(int id, URL[] urls, ClassLoader sharedClassLoader);
}
+ private final int id;
+ private final URL[] urls;
private final ClassLoader sharedClassLoader;
InstanceClassLoader(int id, URL[] urls, ClassLoader sharedClassLoader)
{
super(urls, null);
+ this.id = id;
+ this.urls = urls;
this.sharedClassLoader = sharedClassLoader;
}
@@ -102,4 +106,11 @@ public class InstanceClassLoader extends URLClassLoader
return clazz.getClassLoader().getClass().getName().equals(InstanceClassLoader.class.getName());
}
+ public String toString()
+ {
+ return "InstanceClassLoader{" +
+ "id=" + id +
+ ", urls=" + Arrays.toString(urls) +
+ '}';
+ }
}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 812bdbe..4403767 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -28,7 +28,8 @@ import org.apache.cassandra.distributed.impl.Versions;
import org.apache.cassandra.distributed.impl.Versions.Version;
import org.apache.cassandra.distributed.test.DistributedTestBase;
-import static org.apache.cassandra.distributed.impl.Versions.*;
+import static org.apache.cassandra.distributed.impl.Versions.Major;
+import static org.apache.cassandra.distributed.impl.Versions.find;
public class UpgradeTestBase extends DistributedTestBase
{
@@ -129,17 +130,13 @@ public class UpgradeTestBase extends DistributedTestBase
{
try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial)))
{
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
- cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
+ setup.run(cluster);
for (Version version : upgrade.upgrade)
{
for (int n = 1 ; n <= nodeCount ; ++n)
{
- cluster.get(n).shutdown();
+ cluster.get(n).shutdown().get();
cluster.get(n).setVersion(version);
cluster.get(n).startup();
runAfterNodeUpgrade.run(cluster, n);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org