You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2019/07/15 14:14:06 UTC

[cassandra] branch trunk updated (149caf0 -> 8a04204)

This is an automated email from the ASF dual-hosted git repository.

benedict pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 149caf0  CassandraNetworkAuthorizer gets login privilege from RolesCache
     add 5a03898  Prevent client requests from blocking on executor task queue
     add f5fe483  Merge branch 'cassandra-3.0' into cassandra-3.11
     new 8a04204  Merge branch 'cassandra-3.11' into trunk

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 doc/native_protocol_v4.spec                        |   4 +
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  30 +++
 .../apache/cassandra/metrics/ClientMetrics.java    |  14 ++
 .../org/apache/cassandra/net/ResourceLimits.java   |  70 ++++--
 .../cassandra/service/NativeTransportService.java  |  17 +-
 .../org/apache/cassandra/transport/Connection.java |  11 +
 src/java/org/apache/cassandra/transport/Frame.java |   8 +-
 .../org/apache/cassandra/transport/Message.java    | 146 +++++++++++-
 .../transport/RequestThreadPoolExecutor.java       |  96 --------
 .../org/apache/cassandra/transport/Server.java     |  64 +++--
 .../apache/cassandra/transport/SimpleClient.java   |   8 +
 .../transport/messages/StartupMessage.java         |   3 +
 test/unit/org/apache/cassandra/cql3/CQLTester.java |   9 +-
 .../service/NativeTransportServiceTest.java        |   3 +-
 .../InflightRequestPayloadTrackerTest.java         | 258 +++++++++++++++++++++
 17 files changed, 592 insertions(+), 152 deletions(-)
 delete mode 100644 src/java/org/apache/cassandra/transport/RequestThreadPoolExecutor.java
 create mode 100644 test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 8a04204c10eafc2c1051a336d82211899889e276
Merge: 149caf0 f5fe483
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Mon Jul 15 15:13:53 2019 +0100

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 doc/native_protocol_v4.spec                        |   4 +
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  30 +++
 .../apache/cassandra/metrics/ClientMetrics.java    |  14 ++
 .../org/apache/cassandra/net/ResourceLimits.java   |  70 ++++--
 .../cassandra/service/NativeTransportService.java  |  17 +-
 .../org/apache/cassandra/transport/Connection.java |  11 +
 src/java/org/apache/cassandra/transport/Frame.java |   8 +-
 .../org/apache/cassandra/transport/Message.java    | 146 +++++++++++-
 .../transport/RequestThreadPoolExecutor.java       |  96 --------
 .../org/apache/cassandra/transport/Server.java     |  64 +++--
 .../apache/cassandra/transport/SimpleClient.java   |   8 +
 .../transport/messages/StartupMessage.java         |   3 +
 test/unit/org/apache/cassandra/cql3/CQLTester.java |   9 +-
 .../service/NativeTransportServiceTest.java        |   3 +-
 .../InflightRequestPayloadTrackerTest.java         | 258 +++++++++++++++++++++
 17 files changed, 592 insertions(+), 152 deletions(-)

diff --cc src/java/org/apache/cassandra/config/Config.java
index 6b487fe,1d79e2a..34a5ce8
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -182,11 -160,13 +182,13 @@@ public class Confi
      public int native_transport_max_frame_size_in_mb = 256;
      public volatile long native_transport_max_concurrent_connections = -1L;
      public volatile long native_transport_max_concurrent_connections_per_ip = -1L;
 -    public boolean native_transport_flush_in_batches_legacy = true;
 +    public boolean native_transport_flush_in_batches_legacy = false;
 +    public volatile boolean native_transport_allow_older_protocols = true;
 +    public int native_transport_frame_block_size_in_kb = 32;
+     public volatile long native_transport_max_concurrent_requests_in_bytes_per_ip = -1L;
+     public volatile long native_transport_max_concurrent_requests_in_bytes = -1L;
  
  
 -    @Deprecated
 -    public int thrift_max_message_length_in_mb = 16;
      /**
       * Max size of values in SSTables, in MegaBytes.
       * Default is the same as the native protocol frame limit: 256Mb.
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index bb92716,75296b6..0166c5f
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -2010,31 -1849,36 +2020,51 @@@ public class DatabaseDescripto
          return conf.native_transport_flush_in_batches_legacy;
      }
  
 -    public static double getCommitLogSyncBatchWindow()
 +    public static boolean getNativeTransportAllowOlderProtocols()
      {
 -        return conf.commitlog_sync_batch_window_in_ms;
 +        return conf.native_transport_allow_older_protocols;
      }
  
 -    public static void setCommitLogSyncBatchWindow(double windowMillis)
 +    public static void setNativeTransportAllowOlderProtocols(boolean isEnabled)
      {
 -        conf.commitlog_sync_batch_window_in_ms = windowMillis;
 +        conf.native_transport_allow_older_protocols = isEnabled;
 +    }
 +
 +    public static int getNativeTransportFrameBlockSize()
 +    {
 +        return conf.native_transport_frame_block_size_in_kb * 1024;
 +    }
 +
 +    public static double getCommitLogSyncGroupWindow()
 +    {
 +        return conf.commitlog_sync_group_window_in_ms;
 +    }
 +
 +    public static void setCommitLogSyncGroupWindow(double windowMillis)
 +    {
 +        conf.commitlog_sync_group_window_in_ms = windowMillis;
      }
  
+     public static long getNativeTransportMaxConcurrentRequestsInBytesPerIp()
+     {
+         return conf.native_transport_max_concurrent_requests_in_bytes_per_ip;
+     }
+ 
+     public static void setNativeTransportMaxConcurrentRequestsInBytesPerIp(long maxConcurrentRequestsInBytes)
+     {
+         conf.native_transport_max_concurrent_requests_in_bytes_per_ip = maxConcurrentRequestsInBytes;
+     }
+ 
+     public static long getNativeTransportMaxConcurrentRequestsInBytes()
+     {
+         return conf.native_transport_max_concurrent_requests_in_bytes;
+     }
+ 
+     public static void setNativeTransportMaxConcurrentRequestsInBytes(long maxConcurrentRequestsInBytes)
+     {
+         conf.native_transport_max_concurrent_requests_in_bytes = maxConcurrentRequestsInBytes;
+     }
+ 
      public static int getCommitLogSyncPeriod()
      {
          return conf.commitlog_sync_period_in_ms;
diff --cc src/java/org/apache/cassandra/metrics/ClientMetrics.java
index a80033a,67aa05b..7599096
--- a/src/java/org/apache/cassandra/metrics/ClientMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
@@@ -18,7 -18,10 +18,8 @@@
   */
  package org.apache.cassandra.metrics;
  
 -import java.util.Collection;
 -import java.util.Collections;
 -import java.util.concurrent.Callable;
 +import java.util.*;
+ import java.util.concurrent.atomic.AtomicInteger;
  
  import com.codahale.metrics.Gauge;
  import com.codahale.metrics.Meter;
@@@ -28,42 -29,28 +29,51 @@@ import org.apache.cassandra.transport.S
  
  import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
  
 -
 -public class ClientMetrics
 +public final class ClientMetrics
  {
 -    private static final MetricNameFactory factory = new DefaultNameFactory("Client");
      public static final ClientMetrics instance = new ClientMetrics();
  
 -    private volatile boolean initialized = false;
 +    private static final MetricNameFactory factory = new DefaultNameFactory("Client");
  
 +    private volatile boolean initialized = false;
      private Collection<Server> servers = Collections.emptyList();
  
 +    private Meter authSuccess;
 +    private Meter authFailure;
 +
+     private AtomicInteger pausedConnections;
+     private Gauge<Integer> pausedConnectionsGauge;
+     private Meter requestDiscarded;
+ 
      private ClientMetrics()
      {
      }
  
 +    public void markAuthSuccess()
 +    {
 +        authSuccess.mark();
 +    }
 +
 +    public void markAuthFailure()
 +    {
 +        authFailure.mark();
 +    }
 +
+     public void pauseConnection() { pausedConnections.incrementAndGet(); }
+     public void unpauseConnection() { pausedConnections.decrementAndGet(); }
++
+     public void markRequestDiscarded() { requestDiscarded.mark(); }
+ 
 +    public List<ConnectedClient> allConnectedClients()
 +    {
 +        List<ConnectedClient> clients = new ArrayList<>();
 +
 +        for (Server server : servers)
 +            clients.addAll(server.getConnectedClients());
 +
 +        return clients;
 +    }
 +
      public synchronized void init(Collection<Server> servers)
      {
          if (initialized)
@@@ -71,14 -58,12 +81,18 @@@
  
          this.servers = servers;
  
 -        registerGauge("connectedNativeClients", this::countConnectedClients);
 +        registerGauge("connectedNativeClients",       this::countConnectedClients);
 +        registerGauge("connectedNativeClientsByUser", this::countConnectedClientsByUser);
 +        registerGauge("connections",                  this::connectedClients);
 +        registerGauge("clientsByProtocolVersion",     this::recentClientStats);
 +
 +        authSuccess = registerMeter("AuthSuccess");
 +        authFailure = registerMeter("AuthFailure");
  
+         pausedConnections = new AtomicInteger();
+         pausedConnectionsGauge = registerGauge("PausedConnections", pausedConnections::get);
+         requestDiscarded = registerMeter("RequestDiscarded");
+ 
          initialized = true;
      }
  
diff --cc src/java/org/apache/cassandra/service/NativeTransportService.java
index 79caafc,6343f0d..66b5000
--- a/src/java/org/apache/cassandra/service/NativeTransportService.java
+++ b/src/java/org/apache/cassandra/service/NativeTransportService.java
@@@ -32,12 -31,12 +32,11 @@@ import io.netty.channel.EventLoopGroup
  import io.netty.channel.epoll.Epoll;
  import io.netty.channel.epoll.EpollEventLoopGroup;
  import io.netty.channel.nio.NioEventLoopGroup;
--import io.netty.util.concurrent.EventExecutor;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.metrics.AuthMetrics;
  import org.apache.cassandra.metrics.ClientMetrics;
- import org.apache.cassandra.transport.RequestThreadPoolExecutor;
+ import org.apache.cassandra.transport.Message;
  import org.apache.cassandra.transport.Server;
 +import org.apache.cassandra.utils.NativeLibrary;
  
  /**
   * Handles native transport server lifecycle and associated resources. Lazily initialized.
diff --cc src/java/org/apache/cassandra/transport/Connection.java
index 908e7e9,7e17f46..b7f5b17
--- a/src/java/org/apache/cassandra/transport/Connection.java
+++ b/src/java/org/apache/cassandra/transport/Connection.java
@@@ -29,7 -28,8 +29,8 @@@ public class Connectio
      private final ProtocolVersion version;
      private final Tracker tracker;
  
 -    private volatile FrameCompressor frameCompressor;
 +    private volatile FrameBodyTransformer transformer;
+     private boolean throwOnOverload;
  
      public Connection(Channel channel, ProtocolVersion version, Tracker tracker)
      {
@@@ -40,16 -40,26 +41,26 @@@
          tracker.addConnection(channel, this);
      }
  
 -    public void setCompressor(FrameCompressor compressor)
 +    public void setTransformer(FrameBodyTransformer transformer)
      {
 -        this.frameCompressor = compressor;
 +        this.transformer = transformer;
      }
  
 -    public FrameCompressor getCompressor()
 +    public FrameBodyTransformer getTransformer()
      {
 -        return frameCompressor;
 +        return transformer;
      }
  
+     public void setThrowOnOverload(boolean throwOnOverload)
+     {
+         this.throwOnOverload = throwOnOverload;
+     }
+ 
+     public boolean isThrowOnOverload()
+     {
+         return throwOnOverload;
+     }
+ 
      public Tracker getTracker()
      {
          return tracker;
diff --cc src/java/org/apache/cassandra/transport/Frame.java
index d3c810b,388cbc2..8163d7a
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@@ -227,16 -225,6 +229,16 @@@ public class Fram
              idx += bodyLength;
              buffer.readerIndex(idx);
  
-             return new Frame(new Header(version, decodedFlags, streamId, type), body);
++            return new Frame(new Header(version, decodedFlags, streamId, type, bodyLength), body);
 +        }
 +
 +        @Override
 +        protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> results)
 +        throws Exception
 +        {
 +            Frame frame = decodeFrame(buffer);
 +            if (frame == null) return;
 +
              Attribute<Connection> attrConn = ctx.channel().attr(Connection.attributeKey);
              Connection connection = attrConn.get();
              if (connection == null)
diff --cc src/java/org/apache/cassandra/transport/Message.java
index 0571478,271b690..99c0127
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@@ -42,14 -42,18 +42,21 @@@ import com.google.common.collect.Immuta
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
+ import org.apache.cassandra.concurrent.LocalAwareExecutorService;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.exceptions.OverloadedException;
+ import org.apache.cassandra.metrics.ClientMetrics;
+ import org.apache.cassandra.net.ResourceLimits;
  import org.apache.cassandra.service.ClientWarn;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.transport.messages.*;
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.utils.JVMStabilityInspector;
 +import org.apache.cassandra.utils.UUIDGen;
  
+ import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
+ 
  /**
   * A message from the CQL binary protocol.
   */
diff --cc src/java/org/apache/cassandra/transport/Server.java
index f16aa88,b96c517..c4690f1
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@@ -23,8 -23,11 +23,10 @@@ import java.net.InetSocketAddress
  import java.net.UnknownHostException;
  import java.util.*;
  import java.util.concurrent.ConcurrentHashMap;
 -import java.util.concurrent.atomic.AtomicBoolean;
 -import javax.net.ssl.SSLContext;
 -import javax.net.ssl.SSLEngine;
 +import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -54,9 -51,7 +56,10 @@@ import org.apache.cassandra.auth.Authen
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.config.EncryptionOptions;
  import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.locator.InetAddressAndPort;
+ import org.apache.cassandra.net.ResourceLimits;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.SchemaChangeListener;
  import org.apache.cassandra.security.SSLFactory;
  import org.apache.cassandra.service.*;
  import org.apache.cassandra.transport.messages.EventMessage;
@@@ -104,11 -98,9 +106,9 @@@ public class Server implements Cassandr
              else
                  workerGroup = new NioEventLoopGroup();
          }
-         if (builder.eventExecutorGroup != null)
-             eventExecutorGroup = builder.eventExecutorGroup;
          EventNotifier notifier = new EventNotifier(this);
          StorageService.instance.register(notifier);
 -        MigrationManager.instance.register(notifier);
 +        Schema.instance.registerListener(notifier);
      }
  
      public void stop()
@@@ -321,36 -274,60 +315,78 @@@
              */
              return allChannels.size() != 0 ? allChannels.size() - 1 : 0;
          }
 +
 +        Map<String, Integer> countConnectedClientsByUser()
 +        {
 +            Map<String, Integer> result = new HashMap<>();
 +            for (Channel c : allChannels)
 +            {
 +                Connection connection = c.attr(Connection.attributeKey).get();
 +                if (connection instanceof ServerConnection)
 +                {
 +                    ServerConnection conn = (ServerConnection) connection;
 +                    AuthenticatedUser user = conn.getClientState().getUser();
 +                    String name = (null != user) ? user.getName() : null;
 +                    result.put(name, result.getOrDefault(name, 0) + 1);
 +                }
 +            }
 +            return result;
 +        }
 +
      }
  
+     // global inflight payload across all channels across all endpoints
+     private static final ResourceLimits.Concurrent globalRequestPayloadInFlight = new ResourceLimits.Concurrent(DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
+ 
+     public static class EndpointPayloadTracker
+     {
+         // inflight payload per endpoint across corresponding channels
+         private static final ConcurrentMap<InetAddress, EndpointPayloadTracker> requestPayloadInFlightPerEndpoint = new ConcurrentHashMap<>();
+ 
+         private final AtomicInteger refCount = new AtomicInteger(0);
+         private final InetAddress endpoint;
+ 
+         final ResourceLimits.EndpointAndGlobal endpointAndGlobalPayloadsInFlight = new ResourceLimits.EndpointAndGlobal(new ResourceLimits.Concurrent(DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp()),
+                                                                                                                          globalRequestPayloadInFlight);
+ 
+         private EndpointPayloadTracker(InetAddress endpoint)
+         {
+             this.endpoint = endpoint;
+         }
+ 
+         public static EndpointPayloadTracker get(InetAddress endpoint)
+         {
+             while (true)
+             {
+                 EndpointPayloadTracker result = requestPayloadInFlightPerEndpoint.computeIfAbsent(endpoint, EndpointPayloadTracker::new);
+                 if (result.acquire())
+                     return result;
+ 
+                 requestPayloadInFlightPerEndpoint.remove(endpoint, result);
+             }
+         }
+ 
+         private boolean acquire()
+         {
+             return 0 < refCount.updateAndGet(i -> i < 0 ? i : i + 1);
+         }
+ 
+         public void release()
+         {
+             if (-1 == refCount.updateAndGet(i -> i == 1 ? -1 : i - 1))
+                 requestPayloadInFlightPerEndpoint.remove(endpoint, this);
+         }
+     }
+ 
      private static class Initializer extends ChannelInitializer<Channel>
      {
          // Stateless handlers
          private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
          private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder();
 -        private static final Frame.Decompressor frameDecompressor = new Frame.Decompressor();
 -        private static final Frame.Compressor frameCompressor = new Frame.Compressor();
 +        private static final Frame.InboundBodyTransformer inboundFrameTransformer = new Frame.InboundBodyTransformer();
 +        private static final Frame.OutboundBodyTransformer outboundFrameTransformer = new Frame.OutboundBodyTransformer();
          private static final Frame.Encoder frameEncoder = new Frame.Encoder();
          private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler();
-         private static final Message.Dispatcher dispatcher = new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher());
          private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
  
          private final Server server;
diff --cc src/java/org/apache/cassandra/transport/SimpleClient.java
index deba207,c03becd..6340b69
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@@ -119,31 -118,31 +119,39 @@@ public class SimpleClient implements Cl
  
      public SimpleClient(String host, int port)
      {
 -        this(host, port, new ClientEncryptionOptions());
 +        this(host, port, new EncryptionOptions());
      }
  
 -    public void connect(boolean useCompression) throws IOException
 +    public SimpleClient connect(boolean useCompression, boolean useChecksums) throws IOException
      {
 -        connect(useCompression, false);
++        return connect(useCompression, useChecksums, false);
+     }
+ 
 -    public void connect(boolean useCompression, boolean throwOnOverload) throws IOException
++    public SimpleClient connect(boolean useCompression, boolean useChecksums, boolean throwOnOverload) throws IOException
+     {
          establishConnection();
  
          Map<String, String> options = new HashMap<>();
          options.put(StartupMessage.CQL_VERSION, "3.0.0");
 -
+         if (throwOnOverload)
+             options.put(StartupMessage.THROW_ON_OVERLOAD, "1");
+         connection.setThrowOnOverload(throwOnOverload);
  
 -        if (useCompression)
 +        if (useChecksums)
 +        {
 +            Compressor compressor = useCompression ? LZ4Compressor.INSTANCE : null;
 +            connection.setTransformer(ChecksummingTransformer.getTransformer(ChecksumType.CRC32, compressor));
 +            options.put(StartupMessage.CHECKSUM, "crc32");
 +            options.put(StartupMessage.COMPRESSION, "lz4");
 +        }
 +        else if (useCompression)
          {
 -            options.put(StartupMessage.COMPRESSION, "snappy");
 -            connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
 +            connection.setTransformer(CompressingTransformer.getTransformer(LZ4Compressor.INSTANCE));
 +            options.put(StartupMessage.COMPRESSION, "lz4");
          }
 +
          execute(new StartupMessage(options));
 +        return this;
      }
  
      public void setEventHandler(EventHandler eventHandler)
diff --cc src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index ef846c1,8b4b0a4..ee2b34e
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@@ -43,9 -36,8 +43,10 @@@ public class StartupMessage extends Mes
      public static final String CQL_VERSION = "CQL_VERSION";
      public static final String COMPRESSION = "COMPRESSION";
      public static final String PROTOCOL_VERSIONS = "PROTOCOL_VERSIONS";
 -    public static final String NO_COMPACT = "NO_COMPACT";
 +    public static final String DRIVER_NAME = "DRIVER_NAME";
 +    public static final String DRIVER_VERSION = "DRIVER_VERSION";
 +    public static final String CHECKSUM = "CONTENT_CHECKSUM";
+     public static final String THROW_ON_OVERLOAD = "THROW_ON_OVERLOAD";
  
      public static final Message.Codec<StartupMessage> codec = new Message.Codec<StartupMessage>()
      {
@@@ -90,28 -81,30 +91,30 @@@
              throw new ProtocolException(e.getMessage());
          }
  
 -        if (options.containsKey(COMPRESSION))
 +        ChecksumType checksumType = getChecksumType();
 +        Compressor compressor = getCompressor();
 +
 +        if (null != checksumType)
          {
 -            String compression = options.get(COMPRESSION).toLowerCase();
 -            if (compression.equals("snappy"))
 -            {
 -                if (FrameCompressor.SnappyCompressor.instance == null)
 -                    throw new ProtocolException("This instance does not support Snappy compression");
 -                connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
 -            }
 -            else if (compression.equals("lz4"))
 -            {
 -                connection.setCompressor(FrameCompressor.LZ4Compressor.instance);
 -            }
 -            else
 -            {
 -                throw new ProtocolException(String.format("Unknown compression algorithm: %s", compression));
 -            }
 +            if (!connection.getVersion().supportsChecksums())
 +                throw new ProtocolException(String.format("Invalid message flag. Protocol version %s does not support frame body checksums", connection.getVersion().toString()));
 +            connection.setTransformer(ChecksummingTransformer.getTransformer(checksumType, compressor));
 +        }
 +        else if (null != compressor)
 +        {
 +            connection.setTransformer(CompressingTransformer.getTransformer(compressor));
          }
 -
 -        if (options.containsKey(NO_COMPACT) && Boolean.parseBoolean(options.get(NO_COMPACT)))
 -            state.getClientState().setNoCompactMode();
  
+         connection.setThrowOnOverload("1".equals(options.get(THROW_ON_OVERLOAD)));
+ 
 +        ClientState clientState = state.getClientState();
 +        String driverName = options.get(DRIVER_NAME);
 +        if (null != driverName)
 +        {
 +            clientState.setDriverName(driverName);
 +            clientState.setDriverVersion(options.get(DRIVER_VERSION));
 +        }
 +
          if (DatabaseDescriptor.getAuthenticator().requireAuthentication())
              return new AuthenticateMessage(DatabaseDescriptor.getAuthenticator().getClass().getName());
          else
diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java
index 1eb56bc,d438630..9c4f22e
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@@ -47,14 -43,13 +47,15 @@@ import com.datastax.driver.core.ResultS
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.concurrent.ScheduledExecutors;
 -import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.index.SecondaryIndexManager;
 +import org.apache.cassandra.config.EncryptionOptions;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.metrics.ClientMetrics;
 +import org.apache.cassandra.schema.*;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.Schema;
 -import org.apache.cassandra.config.SchemaConstants;
  import org.apache.cassandra.cql3.functions.FunctionName;
 -import org.apache.cassandra.cql3.statements.ParsedStatement;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.marshal.*;
@@@ -891,11 -761,6 +893,16 @@@ public abstract class CQLTeste
          return sessions.get(protocolVersion);
      }
  
++    protected SimpleClient newSimpleClient(ProtocolVersion version, boolean compression, boolean checksums, boolean isOverloadedException) throws IOException
++    {
++        return new SimpleClient(nativeAddr.getHostAddress(), nativePort, version, version.isBeta(), new EncryptionOptions()).connect(compression, checksums, isOverloadedException);
++    }
++
 +    protected SimpleClient newSimpleClient(ProtocolVersion version, boolean compression, boolean checksums) throws IOException
 +    {
-         return new SimpleClient(nativeAddr.getHostAddress(), nativePort, version, version.isBeta(), new EncryptionOptions()).connect(compression, checksums);
++        return newSimpleClient(version, compression, checksums, false);
 +    }
 +
      protected String formatQuery(String query)
      {
          return formatQuery(KEYSPACE, query);
diff --cc test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
index 80b3596,25fac21..86b73ab
--- a/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
@@@ -84,12 -84,11 +84,11 @@@ public class NativeTransportServiceTes
      public void testDestroy()
      {
          withService((NativeTransportService service) -> {
 -            Supplier<Boolean> allTerminated = () ->
 -                                              service.getWorkerGroup().isShutdown() && service.getWorkerGroup().isTerminated();
 -            assertFalse(allTerminated.get());
 +            BooleanSupplier allTerminated = () ->
-                                               service.getWorkerGroup().isShutdown() && service.getWorkerGroup().isTerminated() &&
-                                               service.getEventExecutor().isShutdown() && service.getEventExecutor().isTerminated();
++                                            service.getWorkerGroup().isShutdown() && service.getWorkerGroup().isTerminated();
 +            assertFalse(allTerminated.getAsBoolean());
              service.destroy();
 -            assertTrue(allTerminated.get());
 +            assertTrue(allTerminated.getAsBoolean());
          });
      }
  
diff --cc test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
index 0000000,21dfed8..c9a9a02
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
+++ b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
@@@ -1,0 -1,248 +1,258 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.transport;
+ 
+ import org.junit.After;
+ import org.junit.AfterClass;
+ import org.junit.Assert;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ 
+ import org.apache.cassandra.OrderedJUnit4ClassRunner;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.EncryptionOptions;
+ import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.cql3.QueryOptions;
+ import org.apache.cassandra.cql3.QueryProcessor;
+ import org.apache.cassandra.exceptions.OverloadedException;
+ import org.apache.cassandra.transport.messages.QueryMessage;
+ 
+ @RunWith(OrderedJUnit4ClassRunner.class)
+ public class InflightRequestPayloadTrackerTest extends CQLTester
+ {
+     @BeforeClass
+     public static void setUp()
+     {
+         DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(600);
+         DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(600);
+         requireNetwork();
+     }
+ 
+     @AfterClass
+     public static void tearDown()
+     {
+         DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(3000000000L);
+         DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(5000000000L);
+     }
+ 
+     @After
+     public void dropCreatedTable()
+     {
+         try
+         {
+             QueryProcessor.executeOnceInternal("DROP TABLE " + KEYSPACE + ".atable");
+         }
+         catch (Throwable t)
+         {
+             // ignore
+         }
+     }
+ 
+     @Test
+     public void testQueryExecutionWithThrowOnOverload() throws Throwable
+     {
+         SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                nativePort,
 -                                               ProtocolVersion.V4,
 -                                               new EncryptionOptions.ClientEncryptionOptions());
++                                               ProtocolVersion.V5,
++                                               true,
++                                               new EncryptionOptions());
+ 
+         try
+         {
 -            client.connect(false, true);
++            client.connect(false, false, true);
+             QueryOptions queryOptions = QueryOptions.create(
+             QueryOptions.DEFAULT.getConsistency(),
+             QueryOptions.DEFAULT.getValues(),
+             QueryOptions.DEFAULT.skipMetadata(),
+             QueryOptions.DEFAULT.getPageSize(),
+             QueryOptions.DEFAULT.getPagingState(),
+             QueryOptions.DEFAULT.getSerialConsistency(),
 -            ProtocolVersion.V4);
++            ProtocolVersion.V5,
++            KEYSPACE);
+ 
 -            QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk1 int PRIMARY KEY, v text)", KEYSPACE),
++            QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk1 int PRIMARY KEY, v text)",
+                                                          queryOptions);
+             client.execute(queryMessage);
+         }
+         finally
+         {
+             client.close();
+         }
+     }
+ 
+     @Test
+     public void testQueryExecutionWithoutThrowOnOverload() throws Throwable
+     {
+         SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                nativePort,
 -                                               ProtocolVersion.V4,
 -                                               new EncryptionOptions.ClientEncryptionOptions());
++                                               ProtocolVersion.V5,
++                                               true,
++                                               new EncryptionOptions());
+ 
+         try
+         {
 -            client.connect(false, false);
++            client.connect(false, false, false);
+             QueryOptions queryOptions = QueryOptions.create(
+             QueryOptions.DEFAULT.getConsistency(),
+             QueryOptions.DEFAULT.getValues(),
+             QueryOptions.DEFAULT.skipMetadata(),
+             QueryOptions.DEFAULT.getPageSize(),
+             QueryOptions.DEFAULT.getPagingState(),
+             QueryOptions.DEFAULT.getSerialConsistency(),
 -            ProtocolVersion.V4);
++            ProtocolVersion.V5,
++            KEYSPACE);
+ 
 -            QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
++            QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
+                                                          queryOptions);
+             client.execute(queryMessage);
 -            queryMessage = new QueryMessage(String.format("SELECT * FROM %s.atable", KEYSPACE),
++            queryMessage = new QueryMessage("SELECT * FROM atable",
+                                             queryOptions);
+             client.execute(queryMessage);
+         }
+         finally
+         {
+             client.close();
+         }
+     }
+ 
+     @Test
+     public void testQueryExecutionWithoutThrowOnOverloadAndInflightLimitedExceeded() throws Throwable
+     {
+         SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                nativePort,
 -                                               ProtocolVersion.V4,
 -                                               new EncryptionOptions.ClientEncryptionOptions());
++                                               ProtocolVersion.V5,
++                                               true,
++                                               new EncryptionOptions());
+ 
+         try
+         {
 -            client.connect(false, false);
++            client.connect(false, false, false);
+             QueryOptions queryOptions = QueryOptions.create(
+             QueryOptions.DEFAULT.getConsistency(),
+             QueryOptions.DEFAULT.getValues(),
+             QueryOptions.DEFAULT.skipMetadata(),
+             QueryOptions.DEFAULT.getPageSize(),
+             QueryOptions.DEFAULT.getPagingState(),
+             QueryOptions.DEFAULT.getSerialConsistency(),
 -            ProtocolVersion.V4);
++            ProtocolVersion.V5,
++            KEYSPACE);
+ 
 -            QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
++            QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
+                                                          queryOptions);
+             client.execute(queryMessage);
+ 
 -            queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
++            queryMessage = new QueryMessage("INSERT INTO atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
+                                             queryOptions);
+             client.execute(queryMessage);
+         }
+         finally
+         {
+             client.close();
+         }
+     }
+ 
+     @Test
+     public void testOverloadedExceptionForEndpointInflightLimit() throws Throwable
+     {
+         SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                nativePort,
 -                                               ProtocolVersion.V4,
 -                                               new EncryptionOptions.ClientEncryptionOptions());
++                                               ProtocolVersion.V5,
++                                               true,
++                                               new EncryptionOptions());
+ 
+         try
+         {
 -            client.connect(false, true);
++            client.connect(false, false, true);
+             QueryOptions queryOptions = QueryOptions.create(
+             QueryOptions.DEFAULT.getConsistency(),
+             QueryOptions.DEFAULT.getValues(),
+             QueryOptions.DEFAULT.skipMetadata(),
+             QueryOptions.DEFAULT.getPageSize(),
+             QueryOptions.DEFAULT.getPagingState(),
+             QueryOptions.DEFAULT.getSerialConsistency(),
 -            ProtocolVersion.V4);
++            ProtocolVersion.V5,
++            KEYSPACE);
+ 
 -            QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
++            QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
+                                                          queryOptions);
+             client.execute(queryMessage);
+ 
 -            queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
++            queryMessage = new QueryMessage("INSERT INTO atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
+                                             queryOptions);
+             try
+             {
+                 client.execute(queryMessage);
+                 Assert.fail();
+             }
+             catch (RuntimeException e)
+             {
+                 Assert.assertTrue(e.getCause() instanceof OverloadedException);
+             }
+         }
+         finally
+         {
+             client.close();
+         }
+     }
+ 
+     @Test
+     public void testOverloadedExceptionForOverallInflightLimit() throws Throwable
+     {
+         SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                nativePort,
 -                                               ProtocolVersion.V4,
 -                                               new EncryptionOptions.ClientEncryptionOptions());
++                                               ProtocolVersion.V5,
++                                               true,
++                                               new EncryptionOptions());
+ 
+         try
+         {
 -            client.connect(false, true);
++            client.connect(false, false, true);
+             QueryOptions queryOptions = QueryOptions.create(
+             QueryOptions.DEFAULT.getConsistency(),
+             QueryOptions.DEFAULT.getValues(),
+             QueryOptions.DEFAULT.skipMetadata(),
+             QueryOptions.DEFAULT.getPageSize(),
+             QueryOptions.DEFAULT.getPagingState(),
+             QueryOptions.DEFAULT.getSerialConsistency(),
 -            ProtocolVersion.V4);
++            ProtocolVersion.V5,
++            KEYSPACE);
+ 
 -            QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
++            QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
+                                                          queryOptions);
+             client.execute(queryMessage);
+ 
 -            queryMessage = new QueryMessage(String.format("INSERT INTO %s.atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
++            queryMessage = new QueryMessage("INSERT INTO atable (pk, v) VALUES (1, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa [...]
+                                             queryOptions);
+             try
+             {
+                 client.execute(queryMessage);
+                 Assert.fail();
+             }
+             catch (RuntimeException e)
+             {
+                 Assert.assertTrue(e.getCause() instanceof OverloadedException);
+             }
+         }
+         finally
+         {
+             client.close();
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org