You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2021/09/07 13:35:33 UTC
[cassandra] branch trunk updated: Open java driver connections in CQLTester in a lazy way
This is an automated email from the ASF dual-hosted git repository.
blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 138569b Open java driver connections in CQLTester in a lazy way
138569b is described below
commit 138569b079b3d17b1020a24463adabecd903b79f
Author: Benjamin Lerer <b....@gmail.com>
AuthorDate: Mon Sep 6 13:36:14 2021 +0200
Open java driver connections in CQLTester in a lazy way
patch by Benjamin Lerer; reviewed by Andrés de la Peña for CASSANDRA-16918
---
.../apache/cassandra/transport/DriverBurnTest.java | 2 +-
test/unit/org/apache/cassandra/cql3/CQLTester.java | 122 +++++++++++----------
.../cassandra/cql3/PreparedStatementsTest.java | 10 +-
.../metrics/ClientRequestSizeMetricsTest.java | 4 +-
.../transport/ClientNotificiationsTest.java | 3 +-
.../transport/ClientResourceLimitsTest.java | 5 +-
.../cassandra/transport/RateLimitingTest.java | 5 +-
7 files changed, 80 insertions(+), 71 deletions(-)
diff --git a/test/burn/org/apache/cassandra/transport/DriverBurnTest.java b/test/burn/org/apache/cassandra/transport/DriverBurnTest.java
index 37ebec1..8aaf87e 100644
--- a/test/burn/org/apache/cassandra/transport/DriverBurnTest.java
+++ b/test/burn/org/apache/cassandra/transport/DriverBurnTest.java
@@ -62,7 +62,7 @@ public class DriverBurnTest extends CQLTester
}
};
- requireNetwork((builder) -> builder.withPipelineConfigurator(configurator));
+ requireNetwork(builder -> builder.withPipelineConfigurator(configurator), builder -> {});
}
@Test
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index f3c279c..56be6f6 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -125,7 +125,9 @@ public abstract class CQLTester
protected static final InetAddress nativeAddr;
protected static final Set<InetAddressAndPort> remoteAddrs = new HashSet<>();
private static final Map<ProtocolVersion, Cluster> clusters = new HashMap<>();
- protected static final Map<ProtocolVersion, Session> sessions = new HashMap<>();
+ private static final Map<ProtocolVersion, Session> sessions = new HashMap<>();
+
+ private static Consumer<Cluster.Builder> clusterBuilderConfigurator;
public static final List<ProtocolVersion> PROTOCOL_VERSIONS = new ArrayList<>(ProtocolVersion.SUPPORTED.size());
@@ -411,26 +413,27 @@ public abstract class CQLTester
return allArgs;
}
- protected static void requireNetworkWithoutDriver()
- {
- startServices();
- startServer(server -> {});
- }
-
- // lazy initialization for all tests that require Java Driver
+ /**
+ * Initialize Native Transport for the tests that need it.
+ */
protected static void requireNetwork() throws ConfigurationException
{
- requireNetwork(server -> {});
+ requireNetwork(server -> {}, cluster -> {});
}
- // lazy initialization for all tests that require Java Driver
- protected static void requireNetwork(Consumer<Server.Builder> decorator) throws ConfigurationException
+ /**
+ * Initialize Native Transport for the tests that need it.
+ */
+ protected static void requireNetwork(Consumer<Server.Builder> serverConfigurator,
+ Consumer<Cluster.Builder> clusterConfigurator) throws ConfigurationException
{
if (server != null)
return;
+ clusterBuilderConfigurator = clusterConfigurator;
+
startServices();
- initializeNetwork(decorator, null);
+ startServer(serverConfigurator);
}
private static void startServices()
@@ -443,10 +446,11 @@ public abstract class CQLTester
protected static void reinitializeNetwork()
{
- reinitializeNetwork(null);
+ reinitializeNetwork(server -> {}, cluster -> {});
}
- protected static void reinitializeNetwork(Consumer<Cluster.Builder> clusterConfigurator)
+ protected static void reinitializeNetwork(Consumer<Server.Builder> serverConfigurator,
+ Consumer<Cluster.Builder> clusterConfigurator)
{
if (server != null && server.isRunning())
{
@@ -462,54 +466,49 @@ public abstract class CQLTester
clusters.clear();
sessions.clear();
- initializeNetwork(server -> {}, clusterConfigurator);
+ clusterBuilderConfigurator = clusterConfigurator;
+
+ startServer(serverConfigurator);
}
- private static void initializeNetwork(Consumer<Server.Builder> decorator, Consumer<Cluster.Builder> clusterConfigurator)
+ private static void startServer(Consumer<Server.Builder> decorator)
{
- startServer(decorator);
-
- for (ProtocolVersion version : PROTOCOL_VERSIONS)
- {
- if (clusters.containsKey(version))
- continue;
+ Server.Builder serverBuilder = new Server.Builder().withHost(nativeAddr).withPort(nativePort);
+ decorator.accept(serverBuilder);
+ server = serverBuilder.build();
+ ClientMetrics.instance.init(Collections.singleton(server));
+ server.start();
+ }
- SocketOptions socketOptions = new SocketOptions()
- .setConnectTimeoutMillis(Integer.getInteger("cassandra.test.driver.connection_timeout_ms", DEFAULT_CONNECT_TIMEOUT_MILLIS)) // default is 5000
- .setReadTimeoutMillis(Integer.getInteger("cassandra.test.driver.read_timeout_ms", DEFAULT_READ_TIMEOUT_MILLIS)); // default is 12000
+ private static Cluster initClientCluster(ProtocolVersion version)
+ {
+ SocketOptions socketOptions =
+ new SocketOptions().setConnectTimeoutMillis(Integer.getInteger("cassandra.test.driver.connection_timeout_ms",
+ DEFAULT_CONNECT_TIMEOUT_MILLIS)) // default is 5000
+ .setReadTimeoutMillis(Integer.getInteger("cassandra.test.driver.read_timeout_ms",
+ DEFAULT_READ_TIMEOUT_MILLIS)); // default is 12000
- logger.info("Timeouts: {} / {}", socketOptions.getConnectTimeoutMillis(), socketOptions.getReadTimeoutMillis());
+ logger.info("Timeouts: {} / {}", socketOptions.getConnectTimeoutMillis(), socketOptions.getReadTimeoutMillis());
- Cluster.Builder builder = Cluster.builder()
- .withoutJMXReporting()
- .addContactPoints(nativeAddr)
- .withClusterName("Test Cluster")
- .withPort(nativePort)
- .withSocketOptions(socketOptions);
+ Cluster.Builder builder = Cluster.builder()
+ .withoutJMXReporting()
+ .addContactPoints(nativeAddr)
+ .withClusterName("Test Cluster")
+ .withPort(nativePort)
+ .withSocketOptions(socketOptions);
- if (clusterConfigurator != null)
- clusterConfigurator.accept(builder);
+ if (version.isBeta())
+ builder = builder.allowBetaProtocolVersion();
+ else
+ builder = builder.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.fromInt(version.asInt()));
- if (version.isBeta())
- builder = builder.allowBetaProtocolVersion();
- else
- builder = builder.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.fromInt(version.asInt()));
+ clusterBuilderConfigurator.accept(builder);
- Cluster cluster = builder.build();
- clusters.put(version, cluster);
- sessions.put(version, cluster.connect());
+ Cluster cluster = builder.build();
- logger.info("Started Java Driver instance for protocol version {}", version);
- }
- }
+ logger.info("Started Java Driver instance for protocol version {}", version);
- private static void startServer(Consumer<Server.Builder> decorator)
- {
- Server.Builder serverBuilder = new Server.Builder().withHost(nativeAddr).withPort(nativePort);
- decorator.accept(serverBuilder);
- server = serverBuilder.build();
- ClientMetrics.instance.init(Collections.singleton(server));
- server.start();
+ return cluster;
}
protected void dropPerTestKeyspace() throws Throwable
@@ -1023,7 +1022,18 @@ public abstract class CQLTester
{
requireNetwork();
- return sessions.get(protocolVersion);
+ return getSession(protocolVersion);
+ }
+
+ private Session getSession(ProtocolVersion protocolVersion)
+ {
+ Cluster cluster = getCluster(protocolVersion);
+ return sessions.computeIfAbsent(protocolVersion, userProto -> cluster.connect());
+ }
+
+ private Cluster getCluster(ProtocolVersion protocolVersion)
+ {
+ return clusters.computeIfAbsent(protocolVersion, userProto -> initClientCluster(protocolVersion));
}
protected SimpleClient newSimpleClient(ProtocolVersion version) throws IOException
@@ -1125,9 +1135,9 @@ public abstract class CQLTester
for (int j = 0; j < meta.size(); j++)
{
DataType type = meta.getType(j);
- com.datastax.driver.core.TypeCodec<Object> codec = clusters.get(protocolVersion).getConfiguration()
- .getCodecRegistry()
- .codecFor(type);
+ com.datastax.driver.core.TypeCodec<Object> codec = getCluster(protocolVersion).getConfiguration()
+ .getCodecRegistry()
+ .codecFor(type);
ByteBuffer expectedByteValue = codec.serialize(expected[j], com.datastax.driver.core.ProtocolVersion.fromInt(protocolVersion.asInt()));
int expectedBytes = expectedByteValue == null ? -1 : expectedByteValue.remaining();
ByteBuffer actualValue = actual.getBytesUnsafe(meta.getName(j));
@@ -1825,7 +1835,7 @@ public abstract class CQLTester
protected com.datastax.driver.core.TupleType tupleTypeOf(ProtocolVersion protocolVersion, com.datastax.driver.core.DataType...types)
{
requireNetwork();
- return clusters.get(protocolVersion).getMetadata().newTupleType(types);
+ return getCluster(protocolVersion).getMetadata().newTupleType(types);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index ef705bd..5b3ae3e 100644
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@ -58,7 +58,7 @@ public class PreparedStatementsTest extends CQLTester
@Test
public void testInvalidatePreparedStatementsOnDrop()
{
- Session session = sessions.get(ProtocolVersion.V5);
+ Session session = sessionNet(ProtocolVersion.V5);
session.execute(dropKsStatement);
session.execute(createKsStatement);
@@ -102,7 +102,7 @@ public class PreparedStatementsTest extends CQLTester
private void testInvalidatePreparedStatementOnAlter(ProtocolVersion version, boolean supportsMetadataChange)
{
- Session session = sessions.get(version);
+ Session session = sessionNet(version);
String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (a int PRIMARY KEY, b int, c int);";
String alterTableStatement = "ALTER TABLE " + KEYSPACE + ".qp_cleanup ADD d int;";
@@ -162,7 +162,7 @@ public class PreparedStatementsTest extends CQLTester
private void testInvalidatePreparedStatementOnAlterUnchangedMetadata(ProtocolVersion version)
{
- Session session = sessions.get(version);
+ Session session = sessionNet(version);
String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (a int PRIMARY KEY, b int, c int);";
String alterTableStatement = "ALTER TABLE " + KEYSPACE + ".qp_cleanup ADD d int;";
@@ -200,7 +200,7 @@ public class PreparedStatementsTest extends CQLTester
@Test
public void testStatementRePreparationOnReconnect()
{
- Session session = sessions.get(ProtocolVersion.V5);
+ Session session = sessionNet(ProtocolVersion.V5);
session.execute("USE " + keyspace());
session.execute(dropKsStatement);
@@ -241,7 +241,7 @@ public class PreparedStatementsTest extends CQLTester
@Test
public void prepareAndExecuteWithCustomExpressions() throws Throwable
{
- Session session = sessions.get(ProtocolVersion.V5);
+ Session session = sessionNet(ProtocolVersion.V5);
session.execute(dropKsStatement);
session.execute(createKsStatement);
diff --git a/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java b/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java
index 1f2f771..f19fca5 100644
--- a/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java
+++ b/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java
@@ -69,7 +69,9 @@ public class ClientRequestSizeMetricsTest extends CQLTester
// We explicitly disable schema fetching to avoid that effect
try
{
- reinitializeNetwork(builder -> builder.withQueryOptions(new QueryOptions().setMetadataEnabled(false)));
+ reinitializeNetwork(builder -> {}, builder -> builder.withQueryOptions(new QueryOptions().setMetadataEnabled(false)));
+ sessionNet(version); // Ensure that the connection is open
+
// We want to ignore all the messages sent by the driver upon connection as well as
// the event sent upon schema updates
clearMetrics();
diff --git a/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java b/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java
index bd1ec63..4ff844a 100644
--- a/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java
+++ b/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.transport;
import java.util.Collections;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
@@ -43,7 +42,7 @@ public class ClientNotificiationsTest extends CQLTester
@Before
public void setup()
{
- requireNetwork(builder -> builder.withEventNotifier(notifier));
+ requireNetwork(builder -> builder.withEventNotifier(notifier), builder -> {});
}
@Parameterized.Parameter(0)
diff --git a/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java b/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
index 5cea90c..1ffd9b4 100644
--- a/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
+++ b/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
@@ -71,9 +71,8 @@ public class ClientResourceLimitsTest extends CQLTester
DatabaseDescriptor.setNativeTransportReceiveQueueCapacityInBytes(1);
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(LOW_LIMIT);
DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(LOW_LIMIT);
-
- // The driver control connections would send queries that might interfere with the tests.
- requireNetworkWithoutDriver();
+
+ requireNetwork();
}
@AfterClass
diff --git a/test/unit/org/apache/cassandra/transport/RateLimitingTest.java b/test/unit/org/apache/cassandra/transport/RateLimitingTest.java
index 8497c01..215542d 100644
--- a/test/unit/org/apache/cassandra/transport/RateLimitingTest.java
+++ b/test/unit/org/apache/cassandra/transport/RateLimitingTest.java
@@ -76,9 +76,8 @@ public class RateLimitingTest extends CQLTester
// If we don't exceed the queue capacity, we won't actually use the global/endpoint
// bytes-in-flight limits, and the assertions we make below around releasing them would be useless.
DatabaseDescriptor.setNativeTransportReceiveQueueCapacityInBytes(1);
-
- // The driver control connections would send queries that might interfere with the tests.
- requireNetworkWithoutDriver();
+
+ requireNetwork();
}
@Before
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org