You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2020/10/17 18:51:26 UTC

[cassandra] branch trunk updated: Add metric for client concurrent byte throttle

This is an automated email from the ASF dual-hosted git repository.

mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4adb8a5  Add metric for client concurrent byte throttle
4adb8a5 is described below

commit 4adb8a5e075f5647f36a82f53c43ce7fc0c95b62
Author: Chris Lohfink <cl...@apple.com>
AuthorDate: Mon Sep 21 04:27:20 2020 -0500

    Add metric for client concurrent byte throttle
    
     patch by Chris Lohfink; reviewed by David Capwell, Mick Semb Wever for CASSANDRA-16074
---
 doc/source/operating/metrics.rst                   |  17 +-
 .../cassandra/db/virtual/AbstractVirtualTable.java |  16 ++
 .../apache/cassandra/metrics/ClientMetrics.java    |  14 +-
 .../org/apache/cassandra/transport/Server.java     |  44 +++-
 .../InflightRequestPayloadTrackerTest.java         | 251 +++++++++------------
 5 files changed, 193 insertions(+), 149 deletions(-)

diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst
index 2cabfd5..2b6cef2 100644
--- a/doc/source/operating/metrics.rst
+++ b/doc/source/operating/metrics.rst
@@ -622,13 +622,16 @@ Reported name format:
 **JMX MBean**
     ``org.apache.cassandra.metrics:type=Client name=<MetricName>``
 
-============================== =============================== ===========
-Name                           Type                            Description
-============================== =============================== ===========
-connectedNativeClients         Gauge<Integer>                  Number of clients connected to this nodes native protocol server
-connections                    Gauge<List<Map<String, String>> List of all connections and their state information
-connectedNativeClientsByUser   Gauge<Map<String, Int>          Number of connnective native clients by username
-============================== =============================== ===========
+============================== ================================ ===========
+Name                           Type                             Description
+============================== ================================ ===========
+ConnectedNativeClients         Gauge<Integer>                   Number of clients connected to this node's native protocol server
+Connections                    Gauge<List<Map<String, String>>> List of all connections and their state information
+ConnectedNativeClientsByUser   Gauge<Map<String, Int>>          Number of connected native clients by username
+ClientsByProtocolVersion       Gauge<List<Map<String, String>>> List of up to last 100 connections including protocol version. Can be reset with clearConnectionHistory operation in org.apache.cassandra.db:StorageService mbean.
+RequestsSize                   Gauge<Long>                      Total number of bytes in use by requests that are currently being processed
+RequestsSizeByIpDistribution   Histogram                        Distribution, across client IPs, of the number of bytes in use by requests that are currently being processed
+============================== ================================ ===========
 
 
 Batch Metrics
diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
index 6c49b9a..c2de1db 100644
--- a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
+++ b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.virtual;
 
 import java.util.Iterator;
 import java.util.NavigableMap;
+import java.util.function.Supplier;
 
 import com.google.common.collect.AbstractIterator;
 
@@ -219,4 +220,19 @@ public abstract class AbstractVirtualTable implements VirtualTable
             };
         }
     }
+
+    public static class SimpleTable extends AbstractVirtualTable
+    {
+        private final Supplier<? extends AbstractVirtualTable.DataSet> supplier;
+        public SimpleTable(TableMetadata metadata, Supplier<AbstractVirtualTable.DataSet> supplier)
+        {
+            super(metadata);
+            this.supplier = supplier;
+        }
+
+        public AbstractVirtualTable.DataSet data()
+        {
+            return supplier.get();
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/metrics/ClientMetrics.java b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
index 6464a17..c7d880e 100644
--- a/src/java/org/apache/cassandra/metrics/ClientMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
@@ -22,7 +22,9 @@ import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.Reservoir;
 import org.apache.cassandra.transport.ClientStat;
 import org.apache.cassandra.transport.ConnectedClient;
 import org.apache.cassandra.transport.Server;
@@ -40,7 +42,6 @@ public final class ClientMetrics
 
     private Meter authSuccess;
     private Meter authFailure;
-
     private AtomicInteger pausedConnections;
     
     @SuppressWarnings({ "unused", "FieldCanBeLocal" })
@@ -89,6 +90,17 @@ public final class ClientMetrics
         registerGauge("ConnectedNativeClientsByUser", "connectedNativeClientsByUser", this::countConnectedClientsByUser);
         registerGauge("Connections", "connections", this::connectedClients);
         registerGauge("ClientsByProtocolVersion", "clientsByProtocolVersion", this::recentClientStats);
+        registerGauge("RequestsSize", Server.EndpointPayloadTracker::getCurrentGlobalUsage);
+
+        Reservoir ipUsageReservoir = Server.EndpointPayloadTracker.ipUsageReservoir();
+        Metrics.register(factory.createMetricName("RequestsSizeByIpDistribution"),
+                         new Histogram(ipUsageReservoir)
+        {
+             public long getCount()
+             {
+                 return ipUsageReservoir.size();
+             }
+        });
 
         authSuccess = registerMeter("AuthSuccess");
         authFailure = registerMeter("AuthFailure");
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 69c87ee..b1fad53 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -31,6 +31,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.codahale.metrics.Reservoir;
+import com.codahale.metrics.Snapshot;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -44,7 +46,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.Version;
@@ -57,6 +58,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir;
 import org.apache.cassandra.net.ResourceLimits;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaChangeListener;
@@ -366,6 +368,21 @@ public class Server implements CassandraDaemon.Server
             }
         }
 
+        public static long getCurrentGlobalUsage()
+        {
+            return globalRequestPayloadInFlight.using();
+        }
+
+        public static Snapshot getCurrentIpUsage()
+        {
+            DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir();
+            for (EndpointPayloadTracker tracker : requestPayloadInFlightPerEndpoint.values())
+            {
+                histogram.update(tracker.endpointAndGlobalPayloadsInFlight.endpoint().using());
+            }
+            return histogram.getSnapshot();
+        }
+
         public static long getGlobalLimit()
         {
             return DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes();
@@ -404,6 +421,31 @@ public class Server implements CassandraDaemon.Server
             if (-1 == refCount.updateAndGet(i -> i == 1 ? -1 : i - 1))
                 requestPayloadInFlightPerEndpoint.remove(endpoint, this);
         }
+
+        /**
+         * Returns a reservoir that recomputes the per-IP usage histogram each time a snapshot is requested,
+         * instead of trying to keep a histogram up to date with each request.
+         */
+        public static Reservoir ipUsageReservoir()
+        {
+            return new Reservoir()
+            {
+                public int size()
+                {
+                    return requestPayloadInFlightPerEndpoint.size();
+                }
+
+                public void update(long l)
+                {
+                    throw new IllegalStateException();
+                }
+
+                public Snapshot getSnapshot()
+                {
+                    return getCurrentIpUsage();
+                }
+            };
+        }
     }
 
     private static class Initializer extends ChannelInitializer<Channel>
diff --git a/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
index 3c18a75..5ff0fa2 100644
--- a/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
+++ b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
@@ -18,6 +18,11 @@
 
 package org.apache.cassandra.transport;
 
+import java.io.IOException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -31,7 +36,15 @@ import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.virtual.AbstractVirtualTable;
+import org.apache.cassandra.db.virtual.SimpleDataSet;
+import org.apache.cassandra.db.virtual.VirtualKeyspace;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
 import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.transport.messages.QueryMessage;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
@@ -41,6 +54,16 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
     private static long LOW_LIMIT = 600L;
     private static long HIGH_LIMIT = 5000000000L;
 
+    private static final QueryOptions V5_DEFAULT_OPTIONS = QueryOptions.create(
+    QueryOptions.DEFAULT.getConsistency(),
+    QueryOptions.DEFAULT.getValues(),
+    QueryOptions.DEFAULT.skipMetadata(),
+    QueryOptions.DEFAULT.getPageSize(),
+    QueryOptions.DEFAULT.getPagingState(),
+    QueryOptions.DEFAULT.getSerialConsistency(),
+    ProtocolVersion.V5,
+    KEYSPACE);
+
     @BeforeClass
     public static void setUp()
     {
@@ -69,137 +92,127 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
         }
     }
 
+    private SimpleClient client() throws IOException
+    {
+        return new SimpleClient(nativeAddr.getHostAddress(),
+                                nativePort,
+                                ProtocolVersion.V5,
+                                true,
+                                new EncryptionOptions())
+               .connect(false, false, true);
+    }
+
     @Test
     public void testQueryExecutionWithThrowOnOverload() throws Throwable
     {
-        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
-                                               nativePort,
-                                               ProtocolVersion.V5,
-                                               true,
-                                               new EncryptionOptions());
-
-        try
+        try (SimpleClient client = client())
         {
-            client.connect(false, false, true);
-            QueryOptions queryOptions = QueryOptions.create(
-            QueryOptions.DEFAULT.getConsistency(),
-            QueryOptions.DEFAULT.getValues(),
-            QueryOptions.DEFAULT.skipMetadata(),
-            QueryOptions.DEFAULT.getPageSize(),
-            QueryOptions.DEFAULT.getPagingState(),
-            QueryOptions.DEFAULT.getSerialConsistency(),
-            ProtocolVersion.V5,
-            KEYSPACE);
-
             QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk1 int PRIMARY KEY, v text)",
-                                                         queryOptions);
+                                                         V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
         }
-        finally
-        {
-            client.close();
-        }
     }
 
     @Test
     public void testQueryExecutionWithoutThrowOnOverload() throws Throwable
     {
-        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
-                                               nativePort,
-                                               ProtocolVersion.V5,
-                                               true,
-                                               new EncryptionOptions());
-
-        try
+        try (SimpleClient client = client())
         {
             client.connect(false, false, false);
-            QueryOptions queryOptions = QueryOptions.create(
-            QueryOptions.DEFAULT.getConsistency(),
-            QueryOptions.DEFAULT.getValues(),
-            QueryOptions.DEFAULT.skipMetadata(),
-            QueryOptions.DEFAULT.getPageSize(),
-            QueryOptions.DEFAULT.getPagingState(),
-            QueryOptions.DEFAULT.getSerialConsistency(),
-            ProtocolVersion.V5,
-            KEYSPACE);
 
             QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
-                                                         queryOptions);
+                                                         V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
             queryMessage = new QueryMessage("SELECT * FROM atable",
-                                            queryOptions);
+                                            V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
         }
-        finally
+    }
+
+    @Test
+    public void testQueryUpdatesConcurrentMetricsUpdate() throws Throwable
+    {
+        try (SimpleClient client = client())
         {
-            client.close();
+            CyclicBarrier barrier = new CyclicBarrier(2);
+            String table = createTableName();
+
+            // reuse the table name as the keyspace name: KEYSPACE cannot be reused and the name must be unique
+            TableMetadata tableMetadata =
+                TableMetadata.builder(table, table)
+                             .kind(TableMetadata.Kind.VIRTUAL)
+                             .addPartitionKeyColumn("pk", UTF8Type.instance)
+                             .addRegularColumn("v", Int32Type.instance)
+                             .build();
+
+            VirtualTable vt1 = new AbstractVirtualTable.SimpleTable(tableMetadata, () -> {
+                try
+                {
+                    // sync up with the main thread, which is waiting for the query to be in progress
+                    barrier.await(30, TimeUnit.SECONDS);
+                    // wait until metric has been checked
+                    barrier.await(30, TimeUnit.SECONDS);
+                }
+                catch (Exception e)
+                {
+                    // ignore interruption and barrier exceptions
+                }
+                return new SimpleDataSet(tableMetadata);
+            });
+            VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(table, ImmutableList.of(vt1)));
+
+            final QueryMessage queryMessage = new QueryMessage(String.format("SELECT * FROM %s.%s", table, table),
+                                                               V5_DEFAULT_OPTIONS);
+
+            Assert.assertEquals(0L, Server.EndpointPayloadTracker.getCurrentGlobalUsage());
+            try
+            {
+                Thread tester = new Thread(() -> client.execute(queryMessage));
+                tester.setDaemon(true); // so it won't block exit if something fails
+                tester.start();
+                // block until query in progress
+                barrier.await(30, TimeUnit.SECONDS);
+                Assert.assertTrue(Server.EndpointPayloadTracker.getCurrentGlobalUsage() > 0);
+            } finally
+            {
+                // notify the query thread that the metric has been checked. This will also throw TimeoutException
+                // if the query thread does not reach both of its barriers
+                barrier.await(30, TimeUnit.SECONDS);
+            }
         }
     }
 
     @Test
     public void testQueryExecutionWithoutThrowOnOverloadAndInflightLimitedExceeded() throws Throwable
     {
-        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
-                                               nativePort,
-                                               ProtocolVersion.V5,
-                                               true,
-                                               new EncryptionOptions());
-
-        try
+        try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                    nativePort,
+                                                    ProtocolVersion.V5,
+                                                    true,
+                                                    new EncryptionOptions()))
         {
             client.connect(false, false, false);
-            QueryOptions queryOptions = QueryOptions.create(
-            QueryOptions.DEFAULT.getConsistency(),
-            QueryOptions.DEFAULT.getValues(),
-            QueryOptions.DEFAULT.skipMetadata(),
-            QueryOptions.DEFAULT.getPageSize(),
-            QueryOptions.DEFAULT.getPagingState(),
-            QueryOptions.DEFAULT.getSerialConsistency(),
-            ProtocolVersion.V5,
-            KEYSPACE);
-
             QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
-                                                         queryOptions);
+                                                         V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
 
             queryMessage = new QueryMessage("INSERT INTO atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
-                                            queryOptions);
+                                            V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
         }
-        finally
-        {
-            client.close();
-        }
     }
 
     @Test
     public void testOverloadedExceptionForEndpointInflightLimit() throws Throwable
     {
-        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
-                                               nativePort,
-                                               ProtocolVersion.V5,
-                                               true,
-                                               new EncryptionOptions());
-
-        try
+        try (SimpleClient client = client())
         {
-            client.connect(false, false, true);
-            QueryOptions queryOptions = QueryOptions.create(
-            QueryOptions.DEFAULT.getConsistency(),
-            QueryOptions.DEFAULT.getValues(),
-            QueryOptions.DEFAULT.skipMetadata(),
-            QueryOptions.DEFAULT.getPageSize(),
-            QueryOptions.DEFAULT.getPagingState(),
-            QueryOptions.DEFAULT.getSerialConsistency(),
-            ProtocolVersion.V5,
-            KEYSPACE);
-
             QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
-                                                         queryOptions);
+                                                         V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
 
             queryMessage = new QueryMessage("INSERT INTO atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
-                                            queryOptions);
+                                            V5_DEFAULT_OPTIONS);
             try
             {
                 client.execute(queryMessage);
@@ -210,40 +223,19 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
                 Assert.assertTrue(e.getCause() instanceof OverloadedException);
             }
         }
-        finally
-        {
-            client.close();
-        }
     }
 
     @Test
     public void testOverloadedExceptionForOverallInflightLimit() throws Throwable
     {
-        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
-                                               nativePort,
-                                               ProtocolVersion.V5,
-                                               true,
-                                               new EncryptionOptions());
-
-        try
+        try (SimpleClient client = client())
         {
-            client.connect(false, false, true);
-            QueryOptions queryOptions = QueryOptions.create(
-            QueryOptions.DEFAULT.getConsistency(),
-            QueryOptions.DEFAULT.getValues(),
-            QueryOptions.DEFAULT.skipMetadata(),
-            QueryOptions.DEFAULT.getPageSize(),
-            QueryOptions.DEFAULT.getPagingState(),
-            QueryOptions.DEFAULT.getSerialConsistency(),
-            ProtocolVersion.V5,
-            KEYSPACE);
-
             QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
-                                                         queryOptions);
+                                                         V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
 
             queryMessage = new QueryMessage("INSERT INTO atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
-                                            queryOptions);
+                                            V5_DEFAULT_OPTIONS);
             try
             {
                 client.execute(queryMessage);
@@ -254,40 +246,20 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
                 Assert.assertTrue(e.getCause() instanceof OverloadedException);
             }
         }
-        finally
-        {
-            client.close();
-        }
     }
 
     @Test
     public void testChangingLimitsAtRuntime() throws Throwable
     {
-        SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
-                                               nativePort,
-                                               ProtocolVersion.V5,
-                                               true,
-                                               new EncryptionOptions());
-        client.connect(false, true, true);
-
-        QueryOptions queryOptions = QueryOptions.create(
-        QueryOptions.DEFAULT.getConsistency(),
-        QueryOptions.DEFAULT.getValues(),
-        QueryOptions.DEFAULT.skipMetadata(),
-        QueryOptions.DEFAULT.getPageSize(),
-        QueryOptions.DEFAULT.getPagingState(),
-        QueryOptions.DEFAULT.getSerialConsistency(),
-        ProtocolVersion.V5,
-        KEYSPACE);
-
+        SimpleClient client = client();
         try
         {
             QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
-                                                         queryOptions);
+                                                         V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
 
             queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
-                                            queryOptions);
+                                            V5_DEFAULT_OPTIONS);
             try
             {
                 client.execute(queryMessage);
@@ -298,14 +270,13 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
                 Assert.assertTrue(e.getCause() instanceof OverloadedException);
             }
 
-
             // change global limit, query will still fail because endpoint limit
             Server.EndpointPayloadTracker.setGlobalLimit(HIGH_LIMIT);
             Assert.assertEquals("new global limit not returned by EndpointPayloadTrackers", HIGH_LIMIT, Server.EndpointPayloadTracker.getGlobalLimit());
             Assert.assertEquals("new global limit not returned by DatabaseDescriptor", HIGH_LIMIT, DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
 
             queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
-                                            queryOptions);
+                                            V5_DEFAULT_OPTIONS);
             try
             {
                 client.execute(queryMessage);
@@ -322,7 +293,7 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
             Assert.assertEquals("new endpoint limit not returned by DatabaseDescriptor", HIGH_LIMIT, DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp());
 
             queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
-                                            queryOptions);
+                                            V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
 
             // ensure new clients also see the new raised limits
@@ -335,7 +306,7 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
             client.connect(false, true, true);
 
             queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
-                                            queryOptions);
+                                            V5_DEFAULT_OPTIONS);
             client.execute(queryMessage);
 
             // lower the global limit and ensure the query fails again
@@ -344,7 +315,7 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
             Assert.assertEquals("new global limit not returned by DatabaseDescriptor", LOW_LIMIT, DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
 
             queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
-                                            queryOptions);
+                                            V5_DEFAULT_OPTIONS);
             try
             {
                 client.execute(queryMessage);
@@ -361,7 +332,7 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
             Assert.assertEquals("new endpoint limit not returned by DatabaseDescriptor", 60, DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp());
 
             queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
-                                                         queryOptions);
+                                            V5_DEFAULT_OPTIONS);
             try
             {
                 client.execute(queryMessage);
@@ -382,7 +353,7 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
             client.connect(false, true, true);
 
             queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
-                                            queryOptions);
+                                            V5_DEFAULT_OPTIONS);
             try
             {
                 client.execute(queryMessage);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org