You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/11/25 15:02:55 UTC

cassandra git commit: For CASSANDRA-7217, have stress calculate the maximum number of pending requests based on thread count, and allow manually specifying the maximum pending requests per connection as well as the number of connections per host

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 602f736e2 -> a0076e70e


For CASSANDRA-7217, have stress calculate the maximum number of pending requests based on thread count, and allow manually specifying the maximum pending requests per connection as well as the number of connections per host

patch by Ariel Weisberg; reviewed by tjake for CASSANDRA-7217


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a0076e70
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a0076e70
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a0076e70

Branch: refs/heads/cassandra-3.0
Commit: a0076e70eed5501ac9d8c3ff41ce8018710a1585
Parents: 602f736
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Tue Nov 17 13:22:55 2015 -0500
Committer: T Jake Luciani <ja...@apache.org>
Committed: Wed Nov 25 09:02:02 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/stress/settings/SettingsMode.java | 22 +++++++++++++----
 .../cassandra/stress/util/JavaDriverClient.java | 26 +++++++++++++++++---
 3 files changed, 40 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0076e70/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 116d4c3..513e682 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.1
+ * Improve stress performance over 1k threads (CASSANDRA-7217)
  * Wait for migration responses to complete before bootstrapping (CASSANDRA-10731)
  * Unable to create a function with argument of type Inet (CASSANDRA-10741)
  * Fix backward incompatibiliy in CqlInputFormat (CASSANDRA-10717)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0076e70/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMode.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMode.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMode.java
index 2c91b6d..699f10e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMode.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMode.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.stress.settings;
 /*
- * 
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.cassandra.stress.settings;
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- * 
+ *
  */
 
 
@@ -42,6 +42,9 @@ public class SettingsMode implements Serializable
     public final String authProviderClassname;
     public final AuthProvider authProvider;
 
+    public final Integer maxPendingPerConnection;
+    public final Integer connectionsPerHost;
+
     private final String compression;
 
     public SettingsMode(GroupedOptions options)
@@ -55,6 +58,8 @@ public class SettingsMode implements Serializable
             compression = ProtocolOptions.Compression.valueOf(opts.useCompression.value().toUpperCase()).name();
             username = opts.user.value();
             password = opts.password.value();
+            maxPendingPerConnection = opts.maxPendingPerConnection.value().isEmpty() ? null : Integer.valueOf(opts.maxPendingPerConnection.value());
+            connectionsPerHost = opts.connectionsPerHost.value().isEmpty() ? null : Integer.valueOf(opts.connectionsPerHost.value());
             authProviderClassname = opts.authProvider.value();
             if (authProviderClassname != null)
             {
@@ -94,6 +99,8 @@ public class SettingsMode implements Serializable
             password = null;
             authProvider = null;
             authProviderClassname = null;
+            maxPendingPerConnection = null;
+            connectionsPerHost = null;
         }
         else if (options instanceof ThriftOptions)
         {
@@ -106,6 +113,8 @@ public class SettingsMode implements Serializable
             password = opts.password.value();
             authProviderClassname = null;
             authProvider = null;
+            maxPendingPerConnection = null;
+            connectionsPerHost = null;
         }
         else
             throw new IllegalStateException();
@@ -145,12 +154,15 @@ public class SettingsMode implements Serializable
         final OptionSimple user = new OptionSimple("user=", ".+", null, "username", false);
         final OptionSimple password = new OptionSimple("password=", ".+", null, "password", false);
         final OptionSimple authProvider = new OptionSimple("auth-provider=", ".*", null, "Fully qualified implementation of com.datastax.driver.core.AuthProvider", false);
+        final OptionSimple maxPendingPerConnection = new OptionSimple("maxPending=", "[0-9]+", "", "Maximum pending requests per connection", false);
+        final OptionSimple connectionsPerHost = new OptionSimple("connectionsPerHost=", "[0-9]+", "", "Number of connections per host", false);
 
         abstract OptionSimple mode();
         @Override
         public List<? extends Option> options()
         {
-            return Arrays.asList(mode(), useUnPrepared, api, useCompression, port, user, password, authProvider);
+            return Arrays.asList(mode(), useUnPrepared, api, useCompression, port, user, password, authProvider,
+                                 maxPendingPerConnection, connectionsPerHost);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0076e70/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
index 0040003..d0779ed 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
@@ -44,6 +44,8 @@ public class JavaDriverClient
     public final String username;
     public final String password;
     public final AuthProvider authProvider;
+    public final int maxPendingPerConnection;
+    public final int connectionsPerHost;
 
     private final EncryptionOptions.ClientEncryptionOptions encryptionOptions;
     private Cluster cluster;
@@ -69,6 +71,19 @@ public class JavaDriverClient
             whitelist = new WhiteListPolicy(new DCAwareRoundRobinPolicy(), settings.node.resolveAll(settings.port.nativePort));
         else
             whitelist = null;
+        connectionsPerHost = settings.mode.connectionsPerHost == null ? 8 : settings.mode.connectionsPerHost;
+
+        int maxThreadCount = 0;
+        if (settings.rate.auto)
+            maxThreadCount = settings.rate.maxThreads;
+        else
+            maxThreadCount = settings.rate.threadCount;
+
+        //Always allow enough pending requests so every thread can have a request pending
+        //See https://issues.apache.org/jira/browse/CASSANDRA-7217
+        int requestsPerConnection = (maxThreadCount / connectionsPerHost) + connectionsPerHost;
+
+        maxPendingPerConnection = settings.mode.maxPendingPerConnection == null ? Math.max(128, requestsPerConnection ) : settings.mode.maxPendingPerConnection;
     }
 
     public PreparedStatement prepare(String query)
@@ -91,8 +106,8 @@ public class JavaDriverClient
     {
 
         PoolingOptions poolingOpts = new PoolingOptions()
-                                     .setConnectionsPerHost(HostDistance.LOCAL, 8, 8)
-                                     .setMaxRequestsPerConnection(HostDistance.LOCAL, 128)
+                                     .setConnectionsPerHost(HostDistance.LOCAL, connectionsPerHost, connectionsPerHost)
+                                     .setMaxRequestsPerConnection(HostDistance.LOCAL, maxPendingPerConnection)
                                      .setNewConnectionThreshold(HostDistance.LOCAL, 100);
 
         Cluster.Builder clusterBuilder = Cluster.builder()
@@ -123,8 +138,11 @@ public class JavaDriverClient
 
         cluster = clusterBuilder.build();
         Metadata metadata = cluster.getMetadata();
-        System.out.printf("Connected to cluster: %s%n",
-                metadata.getClusterName());
+        System.out.printf(
+                "Connected to cluster: %s, max pending requests per connection %d, max connections per host %d%n",
+                metadata.getClusterName(),
+                maxPendingPerConnection,
+                connectionsPerHost);
         for (Host host : metadata.getAllHosts())
         {
             System.out.printf("Datatacenter: %s; Host: %s; Rack: %s%n",