You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by mc...@apache.org on 2020/10/17 18:51:26 UTC
[cassandra] branch trunk updated: Add metric for client concurrent
byte throttle
This is an automated email from the ASF dual-hosted git repository.
mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4adb8a5 Add metric for client concurrent byte throttle
4adb8a5 is described below
commit 4adb8a5e075f5647f36a82f53c43ce7fc0c95b62
Author: Chris Lohfink <cl...@apple.com>
AuthorDate: Mon Sep 21 04:27:20 2020 -0500
Add metric for client concurrent byte throttle
patch by Chris Lohfink; reviewed by David Capwell, Mick Semb Wever for CASSANDRA-16074
---
doc/source/operating/metrics.rst | 17 +-
.../cassandra/db/virtual/AbstractVirtualTable.java | 16 ++
.../apache/cassandra/metrics/ClientMetrics.java | 14 +-
.../org/apache/cassandra/transport/Server.java | 44 +++-
.../InflightRequestPayloadTrackerTest.java | 251 +++++++++------------
5 files changed, 193 insertions(+), 149 deletions(-)
diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst
index 2cabfd5..2b6cef2 100644
--- a/doc/source/operating/metrics.rst
+++ b/doc/source/operating/metrics.rst
@@ -622,13 +622,16 @@ Reported name format:
**JMX MBean**
``org.apache.cassandra.metrics:type=Client name=<MetricName>``
-============================== =============================== ===========
-Name Type Description
-============================== =============================== ===========
-connectedNativeClients Gauge<Integer> Number of clients connected to this nodes native protocol server
-connections Gauge<List<Map<String, String>> List of all connections and their state information
-connectedNativeClientsByUser Gauge<Map<String, Int> Number of connnective native clients by username
-============================== =============================== ===========
+============================== ================================ ===========
+Name Type Description
+============================== ================================ ===========
+ConnectedNativeClients Gauge<Integer> Number of clients connected to this nodes native protocol server
+Connections Gauge<List<Map<String, String>> List of all connections and their state information
+ConnectedNativeClientsByUser Gauge<Map<String, Int> Number of connnective native clients by username
+ClientsByProtocolVersion Gauge<List<Map<String, String>>> List of up to last 100 connections including protocol version. Can be reset with clearConnectionHistory operation in org.apache.cassandra.db:StorageService mbean.
+RequestsSize Gauge<Long> How many concurrent bytes used in currently processing requests
+RequestsSizeByIpDistribution Histogram How many concurrent bytes used in currently processing requests by different ips
+============================== ================================ ===========
Batch Metrics
diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
index 6c49b9a..c2de1db 100644
--- a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
+++ b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.virtual;
import java.util.Iterator;
import java.util.NavigableMap;
+import java.util.function.Supplier;
import com.google.common.collect.AbstractIterator;
@@ -219,4 +220,19 @@ public abstract class AbstractVirtualTable implements VirtualTable
};
}
}
+
+ public static class SimpleTable extends AbstractVirtualTable
+ {
+ private final Supplier<? extends AbstractVirtualTable.DataSet> supplier;
+ public SimpleTable(TableMetadata metadata, Supplier<AbstractVirtualTable.DataSet> supplier)
+ {
+ super(metadata);
+ this.supplier = supplier;
+ }
+
+ public AbstractVirtualTable.DataSet data()
+ {
+ return supplier.get();
+ }
+ }
}
diff --git a/src/java/org/apache/cassandra/metrics/ClientMetrics.java b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
index 6464a17..c7d880e 100644
--- a/src/java/org/apache/cassandra/metrics/ClientMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
@@ -22,7 +22,9 @@ import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
+import com.codahale.metrics.Reservoir;
import org.apache.cassandra.transport.ClientStat;
import org.apache.cassandra.transport.ConnectedClient;
import org.apache.cassandra.transport.Server;
@@ -40,7 +42,6 @@ public final class ClientMetrics
private Meter authSuccess;
private Meter authFailure;
-
private AtomicInteger pausedConnections;
@SuppressWarnings({ "unused", "FieldCanBeLocal" })
@@ -89,6 +90,17 @@ public final class ClientMetrics
registerGauge("ConnectedNativeClientsByUser", "connectedNativeClientsByUser", this::countConnectedClientsByUser);
registerGauge("Connections", "connections", this::connectedClients);
registerGauge("ClientsByProtocolVersion", "clientsByProtocolVersion", this::recentClientStats);
+ registerGauge("RequestsSize", Server.EndpointPayloadTracker::getCurrentGlobalUsage);
+
+ Reservoir ipUsageReservoir = Server.EndpointPayloadTracker.ipUsageReservoir();
+ Metrics.register(factory.createMetricName("RequestsSizeByIpDistribution"),
+ new Histogram(ipUsageReservoir)
+ {
+ public long getCount()
+ {
+ return ipUsageReservoir.size();
+ }
+ });
authSuccess = registerMeter("AuthSuccess");
authFailure = registerMeter("AuthFailure");
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 69c87ee..b1fad53 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -31,6 +31,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.codahale.metrics.Reservoir;
+import com.codahale.metrics.Snapshot;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -44,7 +46,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.Version;
@@ -57,6 +58,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir;
import org.apache.cassandra.net.ResourceLimits;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaChangeListener;
@@ -366,6 +368,21 @@ public class Server implements CassandraDaemon.Server
}
}
+ public static long getCurrentGlobalUsage()
+ {
+ return globalRequestPayloadInFlight.using();
+ }
+
+ public static Snapshot getCurrentIpUsage()
+ {
+ DecayingEstimatedHistogramReservoir histogram = new DecayingEstimatedHistogramReservoir();
+ for (EndpointPayloadTracker tracker : requestPayloadInFlightPerEndpoint.values())
+ {
+ histogram.update(tracker.endpointAndGlobalPayloadsInFlight.endpoint().using());
+ }
+ return histogram.getSnapshot();
+ }
+
public static long getGlobalLimit()
{
return DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes();
@@ -404,6 +421,31 @@ public class Server implements CassandraDaemon.Server
if (-1 == refCount.updateAndGet(i -> i == 1 ? -1 : i - 1))
requestPayloadInFlightPerEndpoint.remove(endpoint, this);
}
+
+ /**
+ * This will recompute the ip usage histo on each query of the snapshot when requested instead of trying to keep
+ * a histogram up to date with each request
+ */
+ public static Reservoir ipUsageReservoir()
+ {
+ return new Reservoir()
+ {
+ public int size()
+ {
+ return requestPayloadInFlightPerEndpoint.size();
+ }
+
+ public void update(long l)
+ {
+ throw new IllegalStateException();
+ }
+
+ public Snapshot getSnapshot()
+ {
+ return getCurrentIpUsage();
+ }
+ };
+ }
}
private static class Initializer extends ChannelInitializer<Channel>
diff --git a/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
index 3c18a75..5ff0fa2 100644
--- a/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
+++ b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
@@ -18,6 +18,11 @@
package org.apache.cassandra.transport;
+import java.io.IOException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -31,7 +36,15 @@ import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.virtual.AbstractVirtualTable;
+import org.apache.cassandra.db.virtual.SimpleDataSet;
+import org.apache.cassandra.db.virtual.VirtualKeyspace;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.transport.messages.QueryMessage;
@RunWith(OrderedJUnit4ClassRunner.class)
@@ -41,6 +54,16 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
private static long LOW_LIMIT = 600L;
private static long HIGH_LIMIT = 5000000000L;
+ private static final QueryOptions V5_DEFAULT_OPTIONS = QueryOptions.create(
+ QueryOptions.DEFAULT.getConsistency(),
+ QueryOptions.DEFAULT.getValues(),
+ QueryOptions.DEFAULT.skipMetadata(),
+ QueryOptions.DEFAULT.getPageSize(),
+ QueryOptions.DEFAULT.getPagingState(),
+ QueryOptions.DEFAULT.getSerialConsistency(),
+ ProtocolVersion.V5,
+ KEYSPACE);
+
@BeforeClass
public static void setUp()
{
@@ -69,137 +92,127 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
}
}
+ private SimpleClient client() throws IOException
+ {
+ return new SimpleClient(nativeAddr.getHostAddress(),
+ nativePort,
+ ProtocolVersion.V5,
+ true,
+ new EncryptionOptions())
+ .connect(false, false, true);
+ }
+
@Test
public void testQueryExecutionWithThrowOnOverload() throws Throwable
{
- SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
- nativePort,
- ProtocolVersion.V5,
- true,
- new EncryptionOptions());
-
- try
+ try (SimpleClient client = client())
{
- client.connect(false, false, true);
- QueryOptions queryOptions = QueryOptions.create(
- QueryOptions.DEFAULT.getConsistency(),
- QueryOptions.DEFAULT.getValues(),
- QueryOptions.DEFAULT.skipMetadata(),
- QueryOptions.DEFAULT.getPageSize(),
- QueryOptions.DEFAULT.getPagingState(),
- QueryOptions.DEFAULT.getSerialConsistency(),
- ProtocolVersion.V5,
- KEYSPACE);
-
QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk1 int PRIMARY KEY, v text)",
- queryOptions);
+ V5_DEFAULT_OPTIONS);
client.execute(queryMessage);
}
- finally
- {
- client.close();
- }
}
@Test
public void testQueryExecutionWithoutThrowOnOverload() throws Throwable
{
- SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
- nativePort,
- ProtocolVersion.V5,
- true,
- new EncryptionOptions());
-
- try
+ try (SimpleClient client = client())
{
client.connect(false, false, false);
- QueryOptions queryOptions = QueryOptions.create(
- QueryOptions.DEFAULT.getConsistency(),
- QueryOptions.DEFAULT.getValues(),
- QueryOptions.DEFAULT.skipMetadata(),
- QueryOptions.DEFAULT.getPageSize(),
- QueryOptions.DEFAULT.getPagingState(),
- QueryOptions.DEFAULT.getSerialConsistency(),
- ProtocolVersion.V5,
- KEYSPACE);
QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
- queryOptions);
+ V5_DEFAULT_OPTIONS);
client.execute(queryMessage);
queryMessage = new QueryMessage("SELECT * FROM atable",
- queryOptions);
+ V5_DEFAULT_OPTIONS);
client.execute(queryMessage);
}
- finally
+ }
+
+ @Test
+ public void testQueryUpdatesConcurrentMetricsUpdate() throws Throwable
+ {
+ try (SimpleClient client = client())
{
- client.close();
+ CyclicBarrier barrier = new CyclicBarrier(2);
+ String table = createTableName();
+
+ // reusing table name for keyspace name since cannot reuse KEYSPACE and want it to be unique
+ TableMetadata tableMetadata =
+ TableMetadata.builder(table, table)
+ .kind(TableMetadata.Kind.VIRTUAL)
+ .addPartitionKeyColumn("pk", UTF8Type.instance)
+ .addRegularColumn("v", Int32Type.instance)
+ .build();
+
+ VirtualTable vt1 = new AbstractVirtualTable.SimpleTable(tableMetadata, () -> {
+ try
+ {
+ // sync up with main thread thats waiting for query to be in progress
+ barrier.await(30, TimeUnit.SECONDS);
+ // wait until metric has been checked
+ barrier.await(30, TimeUnit.SECONDS);
+ }
+ catch (Exception e)
+ {
+ // ignore interuption and barrier exceptions
+ }
+ return new SimpleDataSet(tableMetadata);
+ });
+ VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(table, ImmutableList.of(vt1)));
+
+ final QueryMessage queryMessage = new QueryMessage(String.format("SELECT * FROM %s.%s", table, table),
+ V5_DEFAULT_OPTIONS);
+
+ Assert.assertEquals(0L, Server.EndpointPayloadTracker.getCurrentGlobalUsage());
+ try
+ {
+ Thread tester = new Thread(() -> client.execute(queryMessage));
+ tester.setDaemon(true); // so wont block exit if something fails
+ tester.start();
+ // block until query in progress
+ barrier.await(30, TimeUnit.SECONDS);
+ Assert.assertTrue(Server.EndpointPayloadTracker.getCurrentGlobalUsage() > 0);
+ } finally
+ {
+ // notify query thread that metric has been checked. This will also throw TimeoutException if both
+ // the query threads barriers are not reached
+ barrier.await(30, TimeUnit.SECONDS);
+ }
}
}
@Test
public void testQueryExecutionWithoutThrowOnOverloadAndInflightLimitedExceeded() throws Throwable
{
- SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
- nativePort,
- ProtocolVersion.V5,
- true,
- new EncryptionOptions());
-
- try
+ try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+ nativePort,
+ ProtocolVersion.V5,
+ true,
+ new EncryptionOptions()))
{
client.connect(false, false, false);
- QueryOptions queryOptions = QueryOptions.create(
- QueryOptions.DEFAULT.getConsistency(),
- QueryOptions.DEFAULT.getValues(),
- QueryOptions.DEFAULT.skipMetadata(),
- QueryOptions.DEFAULT.getPageSize(),
- QueryOptions.DEFAULT.getPagingState(),
- QueryOptions.DEFAULT.getSerialConsistency(),
- ProtocolVersion.V5,
- KEYSPACE);
-
QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
- queryOptions);
+ V5_DEFAULT_OPTIONS);
client.execute(queryMessage);
queryMessage = new QueryMessage("INSERT INTO atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
- queryOptions);
+ V5_DEFAULT_OPTIONS);
client.execute(queryMessage);
}
- finally
- {
- client.close();
- }
}
@Test
public void testOverloadedExceptionForEndpointInflightLimit() throws Throwable
{
- SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
- nativePort,
- ProtocolVersion.V5,
- true,
- new EncryptionOptions());
-
- try
+ try (SimpleClient client = client())
{
- client.connect(false, false, true);
- QueryOptions queryOptions = QueryOptions.create(
- QueryOptions.DEFAULT.getConsistency(),
- QueryOptions.DEFAULT.getValues(),
- QueryOptions.DEFAULT.skipMetadata(),
- QueryOptions.DEFAULT.getPageSize(),
- QueryOptions.DEFAULT.getPagingState(),
- QueryOptions.DEFAULT.getSerialConsistency(),
- ProtocolVersion.V5,
- KEYSPACE);
-
QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
- queryOptions);
+ V5_DEFAULT_OPTIONS);
client.execute(queryMessage);
queryMessage = new QueryMessage("INSERT INTO atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
- queryOptions);
+ V5_DEFAULT_OPTIONS);
try
{
client.execute(queryMessage);
@@ -210,40 +223,19 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
Assert.assertTrue(e.getCause() instanceof OverloadedException);
}
}
- finally
- {
- client.close();
- }
}
@Test
public void testOverloadedExceptionForOverallInflightLimit() throws Throwable
{
- SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
- nativePort,
- ProtocolVersion.V5,
- true,
- new EncryptionOptions());
-
- try
+ try (SimpleClient client = client())
{
- client.connect(false, false, true);
- QueryOptions queryOptions = QueryOptions.create(
- QueryOptions.DEFAULT.getConsistency(),
- QueryOptions.DEFAULT.getValues(),
- QueryOptions.DEFAULT.skipMetadata(),
- QueryOptions.DEFAULT.getPageSize(),
- QueryOptions.DEFAULT.getPagingState(),
- QueryOptions.DEFAULT.getSerialConsistency(),
- ProtocolVersion.V5,
- KEYSPACE);
-
QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
- queryOptions);
+ V5_DEFAULT_OPTIONS);
client.execute(queryMessage);
queryMessage = new QueryMessage("INSERT INTO atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
- queryOptions);
+ V5_DEFAULT_OPTIONS);
try
{
client.execute(queryMessage);
@@ -254,40 +246,20 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
Assert.assertTrue(e.getCause() instanceof OverloadedException);
}
}
- finally
- {
- client.close();
- }
}
@Test
public void testChangingLimitsAtRuntime() throws Throwable
{
- SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
- nativePort,
- ProtocolVersion.V5,
- true,
- new EncryptionOptions());
- client.connect(false, true, true);
-
- QueryOptions queryOptions = QueryOptions.create(
- QueryOptions.DEFAULT.getConsistency(),
- QueryOptions.DEFAULT.getValues(),
- QueryOptions.DEFAULT.skipMetadata(),
- QueryOptions.DEFAULT.getPageSize(),
- QueryOptions.DEFAULT.getPagingState(),
- QueryOptions.DEFAULT.getSerialConsistency(),
- ProtocolVersion.V5,
- KEYSPACE);
-
+ SimpleClient client = client();
try
{
QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
- queryOptions);
+ V5_DEFAULT_OPTIONS);
client.execute(queryMessage);
queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
- queryOptions);
+ V5_DEFAULT_OPTIONS);
try
{
client.execute(queryMessage);
@@ -298,14 +270,13 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
Assert.assertTrue(e.getCause() instanceof OverloadedException);
}
-
// change global limit, query will still fail because endpoint limit
Server.EndpointPayloadTracker.setGlobalLimit(HIGH_LIMIT);
Assert.assertEquals("new global limit not returned by EndpointPayloadTrackers", HIGH_LIMIT, Server.EndpointPayloadTracker.getGlobalLimit());
Assert.assertEquals("new global limit not returned by DatabaseDescriptor", HIGH_LIMIT, DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
- queryOptions);
+ V5_DEFAULT_OPTIONS);
try
{
client.execute(queryMessage);
@@ -322,7 +293,7 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
Assert.assertEquals("new endpoint limit not returned by DatabaseDescriptor", HIGH_LIMIT, DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp());
queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
- queryOptions);
+ V5_DEFAULT_OPTIONS);
client.execute(queryMessage);
// ensure new clients also see the new raised limits
@@ -335,7 +306,7 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
client.connect(false, true, true);
queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
- queryOptions);
+ V5_DEFAULT_OPTIONS);
client.execute(queryMessage);
// lower the global limit and ensure the query fails again
@@ -344,7 +315,7 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
Assert.assertEquals("new global limit not returned by DatabaseDescriptor", LOW_LIMIT, DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
- queryOptions);
+ V5_DEFAULT_OPTIONS);
try
{
client.execute(queryMessage);
@@ -361,7 +332,7 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
Assert.assertEquals("new endpoint limit not returned by DatabaseDescriptor", 60, DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp());
queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
- queryOptions);
+ V5_DEFAULT_OPTIONS);
try
{
client.execute(queryMessage);
@@ -382,7 +353,7 @@ public class InflightRequestPayloadTrackerTest extends CQLTester
client.connect(false, true, true);
queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
- queryOptions);
+ V5_DEFAULT_OPTIONS);
try
{
client.execute(queryMessage);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org