You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2019/07/15 13:54:15 UTC
[cassandra] branch cassandra-3.0 updated: Prevent client requests
from blocking on executor task queue
This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 5a03898 Prevent client requests from blocking on executor task queue
5a03898 is described below
commit 5a03898c680ed6ada63901e8a4b278ccc8070717
Author: sumanthpasupuleti <su...@gmail.com>
AuthorDate: Mon Mar 25 08:06:13 2019 -0700
Prevent client requests from blocking on executor task queue
patch by Sumanth Pasupuleti, reviewed by Benedict for CASSANDRA-15013
---
CHANGES.txt | 1 +
doc/native_protocol_v4.spec | 4 +
src/java/org/apache/cassandra/config/Config.java | 3 +
.../cassandra/config/DatabaseDescriptor.java | 31 +++
.../apache/cassandra/metrics/ClientMetrics.java | 56 ++++-
.../org/apache/cassandra/net/ResourceLimits.java | 245 ++++++++++++++++++++
.../cassandra/service/NativeTransportService.java | 26 +--
.../org/apache/cassandra/transport/Connection.java | 11 +
src/java/org/apache/cassandra/transport/Frame.java | 12 +-
.../org/apache/cassandra/transport/Message.java | 146 +++++++++++-
.../transport/RequestThreadPoolExecutor.java | 96 --------
.../org/apache/cassandra/transport/Server.java | 64 ++++--
.../apache/cassandra/transport/SimpleClient.java | 10 +
.../transport/messages/StartupMessage.java | 3 +
test/unit/org/apache/cassandra/cql3/CQLTester.java | 2 +
.../service/NativeTransportServiceTest.java | 3 +-
.../InflightRequestPayloadTrackerTest.java | 248 +++++++++++++++++++++
17 files changed, 811 insertions(+), 150 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index c8bd30d..68d309c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.19
+ * Prevent client requests from blocking on executor task queue (CASSANDRA-15013)
* Toughen up column drop/recreate type validations (CASSANDRA-15204)
* LegacyLayout should handle paging states that cross a collection column (CASSANDRA-15201)
* Prevent RuntimeException when username or password is empty/null (CASSANDRA-15198)
diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec
index 02802a7..8beb77b 100644
--- a/doc/native_protocol_v4.spec
+++ b/doc/native_protocol_v4.spec
@@ -275,6 +275,9 @@ Table of Contents
mode. This mode will make all Thrift and Compact Tables to be exposed as if
they were CQL Tables. This is optional; if not specified, the option will
not be used.
+ - "THROW_ON_OVERLOAD": When the server is overloaded with too many requests, by default it puts
+ back pressure on the client connection. If this option is set to true, the server instead sends an
+ OverloadedException error message back to the client.
4.1.2. AUTH_RESPONSE
@@ -1175,3 +1178,4 @@ Table of Contents
* The <paging_state> returned in the v4 protocol is not compatible with the v3
protocol. In other words, a <paging_state> returned by a node using protocol v4
should not be used to query a node using protocol v3 (and vice-versa).
+ * Added THROW_ON_OVERLOAD startup option (Section 4.1.1).
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index de158bd..830d3e1 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -154,6 +154,9 @@ public class Config
public volatile Long native_transport_max_concurrent_connections = -1L;
public volatile Long native_transport_max_concurrent_connections_per_ip = -1L;
public boolean native_transport_flush_in_batches_legacy = true;
+ public volatile long native_transport_max_concurrent_requests_in_bytes_per_ip = -1L;
+ public volatile long native_transport_max_concurrent_requests_in_bytes = -1L;
+
@Deprecated
public Integer thrift_max_message_length_in_mb = 16;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index db55c20..8417c39 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -470,6 +470,17 @@ public class DatabaseDescriptor
{
throw new ConfigurationException("Missing endpoint_snitch directive", false);
}
+
+ if (conf.native_transport_max_concurrent_requests_in_bytes <= 0)
+ {
+ conf.native_transport_max_concurrent_requests_in_bytes = Runtime.getRuntime().maxMemory() / 10;
+ }
+
+ if (conf.native_transport_max_concurrent_requests_in_bytes_per_ip <= 0)
+ {
+ conf.native_transport_max_concurrent_requests_in_bytes_per_ip = Runtime.getRuntime().maxMemory() / 40;
+ }
+
snitch = createEndpointSnitch(conf.endpoint_snitch);
EndpointSnitchInfo.create();
@@ -1524,6 +1535,26 @@ public class DatabaseDescriptor
conf.commitlog_sync_batch_window_in_ms = windowMillis;
}
+ public static long getNativeTransportMaxConcurrentRequestsInBytesPerIp()
+ {
+ return conf.native_transport_max_concurrent_requests_in_bytes_per_ip;
+ }
+
+ public static void setNativeTransportMaxConcurrentRequestsInBytesPerIp(long maxConcurrentRequestsInBytes)
+ {
+ conf.native_transport_max_concurrent_requests_in_bytes_per_ip = maxConcurrentRequestsInBytes;
+ }
+
+ public static long getNativeTransportMaxConcurrentRequestsInBytes()
+ {
+ return conf.native_transport_max_concurrent_requests_in_bytes;
+ }
+
+ public static void setNativeTransportMaxConcurrentRequestsInBytes(long maxConcurrentRequestsInBytes)
+ {
+ conf.native_transport_max_concurrent_requests_in_bytes = maxConcurrentRequestsInBytes;
+ }
+
public static int getCommitLogSyncPeriod()
{
return conf.commitlog_sync_period_in_ms;
diff --git a/src/java/org/apache/cassandra/metrics/ClientMetrics.java b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
index 4a384eb..08f0531 100644
--- a/src/java/org/apache/cassandra/metrics/ClientMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
@@ -18,9 +18,14 @@
*/
package org.apache.cassandra.metrics;
+import java.util.Collection;
+import java.util.Collections;
import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import org.apache.cassandra.transport.Server;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
@@ -28,13 +33,40 @@ import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
public class ClientMetrics
{
private static final MetricNameFactory factory = new DefaultNameFactory("Client");
-
public static final ClientMetrics instance = new ClientMetrics();
-
+
+ private volatile boolean initialized = false;
+
+ private Collection<Server> servers = Collections.emptyList();
+
+ private AtomicInteger pausedConnections;
+ private Gauge<Integer> pausedConnectionsGauge;
+ private Meter requestDiscarded;
+
private ClientMetrics()
{
}
+ public void pauseConnection() { pausedConnections.incrementAndGet(); }
+ public void unpauseConnection() { pausedConnections.decrementAndGet(); }
+ public void markRequestDiscarded() { requestDiscarded.mark(); }
+
+ public synchronized void init(Collection<Server> servers)
+ {
+ if (initialized)
+ return;
+
+ this.servers = servers;
+
+ registerGauge("connectedNativeClients", this::countConnectedClients);
+
+ pausedConnections = new AtomicInteger();
+ pausedConnectionsGauge = registerGauge("PausedConnections", pausedConnections::get);
+ requestDiscarded = registerMeter("RequestDiscarded");
+
+ initialized = true;
+ }
+
public void addCounter(String name, final Callable<Integer> provider)
{
Metrics.register(factory.createMetricName(name), new Gauge<Integer>()
@@ -51,4 +83,24 @@ public class ClientMetrics
}
});
}
+
+ private int countConnectedClients()
+ {
+ int count = 0;
+
+ for (Server server : servers)
+ count += server.getConnectedClients();
+
+ return count;
+ }
+
+ private <T> Gauge<T> registerGauge(String name, Gauge<T> gauge)
+ {
+ return Metrics.register(factory.createMetricName(name), gauge);
+ }
+
+ private Meter registerMeter(String name)
+ {
+ return Metrics.meter(factory.createMetricName(name));
+ }
}
diff --git a/src/java/org/apache/cassandra/net/ResourceLimits.java b/src/java/org/apache/cassandra/net/ResourceLimits.java
new file mode 100644
index 0000000..f8d24d7
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/ResourceLimits.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+public abstract class ResourceLimits
+{
+ /**
+ * Represents permits to utilise a resource and ways to allocate and release them.
+ *
+ * Two implementations are currently provided:
+ * 1. {@link Concurrent}, for shared limits, which is thread-safe;
+ * 2. {@link Basic}, for limits that are not shared between threads, which is not thread-safe.
+ */
+ public interface Limit
+ {
+ /**
+ * @return total amount of permits represented by this {@link Limit} - the capacity
+ */
+ long limit();
+
+ /**
+ * @return remaining, unallocated permit amount
+ */
+ long remaining();
+
+ /**
+ * @return amount of permits currently in use
+ */
+ long using();
+
+ /**
+ * Attempts to allocate an amount of permits from this limit. If allocated, <em>MUST</em> eventually
+ * be released back with {@link #release(long)}.
+ *
+ * @return {@code true} if the allocation was successful, {@code false} otherwise
+ */
+ boolean tryAllocate(long amount);
+
+ /**
+ * Allocates an amount independent of permits available from this limit. <em>MUST</em> eventually
+ * be released back with {@link #release(long)}.
+ *
+ */
+ void allocate(long amount);
+
+ /**
+ * @param amount the amount of permits to return to this limit
+ * @return {@code ABOVE_LIMIT} if there aren't enough permits available even after the release, or
+ * {@code BELOW_LIMIT} if there are enough permits available after the release.
+ */
+ Outcome release(long amount);
+ }
+
+ /**
+ * A thread-safe permit container.
+ */
+ public static class Concurrent implements Limit
+ {
+ private final long limit;
+
+ private volatile long using;
+ private static final AtomicLongFieldUpdater<Concurrent> usingUpdater =
+ AtomicLongFieldUpdater.newUpdater(Concurrent.class, "using");
+
+ public Concurrent(long limit)
+ {
+ this.limit = limit;
+ }
+
+ public long limit()
+ {
+ return limit;
+ }
+
+ public long remaining()
+ {
+ return limit - using;
+ }
+
+ public long using()
+ {
+ return using;
+ }
+
+ public boolean tryAllocate(long amount)
+ {
+ long current, next;
+ do
+ {
+ current = using;
+ next = current + amount;
+
+ if (next > limit)
+ return false;
+ }
+ while (!usingUpdater.compareAndSet(this, current, next));
+
+ return true;
+ }
+
+ public void allocate(long amount)
+ {
+ long current, next;
+ do
+ {
+ current = using;
+ next = current + amount;
+ } while (!usingUpdater.compareAndSet(this, current, next));
+ }
+
+ public Outcome release(long amount)
+ {
+ assert amount >= 0;
+ long using = usingUpdater.addAndGet(this, -amount);
+ assert using >= 0;
+ return using >= limit ? Outcome.ABOVE_LIMIT : Outcome.BELOW_LIMIT;
+ }
+ }
+
+ /**
+ * A cheaper, thread-unsafe permit container to be used for unshared limits.
+ */
+ static class Basic implements Limit
+ {
+ private final long limit;
+ private long using;
+
+ Basic(long limit)
+ {
+ this.limit = limit;
+ }
+
+ public long limit()
+ {
+ return limit;
+ }
+
+ public long remaining()
+ {
+ return limit - using;
+ }
+
+ public long using()
+ {
+ return using;
+ }
+
+ public boolean tryAllocate(long amount)
+ {
+ if (using + amount > limit)
+ return false;
+
+ using += amount;
+ return true;
+ }
+
+ public void allocate(long amount)
+ {
+ using += amount;
+ }
+
+ public Outcome release(long amount)
+ {
+ assert amount >= 0 && amount <= using;
+ using -= amount;
+ return using >= limit ? Outcome.ABOVE_LIMIT : Outcome.BELOW_LIMIT;
+ }
+ }
+
+ /**
+ * A convenience class that groups a per-endpoint limit with the global one
+ * to allow allocating/releasing permits from/to both limits as one logical operation.
+ */
+ public static class EndpointAndGlobal
+ {
+ final Limit endpoint;
+ final Limit global;
+
+ public EndpointAndGlobal(Limit endpoint, Limit global)
+ {
+ this.endpoint = endpoint;
+ this.global = global;
+ }
+
+ public Limit endpoint()
+ {
+ return endpoint;
+ }
+
+ public Limit global()
+ {
+ return global;
+ }
+
+ /**
+ * @return {@code INSUFFICIENT_GLOBAL} if there weren't enough permits in the global limit, or
+ * {@code INSUFFICIENT_ENDPOINT} if there weren't enough permits in the per-endpoint limit, or
+ * {@code SUCCESS} if there were enough permits to take from both.
+ */
+ public Outcome tryAllocate(long amount)
+ {
+ if (!global.tryAllocate(amount))
+ return Outcome.INSUFFICIENT_GLOBAL;
+
+ if (endpoint.tryAllocate(amount))
+ return Outcome.SUCCESS;
+
+ global.release(amount);
+ return Outcome.INSUFFICIENT_ENDPOINT;
+ }
+
+ public void allocate(long amount)
+ {
+ global.allocate(amount);
+ endpoint.allocate(amount);
+ }
+
+ public Outcome release(long amount)
+ {
+ Outcome endpointReleaseOutcome = endpoint.release(amount);
+ Outcome globalReleaseOutcome = global.release(amount);
+ return (endpointReleaseOutcome == Outcome.ABOVE_LIMIT || globalReleaseOutcome == Outcome.ABOVE_LIMIT)
+ ? Outcome.ABOVE_LIMIT : Outcome.BELOW_LIMIT;
+ }
+ }
+
+ public enum Outcome { SUCCESS, INSUFFICIENT_ENDPOINT, INSUFFICIENT_GLOBAL, BELOW_LIMIT, ABOVE_LIMIT }
+}
diff --git a/src/java/org/apache/cassandra/service/NativeTransportService.java b/src/java/org/apache/cassandra/service/NativeTransportService.java
index 48839f1..2280818 100644
--- a/src/java/org/apache/cassandra/service/NativeTransportService.java
+++ b/src/java/org/apache/cassandra/service/NativeTransportService.java
@@ -31,11 +31,9 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.Future;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.metrics.ClientMetrics;
-import org.apache.cassandra.transport.RequestThreadPoolExecutor;
+import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.Server;
/**
@@ -50,7 +48,6 @@ public class NativeTransportService
private boolean initialized = false;
private EventLoopGroup workerGroup;
- private EventExecutor eventExecutorGroup;
/**
* Creates netty thread pools and event loops.
@@ -61,9 +58,6 @@ public class NativeTransportService
if (initialized)
return;
- // prepare netty resources
- eventExecutorGroup = new RequestThreadPoolExecutor();
-
if (useEpoll())
{
workerGroup = new EpollEventLoopGroup();
@@ -80,7 +74,6 @@ public class NativeTransportService
InetAddress nativeAddr = DatabaseDescriptor.getRpcAddress();
org.apache.cassandra.transport.Server.Builder builder = new org.apache.cassandra.transport.Server.Builder()
- .withEventExecutor(eventExecutorGroup)
.withEventLoopGroup(workerGroup)
.withHost(nativeAddr);
@@ -108,13 +101,7 @@ public class NativeTransportService
}
// register metrics
- ClientMetrics.instance.addCounter("connectedNativeClients", () ->
- {
- int ret = 0;
- for (Server server : servers)
- ret += server.getConnectedClients();
- return ret;
- });
+ ClientMetrics.instance.init(servers);
initialized = true;
}
@@ -147,8 +134,7 @@ public class NativeTransportService
// shutdown executors used by netty for native transport server
workerGroup.shutdownGracefully(3, 5, TimeUnit.SECONDS).awaitUninterruptibly();
- // shutdownGracefully not implemented yet in RequestThreadPoolExecutor
- eventExecutorGroup.shutdown();
+ Message.Dispatcher.shutdown();
}
/**
@@ -177,12 +163,6 @@ public class NativeTransportService
}
@VisibleForTesting
- EventExecutor getEventExecutor()
- {
- return eventExecutorGroup;
- }
-
- @VisibleForTesting
Collection<Server> getServers()
{
return servers;
diff --git a/src/java/org/apache/cassandra/transport/Connection.java b/src/java/org/apache/cassandra/transport/Connection.java
index af26557..2966d9b 100644
--- a/src/java/org/apache/cassandra/transport/Connection.java
+++ b/src/java/org/apache/cassandra/transport/Connection.java
@@ -29,6 +29,7 @@ public class Connection
private final Tracker tracker;
private volatile FrameCompressor frameCompressor;
+ private boolean throwOnOverload;
public Connection(Channel channel, int version, Tracker tracker)
{
@@ -49,6 +50,16 @@ public class Connection
return frameCompressor;
}
+ public void setThrowOnOverload(boolean throwOnOverload)
+ {
+ this.throwOnOverload = throwOnOverload;
+ }
+
+ public boolean isThrowOnOverload()
+ {
+ return throwOnOverload;
+ }
+
public Tracker getTracker()
{
return tracker;
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 3940b47..c28be9f 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -68,7 +68,7 @@ public class Frame
public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ByteBuf body)
{
- Header header = new Header(version, flags, streamId, type);
+ Header header = new Header(version, flags, streamId, type, body.readableBytes());
return new Frame(header, body);
}
@@ -83,18 +83,20 @@ public class Frame
public final EnumSet<Flag> flags;
public final int streamId;
public final Message.Type type;
+ public final long bodySizeInBytes;
- private Header(int version, int flags, int streamId, Message.Type type)
+ private Header(int version, int flags, int streamId, Message.Type type, long bodySizeInBytes)
{
- this(version, Flag.deserialize(flags), streamId, type);
+ this(version, Flag.deserialize(flags), streamId, type, bodySizeInBytes);
}
- private Header(int version, EnumSet<Flag> flags, int streamId, Message.Type type)
+ private Header(int version, EnumSet<Flag> flags, int streamId, Message.Type type, long bodySizeInBytes)
{
this.version = version;
this.flags = flags;
this.streamId = streamId;
this.type = type;
+ this.bodySizeInBytes = bodySizeInBytes;
}
public static enum Flag
@@ -240,7 +242,7 @@ public class Frame
streamId);
}
- results.add(new Frame(new Header(version, flags, streamId, type), body));
+ results.add(new Frame(new Header(version, flags, streamId, type, bodyLength), body));
}
private void fail()
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 0851b19..08a8600 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -42,11 +42,18 @@ import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.LocalAwareExecutorService;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.metrics.ClientMetrics;
+import org.apache.cassandra.net.ResourceLimits;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.transport.messages.*;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
+
/**
* A message from the CQL binary protocol.
*/
@@ -404,19 +411,42 @@ public abstract class Message
}
}
- @ChannelHandler.Sharable
public static class Dispatcher extends SimpleChannelInboundHandler<Request>
{
+ private static final LocalAwareExecutorService requestExecutor = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
+ Integer.MAX_VALUE,
+ "transport",
+ "Native-Transport-Requests");
+
+ /**
+ * Current count of *request* bytes that are live on the channel.
+ *
+ * Note: should only be accessed while on the netty event loop.
+ */
+ private long channelPayloadBytesInFlight;
+
+ private final Server.EndpointPayloadTracker endpointPayloadTracker;
+
+ private boolean paused;
+
private static class FlushItem
{
final ChannelHandlerContext ctx;
final Object response;
final Frame sourceFrame;
- private FlushItem(ChannelHandlerContext ctx, Object response, Frame sourceFrame)
+ final Dispatcher dispatcher;
+
+ private FlushItem(ChannelHandlerContext ctx, Object response, Frame sourceFrame, Dispatcher dispatcher)
{
this.ctx = ctx;
this.sourceFrame = sourceFrame;
this.response = response;
+ this.dispatcher = dispatcher;
+ }
+
+ public void release()
+ {
+ dispatcher.releaseItem(this);
}
}
@@ -472,7 +502,7 @@ public abstract class Message
for (ChannelHandlerContext channel : channels)
channel.flush();
for (FlushItem item : flushed)
- item.sourceFrame.release();
+ item.release();
channels.clear();
flushed.clear();
@@ -524,7 +554,7 @@ public abstract class Message
for (ChannelHandlerContext channel : channels)
channel.flush();
for (FlushItem item : flushed)
- item.sourceFrame.release();
+ item.release();
channels.clear();
flushed.clear();
@@ -536,16 +566,98 @@ public abstract class Message
private final boolean useLegacyFlusher;
- public Dispatcher(boolean useLegacyFlusher)
+ public Dispatcher(boolean useLegacyFlusher, Server.EndpointPayloadTracker endpointPayloadTracker)
{
super(false);
this.useLegacyFlusher = useLegacyFlusher;
+ this.endpointPayloadTracker = endpointPayloadTracker;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, Request request)
{
+ // if we decide to handle this message, process it outside of the netty event loop
+ if (shouldHandleRequest(ctx, request))
+ requestExecutor.submit(() -> processRequest(ctx, request));
+ }
+
+ /** This check for inflight payload to potentially discard the request should ideally have been in one of the
+ * first handlers in the pipeline (Frame::decode()). However, in case of any exception thrown between that
+ * handler (where inflight payload is incremented) and this handler (Dispatcher::channelRead0) (where inflight
+ * payload is decremented), inflight payload becomes erroneous. ExceptionHandler is not sufficient for this
+ * purpose since it does not have the frame associated with the exception.
+ *
+ * Note: this method should execute on the netty event loop.
+ */
+ private boolean shouldHandleRequest(ChannelHandlerContext ctx, Request request)
+ {
+ long frameSize = request.getSourceFrame().header.bodySizeInBytes;
+
+ ResourceLimits.EndpointAndGlobal endpointAndGlobalPayloadsInFlight = endpointPayloadTracker.endpointAndGlobalPayloadsInFlight;
+
+ // check for overloaded state by trying to allocate framesize to inflight payload trackers
+ if (endpointAndGlobalPayloadsInFlight.tryAllocate(frameSize) != ResourceLimits.Outcome.SUCCESS)
+ {
+ if (request.connection.isThrowOnOverload())
+ {
+ // discard the request and throw an exception
+ ClientMetrics.instance.markRequestDiscarded();
+ logger.trace("Discarded request of size: {}. InflightChannelRequestPayload: {}, InflightEndpointRequestPayload: {}, InflightOverallRequestPayload: {}, Request: {}",
+ frameSize,
+ channelPayloadBytesInFlight,
+ endpointAndGlobalPayloadsInFlight.endpoint().using(),
+ endpointAndGlobalPayloadsInFlight.global().using(),
+ request);
+ throw ErrorMessage.wrap(new OverloadedException("Server is in overloaded state. Cannot accept more requests at this point"),
+ request.getSourceFrame().header.streamId);
+ }
+ else
+ {
+ // set backpressure on the channel, and handle the request
+ endpointAndGlobalPayloadsInFlight.allocate(frameSize);
+ ctx.channel().config().setAutoRead(false);
+ ClientMetrics.instance.pauseConnection();
+ paused = true;
+ }
+ }
+
+ channelPayloadBytesInFlight += frameSize;
+ return true;
+ }
+
+ /**
+ * Note: this method will be used in the {@link Flusher#run()}, which executes on the netty event loop
+ * ({@link Dispatcher#flusherLookup}). Thus, we assume the semantics and visibility of variables
+ * that come with being on the event loop.
+ */
+ private void releaseItem(FlushItem item)
+ {
+ long itemSize = item.sourceFrame.header.bodySizeInBytes;
+ item.sourceFrame.release();
+
+ // since the request has been processed, decrement inflight payload at channel, endpoint and global levels
+ channelPayloadBytesInFlight -= itemSize;
+ ResourceLimits.Outcome endpointGlobalReleaseOutcome = endpointPayloadTracker.endpointAndGlobalPayloadsInFlight.release(itemSize);
+
+ // now check to see if we need to reenable the channel's autoRead.
+ // If the current payload size is zero, we must reenable autoread as
+ // 1) we allow no other thread/channel to do it, and
+ // 2) there are no other events following this one (because we're at zero bytes in flight),
+ // so no successive release can trigger the other clause in this if-block
+ ChannelConfig config = item.ctx.channel().config();
+ if (paused && (channelPayloadBytesInFlight == 0 || endpointGlobalReleaseOutcome == ResourceLimits.Outcome.BELOW_LIMIT))
+ {
+ paused = false;
+ ClientMetrics.instance.unpauseConnection();
+ config.setAutoRead(true);
+ }
+ }
+ /**
+ * Note: this method is not expected to execute on the netty event loop.
+ */
+ void processRequest(ChannelHandlerContext ctx, Request request)
+ {
final Response response;
final ServerConnection connection;
@@ -569,7 +681,7 @@ public abstract class Message
{
JVMStabilityInspector.inspectThrowable(t);
UnexpectedChannelExceptionHandler handler = new UnexpectedChannelExceptionHandler(ctx.channel(), true);
- flush(new FlushItem(ctx, ErrorMessage.fromException(t, handler).setStreamId(request.getStreamId()), request.getSourceFrame()));
+ flush(new FlushItem(ctx, ErrorMessage.fromException(t, handler).setStreamId(request.getStreamId()), request.getSourceFrame(), this));
return;
}
finally
@@ -578,7 +690,19 @@ public abstract class Message
}
logger.trace("Responding: {}, v={}", response, connection.getVersion());
- flush(new FlushItem(ctx, response, request.getSourceFrame()));
+ flush(new FlushItem(ctx, response, request.getSourceFrame(), this));
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx)
+ {
+ endpointPayloadTracker.release();
+ if (paused)
+ {
+ paused = false;
+ ClientMetrics.instance.unpauseConnection();
+ }
+ ctx.fireChannelInactive();
}
private void flush(FlushItem item)
@@ -596,6 +720,14 @@ public abstract class Message
flusher.queued.add(item);
flusher.start();
}
+
+ public static void shutdown()
+ {
+ if (requestExecutor != null)
+ {
+ requestExecutor.shutdown();
+ }
+ }
}
@ChannelHandler.Sharable
diff --git a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java b/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
deleted file mode 100644
index 75dd05d..0000000
--- a/src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.transport;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import io.netty.util.concurrent.AbstractEventExecutor;
-import io.netty.util.concurrent.EventExecutorGroup;
-import io.netty.util.concurrent.Future;
-import org.apache.cassandra.concurrent.LocalAwareExecutorService;
-import org.apache.cassandra.config.DatabaseDescriptor;
-
-import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
-
-public class RequestThreadPoolExecutor extends AbstractEventExecutor
-{
- private final static int MAX_QUEUED_REQUESTS = Integer.getInteger("cassandra.max_queued_native_transport_requests", 128);
- private final static String THREAD_FACTORY_ID = "Native-Transport-Requests";
- private final LocalAwareExecutorService wrapped = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
- MAX_QUEUED_REQUESTS,
- "transport",
- THREAD_FACTORY_ID);
-
- public boolean isShuttingDown()
- {
- return wrapped.isShutdown();
- }
-
- public Future<?> shutdownGracefully(long l, long l2, TimeUnit timeUnit)
- {
- throw new IllegalStateException();
- }
-
- public Future<?> terminationFuture()
- {
- throw new IllegalStateException();
- }
-
- @Override
- public void shutdown()
- {
- wrapped.shutdown();
- }
-
- @Override
- public List<Runnable> shutdownNow()
- {
- return wrapped.shutdownNow();
- }
-
- public boolean isShutdown()
- {
- return wrapped.isShutdown();
- }
-
- public boolean isTerminated()
- {
- return wrapped.isTerminated();
- }
-
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
- {
- return wrapped.awaitTermination(timeout, unit);
- }
-
- public EventExecutorGroup parent()
- {
- return null;
- }
-
- public boolean inEventLoop(Thread thread)
- {
- return false;
- }
-
- public void execute(Runnable command)
- {
- wrapped.execute(command);
- }
-}
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 8c781db..83a676c 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -26,6 +26,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +51,7 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.net.ResourceLimits;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.*;
import org.apache.cassandra.transport.messages.EventMessage;
@@ -84,7 +87,6 @@ public class Server implements CassandraDaemon.Server
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private EventLoopGroup workerGroup;
- private EventExecutor eventExecutorGroup;
private Server (Builder builder)
{
@@ -101,8 +103,6 @@ public class Server implements CassandraDaemon.Server
else
workerGroup = new NioEventLoopGroup();
}
- if (builder.eventExecutorGroup != null)
- eventExecutorGroup = builder.eventExecutorGroup;
EventNotifier notifier = new EventNotifier(this);
StorageService.instance.register(notifier);
MigrationManager.instance.register(notifier);
@@ -201,12 +201,6 @@ public class Server implements CassandraDaemon.Server
return this;
}
- public Builder withEventExecutor(EventExecutor eventExecutor)
- {
- this.eventExecutorGroup = eventExecutor;
- return this;
- }
-
public Builder withHost(InetAddress host)
{
this.hostAddr = host;
@@ -286,6 +280,49 @@ public class Server implements CassandraDaemon.Server
}
}
+ // Global in-flight request payload limit shared by all channels of all endpoints; its capacity is read from DatabaseDescriptor once, when this static field is initialized
+ private static final ResourceLimits.Concurrent globalRequestPayloadInFlight = new ResourceLimits.Concurrent(DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
+
+ public static class EndpointPayloadTracker // reference-counted holder of one endpoint's in-flight payload limit; instances are shared per InetAddress through the static map below
+ {
+ // In-flight payload tracker per endpoint, shared by every caller of get() for that endpoint; entries are removed again by release() or by a failed acquire in get()
+ private static final ConcurrentMap<InetAddress, EndpointPayloadTracker> requestPayloadInFlightPerEndpoint = new ConcurrentHashMap<>();
+
+ private final AtomicInteger refCount = new AtomicInteger(0); // get() calls not yet matched by release(); -1 marks a retired tracker that acquire() refuses
+ private final InetAddress endpoint; // map key of this tracker, needed by release() to remove the entry
+
+ final ResourceLimits.EndpointAndGlobal endpointAndGlobalPayloadsInFlight = new ResourceLimits.EndpointAndGlobal(new ResourceLimits.Concurrent(DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp()), // fresh per-endpoint limit for this tracker ...
+ globalRequestPayloadInFlight); // ... paired with the single global limit shared by all trackers
+
+ private EndpointPayloadTracker(InetAddress endpoint) // private: instances are only created through get()
+ {
+ this.endpoint = endpoint;
+ }
+
+ public static EndpointPayloadTracker get(InetAddress endpoint) // returns the tracker for endpoint, creating it if absent, with its refCount incremented; pair each call with release()
+ {
+ while (true)
+ {
+ EndpointPayloadTracker result = requestPayloadInFlightPerEndpoint.computeIfAbsent(endpoint, EndpointPayloadTracker::new);
+ if (result.acquire())
+ return result;
+
+ requestPayloadInFlightPerEndpoint.remove(endpoint, result); // acquire() failed, so result was already retired by release(): drop it (only if still mapped) and retry
+ }
+ }
+
+ private boolean acquire() // increments refCount unless it is negative; true only if the count is positive afterwards
+ {
+ return 0 < refCount.updateAndGet(i -> i < 0 ? i : i + 1);
+ }
+
+ public void release() // undoes one get(); when the count would drop from 1 it is set to -1 and the map entry is removed
+ {
+ if (-1 == refCount.updateAndGet(i -> i == 1 ? -1 : i - 1)) // store -1 rather than 0 so that a later acquire() on this instance cannot succeed
+ requestPayloadInFlightPerEndpoint.remove(endpoint, this); // two-argument remove leaves a replacement tracker for the same endpoint in place
+ }
+ }
+
private static class Initializer extends ChannelInitializer<Channel>
{
// Stateless handlers
@@ -295,7 +332,6 @@ public class Server implements CassandraDaemon.Server
private static final Frame.Compressor frameCompressor = new Frame.Compressor();
private static final Frame.Encoder frameEncoder = new Frame.Encoder();
private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler();
- private static final Message.Dispatcher dispatcher = new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher());
private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
private final Server server;
@@ -328,6 +364,9 @@ public class Server implements CassandraDaemon.Server
pipeline.addLast("messageDecoder", messageDecoder);
pipeline.addLast("messageEncoder", messageEncoder);
+ pipeline.addLast("executor", new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher(),
+ EndpointPayloadTracker.get(((InetSocketAddress) channel.remoteAddress()).getAddress())));
+
// The exceptionHandler will take care of handling exceptionCaught(...) events while still running
// on the same EventLoop as all previous added handlers in the pipeline. This is important as the used
// eventExecutorGroup may not enforce strict ordering for channel events.
@@ -335,11 +374,6 @@ public class Server implements CassandraDaemon.Server
// correctly handled before the handler itself is removed.
// See https://issues.apache.org/jira/browse/CASSANDRA-13649
pipeline.addLast("exceptionHandler", exceptionHandler);
-
- if (server.eventExecutorGroup != null)
- pipeline.addLast(server.eventExecutorGroup, "executor", dispatcher);
- else
- pipeline.addLast("executor", dispatcher);
}
}
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 4759c2a..7916deb 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -115,10 +115,20 @@ public class SimpleClient implements Closeable
public void connect(boolean useCompression) throws IOException
{
+ connect(useCompression, false); // delegate to the two-argument overload with throwOnOverload disabled
+ }
+
+ public void connect(boolean useCompression, boolean throwOnOverload) throws IOException
+ {
establishConnection();
Map<String, String> options = new HashMap<>();
options.put(StartupMessage.CQL_VERSION, "3.0.0");
+
+ if (throwOnOverload)
+ options.put(StartupMessage.THROW_ON_OVERLOAD, "1");
+ connection.setThrowOnOverload(throwOnOverload);
+
if (useCompression)
{
options.put(StartupMessage.COMPRESSION, "snappy");
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index 774be6a..92278fa 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -36,6 +36,7 @@ public class StartupMessage extends Message.Request
public static final String CQL_VERSION = "CQL_VERSION";
public static final String COMPRESSION = "COMPRESSION";
public static final String NO_COMPACT = "NO_COMPACT";
+ public static final String THROW_ON_OVERLOAD = "THROW_ON_OVERLOAD";
public static final Message.Codec<StartupMessage> codec = new Message.Codec<StartupMessage>()
{
@@ -101,6 +102,8 @@ public class StartupMessage extends Message.Request
if (options.containsKey(NO_COMPACT) && Boolean.parseBoolean(options.get(NO_COMPACT)))
state.getClientState().setNoCompactMode();
+ connection.setThrowOnOverload("1".equals(options.get(THROW_ON_OVERLOAD)));
+
if (DatabaseDescriptor.getAuthenticator().requireAuthentication())
return new AuthenticateMessage(DatabaseDescriptor.getAuthenticator().getClass().getName());
else
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 3c0cefc..999404e 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -44,6 +44,7 @@ import com.datastax.driver.core.ResultSet;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.cql3.functions.FunctionName;
@@ -334,6 +335,7 @@ public abstract class CQLTester
SchemaLoader.startGossiper();
server = new Server.Builder().withHost(nativeAddr).withPort(nativePort).build();
+ ClientMetrics.instance.init(Collections.singleton(server));
server.start();
for (int version : PROTOCOL_VERSIONS)
diff --git a/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java b/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
index d0e291a..8f2689a 100644
--- a/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
@@ -79,8 +79,7 @@ public class NativeTransportServiceTest
{
withService((NativeTransportService service) -> {
Supplier<Boolean> allTerminated = () ->
- service.getWorkerGroup().isShutdown() && service.getWorkerGroup().isTerminated() &&
- service.getEventExecutor().isShutdown() && service.getEventExecutor().isTerminated();
+ service.getWorkerGroup().isShutdown() && service.getWorkerGroup().isTerminated();
assertFalse(allTerminated.get());
service.destroy();
assertTrue(allTerminated.get());
diff --git a/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
new file mode 100644
index 0000000..e4d335b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.transport.messages.QueryMessage;
+
+@RunWith(OrderedJUnit4ClassRunner.class)
+public class InflightRequestPayloadTrackerTest extends CQLTester
+{
+ @BeforeClass
+ public static void setUp()
+ {
+ DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(600); // small per-endpoint limit; presumably chosen so the long INSERT statements in the tests below exceed it
+ DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(600); // NOTE(review): same value as the per-endpoint limit, so the endpoint-limit and overall-limit tests cannot tell which limit tripped
+ requireNetwork(); // called after the limits are set; presumably this is what starts the native transport server - TODO confirm
+ }
+
+ @AfterClass
+ public static void tearDown()
+ {
+ DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(3000000000L); // NOTE(review): hard-coded value, not the one in effect before setUp() - confirm it matches the configured default
+ DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(5000000000L); // NOTE(review): hard-coded as well; capturing the previous values in setUp() would be safer
+ }
+
+ @After
+ public void dropCreatedTable() // runs after every test: removes the "atable" table the tests create in KEYSPACE
+ {
+ try
+ {
+ QueryProcessor.executeOnceInternal("DROP TABLE " + KEYSPACE + ".atable");
+ }
+ catch (Throwable t)
+ {
+ // ignore: best-effort cleanup; presumably the DROP fails when a test did not get as far as creating the table
+ }
+ }
+
+ @Test
+ public void testQueryExecutionWithThrowOnOverload() throws Throwable // a short CREATE TABLE on a THROW_ON_OVERLOAD connection; there is no explicit assertion, the test passes if execute() does not throw
+ {
+ SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+ nativePort,
+ Server.CURRENT_VERSION,
+ new EncryptionOptions.ClientEncryptionOptions());
+
+ try
+ {
+ client.connect(false, true); // useCompression = false, throwOnOverload = true
+ QueryOptions queryOptions = QueryOptions.create( // copy of QueryOptions.DEFAULT with the protocol version set to Server.CURRENT_VERSION
+ QueryOptions.DEFAULT.getConsistency(),
+ QueryOptions.DEFAULT.getValues(),
+ QueryOptions.DEFAULT.skipMetadata(),
+ QueryOptions.DEFAULT.getPageSize(),
+ QueryOptions.DEFAULT.getPagingState(),
+ QueryOptions.DEFAULT.getSerialConsistency(),
+ Server.CURRENT_VERSION);
+
+ QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk1 int PRIMARY KEY, v text)", KEYSPACE),
+ queryOptions);
+ client.execute(queryMessage); // presumably small enough to stay under the 600-byte limits set in setUp()
+ }
+ finally
+ {
+ client.close(); // always close the connection, even when execute() throws
+ }
+ }
+
+ @Test
+ public void testQueryExecutionWithoutThrowOnOverload() throws Throwable // short CREATE TABLE and SELECT on a connection without THROW_ON_OVERLOAD; there is no explicit assertion, the test passes if neither execute() throws
+ {
+ SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+ nativePort,
+ Server.CURRENT_VERSION,
+ new EncryptionOptions.ClientEncryptionOptions());
+
+ try
+ {
+ client.connect(false, false); // useCompression = false, throwOnOverload = false
+ QueryOptions queryOptions = QueryOptions.create( // copy of QueryOptions.DEFAULT with the protocol version set to Server.CURRENT_VERSION
+ QueryOptions.DEFAULT.getConsistency(),
+ QueryOptions.DEFAULT.getValues(),
+ QueryOptions.DEFAULT.skipMetadata(),
+ QueryOptions.DEFAULT.getPageSize(),
+ QueryOptions.DEFAULT.getPagingState(),
+ QueryOptions.DEFAULT.getSerialConsistency(),
+ Server.CURRENT_VERSION);
+
+ QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+ queryOptions);
+ client.execute(queryMessage);
+ queryMessage = new QueryMessage(String.format("SELECT * FROM %s.atable", KEYSPACE), // second request on the same connection, reusing the same options
+ queryOptions);
+ client.execute(queryMessage);
+ }
+ finally
+ {
+ client.close(); // always close the connection, even when execute() throws
+ }
+ }
+
+ @Test
+ public void testQueryExecutionWithoutThrowOnOverloadAndInflightLimitedExceeded() throws Throwable
+ {
+ SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+ nativePort,
+ Server.CURRENT_VERSION,
+ new EncryptionOptions.ClientEncryptionOptions());
+
+ try
+ {
+ client.connect(false, false);
+ QueryOptions queryOptions = QueryOptions.create(
+ QueryOptions.DEFAULT.getConsistency(),
+ QueryOptions.DEFAULT.getValues(),
+ QueryOptions.DEFAULT.skipMetadata(),
+ QueryOptions.DEFAULT.getPageSize(),
+ QueryOptions.DEFAULT.getPagingState(),
+ QueryOptions.DEFAULT.getSerialConsistency(),
+ Server.CURRENT_VERSION);
+
+ QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+ queryOptions);
+ client.execute(queryMessage);
+
+ queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
+ queryOptions);
+ client.execute(queryMessage);
+ }
+ finally
+ {
+ client.close();
+ }
+ }
+
+ @Test
+ public void testOverloadedExceptionForEndpointInflightLimit() throws Throwable
+ {
+ SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+ nativePort,
+ Server.CURRENT_VERSION,
+ new EncryptionOptions.ClientEncryptionOptions());
+
+ try
+ {
+ client.connect(false, true);
+ QueryOptions queryOptions = QueryOptions.create(
+ QueryOptions.DEFAULT.getConsistency(),
+ QueryOptions.DEFAULT.getValues(),
+ QueryOptions.DEFAULT.skipMetadata(),
+ QueryOptions.DEFAULT.getPageSize(),
+ QueryOptions.DEFAULT.getPagingState(),
+ QueryOptions.DEFAULT.getSerialConsistency(),
+ Server.CURRENT_VERSION);
+
+ QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+ queryOptions);
+ client.execute(queryMessage);
+
+ queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
+ queryOptions);
+ try
+ {
+ client.execute(queryMessage);
+ Assert.fail();
+ }
+ catch (RuntimeException e)
+ {
+ Assert.assertTrue(e.getCause() instanceof OverloadedException);
+ }
+ }
+ finally
+ {
+ client.close();
+ }
+ }
+
+ @Test
+ public void testOverloadedExceptionForOverallInflightLimit() throws Throwable
+ {
+ SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+ nativePort,
+ Server.CURRENT_VERSION,
+ new EncryptionOptions.ClientEncryptionOptions());
+
+ try
+ {
+ client.connect(false, true);
+ QueryOptions queryOptions = QueryOptions.create(
+ QueryOptions.DEFAULT.getConsistency(),
+ QueryOptions.DEFAULT.getValues(),
+ QueryOptions.DEFAULT.skipMetadata(),
+ QueryOptions.DEFAULT.getPageSize(),
+ QueryOptions.DEFAULT.getPagingState(),
+ QueryOptions.DEFAULT.getSerialConsistency(),
+ Server.CURRENT_VERSION);
+
+ QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+ queryOptions);
+ client.execute(queryMessage);
+
+ queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
+ queryOptions);
+ try
+ {
+ client.execute(queryMessage);
+ Assert.fail();
+ }
+ catch (RuntimeException e)
+ {
+ Assert.assertTrue(e.getCause() instanceof OverloadedException);
+ }
+ }
+ finally
+ {
+ client.close();
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org