You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bl...@apache.org on 2021/09/07 13:35:33 UTC

[cassandra] branch trunk updated: Open java driver connections in CQLTester in a lazy way

This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 138569b  Open java driver connections in CQLTester in a lazy way
138569b is described below

commit 138569b079b3d17b1020a24463adabecd903b79f
Author: Benjamin Lerer <b....@gmail.com>
AuthorDate: Mon Sep 6 13:36:14 2021 +0200

    Open java driver connections in CQLTester in a lazy way
    
    patch by Benjamin Lerer; reviewed by Andrés de la Peña for CASSANDRA-16918
---
 .../apache/cassandra/transport/DriverBurnTest.java |   2 +-
 test/unit/org/apache/cassandra/cql3/CQLTester.java | 122 +++++++++++----------
 .../cassandra/cql3/PreparedStatementsTest.java     |  10 +-
 .../metrics/ClientRequestSizeMetricsTest.java      |   4 +-
 .../transport/ClientNotificiationsTest.java        |   3 +-
 .../transport/ClientResourceLimitsTest.java        |   5 +-
 .../cassandra/transport/RateLimitingTest.java      |   5 +-
 7 files changed, 80 insertions(+), 71 deletions(-)

diff --git a/test/burn/org/apache/cassandra/transport/DriverBurnTest.java b/test/burn/org/apache/cassandra/transport/DriverBurnTest.java
index 37ebec1..8aaf87e 100644
--- a/test/burn/org/apache/cassandra/transport/DriverBurnTest.java
+++ b/test/burn/org/apache/cassandra/transport/DriverBurnTest.java
@@ -62,7 +62,7 @@ public class DriverBurnTest extends CQLTester
             }
         };
 
-        requireNetwork((builder) -> builder.withPipelineConfigurator(configurator));
+        requireNetwork(builder -> builder.withPipelineConfigurator(configurator), builder -> {});
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index f3c279c..56be6f6 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -125,7 +125,9 @@ public abstract class CQLTester
     protected static final InetAddress nativeAddr;
     protected static final Set<InetAddressAndPort> remoteAddrs = new HashSet<>();
     private static final Map<ProtocolVersion, Cluster> clusters = new HashMap<>();
-    protected static final Map<ProtocolVersion, Session> sessions = new HashMap<>();
+    private static final Map<ProtocolVersion, Session> sessions = new HashMap<>();
+
+    private static Consumer<Cluster.Builder> clusterBuilderConfigurator;
 
     public static final List<ProtocolVersion> PROTOCOL_VERSIONS = new ArrayList<>(ProtocolVersion.SUPPORTED.size());
 
@@ -411,26 +413,27 @@ public abstract class CQLTester
         return allArgs;
     }
 
-    protected static void requireNetworkWithoutDriver()
-    {
-        startServices();
-        startServer(server -> {});
-    }
-
-    // lazy initialization for all tests that require Java Driver
+    /**
+     *  Initialize Native Transport for test that need it.
+     */
     protected static void requireNetwork() throws ConfigurationException
     {
-        requireNetwork(server -> {});
+        requireNetwork(server -> {}, cluster -> {});
     }
 
-    // lazy initialization for all tests that require Java Driver
-    protected static void requireNetwork(Consumer<Server.Builder> decorator) throws ConfigurationException
+    /**
+     *  Initialize Native Transport for the tests that need it.
+     */
+    protected static void requireNetwork(Consumer<Server.Builder> serverConfigurator,
+                                         Consumer<Cluster.Builder> clusterConfigurator) throws ConfigurationException
     {
         if (server != null)
             return;
 
+        clusterBuilderConfigurator = clusterConfigurator;
+
         startServices();
-        initializeNetwork(decorator, null);
+        startServer(serverConfigurator);
     }
 
     private static void startServices()
@@ -443,10 +446,11 @@ public abstract class CQLTester
 
     protected static void reinitializeNetwork()
     {
-        reinitializeNetwork(null);
+        reinitializeNetwork(server -> {}, cluster -> {});
     }
 
-    protected static void reinitializeNetwork(Consumer<Cluster.Builder> clusterConfigurator)
+    protected static void reinitializeNetwork(Consumer<Server.Builder> serverConfigurator,
+                                              Consumer<Cluster.Builder> clusterConfigurator)
     {
         if (server != null && server.isRunning())
         {
@@ -462,54 +466,49 @@ public abstract class CQLTester
         clusters.clear();
         sessions.clear();
 
-        initializeNetwork(server -> {}, clusterConfigurator);
+        clusterBuilderConfigurator = clusterConfigurator;
+
+        startServer(serverConfigurator);
     }
 
-    private static void initializeNetwork(Consumer<Server.Builder> decorator, Consumer<Cluster.Builder> clusterConfigurator)
+    private static void startServer(Consumer<Server.Builder> decorator)
     {
-        startServer(decorator);
-
-        for (ProtocolVersion version : PROTOCOL_VERSIONS)
-        {
-            if (clusters.containsKey(version))
-                continue;
+        Server.Builder serverBuilder = new Server.Builder().withHost(nativeAddr).withPort(nativePort);
+        decorator.accept(serverBuilder);
+        server = serverBuilder.build();
+        ClientMetrics.instance.init(Collections.singleton(server));
+        server.start();
+    }
 
-            SocketOptions socketOptions = new SocketOptions()
-                                          .setConnectTimeoutMillis(Integer.getInteger("cassandra.test.driver.connection_timeout_ms", DEFAULT_CONNECT_TIMEOUT_MILLIS)) // default is 5000
-                                          .setReadTimeoutMillis(Integer.getInteger("cassandra.test.driver.read_timeout_ms", DEFAULT_READ_TIMEOUT_MILLIS)); // default is 12000
+    private static Cluster initClientCluster(ProtocolVersion version)
+    {
+        SocketOptions socketOptions =
+                new SocketOptions().setConnectTimeoutMillis(Integer.getInteger("cassandra.test.driver.connection_timeout_ms",
+                                                                               DEFAULT_CONNECT_TIMEOUT_MILLIS)) // default is 5000
+                                   .setReadTimeoutMillis(Integer.getInteger("cassandra.test.driver.read_timeout_ms",
+                                                                            DEFAULT_READ_TIMEOUT_MILLIS)); // default is 12000
 
-            logger.info("Timeouts: {} / {}", socketOptions.getConnectTimeoutMillis(), socketOptions.getReadTimeoutMillis());
+        logger.info("Timeouts: {} / {}", socketOptions.getConnectTimeoutMillis(), socketOptions.getReadTimeoutMillis());
 
-            Cluster.Builder builder = Cluster.builder()
-                                             .withoutJMXReporting()
-                                             .addContactPoints(nativeAddr)
-                                             .withClusterName("Test Cluster")
-                                             .withPort(nativePort)
-                                             .withSocketOptions(socketOptions);
+        Cluster.Builder builder = Cluster.builder()
+                                         .withoutJMXReporting()
+                                         .addContactPoints(nativeAddr)
+                                         .withClusterName("Test Cluster")
+                                         .withPort(nativePort)
+                                         .withSocketOptions(socketOptions);
 
-            if (clusterConfigurator != null)
-                clusterConfigurator.accept(builder);
+        if (version.isBeta())
+            builder = builder.allowBetaProtocolVersion();
+        else
+            builder = builder.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.fromInt(version.asInt()));
 
-            if (version.isBeta())
-                builder = builder.allowBetaProtocolVersion();
-            else
-                builder = builder.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.fromInt(version.asInt()));
+        clusterBuilderConfigurator.accept(builder);
 
-            Cluster cluster = builder.build();
-            clusters.put(version, cluster);
-            sessions.put(version, cluster.connect());
+        Cluster cluster = builder.build();
 
-            logger.info("Started Java Driver instance for protocol version {}", version);
-        }
-    }
+        logger.info("Started Java Driver instance for protocol version {}", version);
 
-    private static void startServer(Consumer<Server.Builder> decorator)
-    {
-        Server.Builder serverBuilder = new Server.Builder().withHost(nativeAddr).withPort(nativePort);
-        decorator.accept(serverBuilder);
-        server = serverBuilder.build();
-        ClientMetrics.instance.init(Collections.singleton(server));
-        server.start();
+        return cluster;
     }
 
     protected void dropPerTestKeyspace() throws Throwable
@@ -1023,7 +1022,18 @@ public abstract class CQLTester
     {
         requireNetwork();
 
-        return sessions.get(protocolVersion);
+        return getSession(protocolVersion);
+    }
+
+    private Session getSession(ProtocolVersion protocolVersion)
+    {
+        Cluster cluster = getCluster(protocolVersion);
+        return sessions.computeIfAbsent(protocolVersion, userProto -> cluster.connect());
+    }
+
+    private Cluster getCluster(ProtocolVersion protocolVersion)
+    {
+        return clusters.computeIfAbsent(protocolVersion, userProto -> initClientCluster(protocolVersion));
     }
 
     protected SimpleClient newSimpleClient(ProtocolVersion version) throws IOException
@@ -1125,9 +1135,9 @@ public abstract class CQLTester
             for (int j = 0; j < meta.size(); j++)
             {
                 DataType type = meta.getType(j);
-                com.datastax.driver.core.TypeCodec<Object> codec = clusters.get(protocolVersion).getConfiguration()
-                                                                                                .getCodecRegistry()
-                                                                                                .codecFor(type);
+                com.datastax.driver.core.TypeCodec<Object> codec = getCluster(protocolVersion).getConfiguration()
+                                                                                              .getCodecRegistry()
+                                                                                              .codecFor(type);
                 ByteBuffer expectedByteValue = codec.serialize(expected[j], com.datastax.driver.core.ProtocolVersion.fromInt(protocolVersion.asInt()));
                 int expectedBytes = expectedByteValue == null ? -1 : expectedByteValue.remaining();
                 ByteBuffer actualValue = actual.getBytesUnsafe(meta.getName(j));
@@ -1825,7 +1835,7 @@ public abstract class CQLTester
     protected com.datastax.driver.core.TupleType tupleTypeOf(ProtocolVersion protocolVersion, com.datastax.driver.core.DataType...types)
     {
         requireNetwork();
-        return clusters.get(protocolVersion).getMetadata().newTupleType(types);
+        return getCluster(protocolVersion).getMetadata().newTupleType(types);
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
index ef705bd..5b3ae3e 100644
--- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java
@@ -58,7 +58,7 @@ public class PreparedStatementsTest extends CQLTester
     @Test
     public void testInvalidatePreparedStatementsOnDrop()
     {
-        Session session = sessions.get(ProtocolVersion.V5);
+        Session session = sessionNet(ProtocolVersion.V5);
         session.execute(dropKsStatement);
         session.execute(createKsStatement);
 
@@ -102,7 +102,7 @@ public class PreparedStatementsTest extends CQLTester
 
     private void testInvalidatePreparedStatementOnAlter(ProtocolVersion version, boolean supportsMetadataChange)
     {
-        Session session = sessions.get(version);
+        Session session = sessionNet(version);
         String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (a int PRIMARY KEY, b int, c int);";
         String alterTableStatement = "ALTER TABLE " + KEYSPACE + ".qp_cleanup ADD d int;";
 
@@ -162,7 +162,7 @@ public class PreparedStatementsTest extends CQLTester
 
     private void testInvalidatePreparedStatementOnAlterUnchangedMetadata(ProtocolVersion version)
     {
-        Session session = sessions.get(version);
+        Session session = sessionNet(version);
         String createTableStatement = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".qp_cleanup (a int PRIMARY KEY, b int, c int);";
         String alterTableStatement = "ALTER TABLE " + KEYSPACE + ".qp_cleanup ADD d int;";
 
@@ -200,7 +200,7 @@ public class PreparedStatementsTest extends CQLTester
     @Test
     public void testStatementRePreparationOnReconnect()
     {
-        Session session = sessions.get(ProtocolVersion.V5);
+        Session session = sessionNet(ProtocolVersion.V5);
         session.execute("USE " + keyspace());
 
         session.execute(dropKsStatement);
@@ -241,7 +241,7 @@ public class PreparedStatementsTest extends CQLTester
     @Test
     public void prepareAndExecuteWithCustomExpressions() throws Throwable
     {
-        Session session = sessions.get(ProtocolVersion.V5);
+        Session session = sessionNet(ProtocolVersion.V5);
 
         session.execute(dropKsStatement);
         session.execute(createKsStatement);
diff --git a/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java b/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java
index 1f2f771..f19fca5 100644
--- a/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java
+++ b/test/unit/org/apache/cassandra/metrics/ClientRequestSizeMetricsTest.java
@@ -69,7 +69,9 @@ public class ClientRequestSizeMetricsTest extends CQLTester
         // We explicitly disable scheme fetching to avoid that effect
         try
         {
-            reinitializeNetwork(builder -> builder.withQueryOptions(new QueryOptions().setMetadataEnabled(false)));
+            reinitializeNetwork(builder -> {}, builder -> builder.withQueryOptions(new QueryOptions().setMetadataEnabled(false)));
+            sessionNet(version); // Ensure that the connection is open
+
             // We want to ignore all the messages sent by the driver upon connection as well as
             // the event sent upon schema updates
             clearMetrics();
diff --git a/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java b/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java
index bd1ec63..4ff844a 100644
--- a/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java
+++ b/test/unit/org/apache/cassandra/transport/ClientNotificiationsTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.transport;
 
 import java.util.Collections;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.Before;
@@ -43,7 +42,7 @@ public class ClientNotificiationsTest extends CQLTester
     @Before
     public void setup()
     {
-        requireNetwork(builder -> builder.withEventNotifier(notifier));
+        requireNetwork(builder -> builder.withEventNotifier(notifier), builder -> {});
     }
 
     @Parameterized.Parameter(0)
diff --git a/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java b/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
index 5cea90c..1ffd9b4 100644
--- a/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
+++ b/test/unit/org/apache/cassandra/transport/ClientResourceLimitsTest.java
@@ -71,9 +71,8 @@ public class ClientResourceLimitsTest extends CQLTester
         DatabaseDescriptor.setNativeTransportReceiveQueueCapacityInBytes(1);
         DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(LOW_LIMIT);
         DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(LOW_LIMIT);
-        
-        // The driver control connections would send queries that might interfere with the tests.
-        requireNetworkWithoutDriver();
+
+        requireNetwork();
     }
 
     @AfterClass
diff --git a/test/unit/org/apache/cassandra/transport/RateLimitingTest.java b/test/unit/org/apache/cassandra/transport/RateLimitingTest.java
index 8497c01..215542d 100644
--- a/test/unit/org/apache/cassandra/transport/RateLimitingTest.java
+++ b/test/unit/org/apache/cassandra/transport/RateLimitingTest.java
@@ -76,9 +76,8 @@ public class RateLimitingTest extends CQLTester
         // If we don't exceed the queue capacity, we won't actually use the global/endpoint 
         // bytes-in-flight limits, and the assertions we make below around releasing them would be useless.
         DatabaseDescriptor.setNativeTransportReceiveQueueCapacityInBytes(1);
-        
-        // The driver control connections would send queries that might interfere with the tests.
-        requireNetworkWithoutDriver();
+
+        requireNetwork();
     }
 
     @Before

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org