You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2015/11/25 15:02:55 UTC
cassandra git commit: For CASSANDRA-7217,
have stress calculate maximum # of pending requests based on thread
count and allow manually specifying max pending requests per connection as
well as # of connections
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 602f736e2 -> a0076e70e
For CASSANDRA-7217, have stress calculate maximum # of pending requests based on thread count and allow manually specifying max pending requests per connection as well as # of connections
patch by Ariel Weisberg; reviewed by tjake for CASSANDRA-7217
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a0076e70
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a0076e70
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a0076e70
Branch: refs/heads/cassandra-3.0
Commit: a0076e70eed5501ac9d8c3ff41ce8018710a1585
Parents: 602f736
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Tue Nov 17 13:22:55 2015 -0500
Committer: T Jake Luciani <ja...@apache.org>
Committed: Wed Nov 25 09:02:02 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/stress/settings/SettingsMode.java | 22 +++++++++++++----
.../cassandra/stress/util/JavaDriverClient.java | 26 +++++++++++++++++---
3 files changed, 40 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0076e70/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 116d4c3..513e682 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.1
+ * Improve stress performance over 1k threads (CASSANDRA-7217)
* Wait for migration responses to complete before bootstrapping (CASSANDRA-10731)
* Unable to create a function with argument of type Inet (CASSANDRA-10741)
* Fix backward incompatibiliy in CqlInputFormat (CASSANDRA-10717)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0076e70/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMode.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMode.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMode.java
index 2c91b6d..699f10e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMode.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsMode.java
@@ -1,6 +1,6 @@
package org.apache.cassandra.stress.settings;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,16 +8,16 @@ package org.apache.cassandra.stress.settings;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
@@ -42,6 +42,9 @@ public class SettingsMode implements Serializable
public final String authProviderClassname;
public final AuthProvider authProvider;
+ public final Integer maxPendingPerConnection;
+ public final Integer connectionsPerHost;
+
private final String compression;
public SettingsMode(GroupedOptions options)
@@ -55,6 +58,8 @@ public class SettingsMode implements Serializable
compression = ProtocolOptions.Compression.valueOf(opts.useCompression.value().toUpperCase()).name();
username = opts.user.value();
password = opts.password.value();
+ maxPendingPerConnection = opts.maxPendingPerConnection.value().isEmpty() ? null : Integer.valueOf(opts.maxPendingPerConnection.value());
+ connectionsPerHost = opts.connectionsPerHost.value().isEmpty() ? null : Integer.valueOf(opts.connectionsPerHost.value());
authProviderClassname = opts.authProvider.value();
if (authProviderClassname != null)
{
@@ -94,6 +99,8 @@ public class SettingsMode implements Serializable
password = null;
authProvider = null;
authProviderClassname = null;
+ maxPendingPerConnection = null;
+ connectionsPerHost = null;
}
else if (options instanceof ThriftOptions)
{
@@ -106,6 +113,8 @@ public class SettingsMode implements Serializable
password = opts.password.value();
authProviderClassname = null;
authProvider = null;
+ maxPendingPerConnection = null;
+ connectionsPerHost = null;
}
else
throw new IllegalStateException();
@@ -145,12 +154,15 @@ public class SettingsMode implements Serializable
final OptionSimple user = new OptionSimple("user=", ".+", null, "username", false);
final OptionSimple password = new OptionSimple("password=", ".+", null, "password", false);
final OptionSimple authProvider = new OptionSimple("auth-provider=", ".*", null, "Fully qualified implementation of com.datastax.driver.core.AuthProvider", false);
+ final OptionSimple maxPendingPerConnection = new OptionSimple("maxPending=", "[0-9]+", "", "Maximum pending requests per connection", false);
+ final OptionSimple connectionsPerHost = new OptionSimple("connectionsPerHost=", "[0-9]+", "", "Number of connections per host", false);
abstract OptionSimple mode();
@Override
public List<? extends Option> options()
{
- return Arrays.asList(mode(), useUnPrepared, api, useCompression, port, user, password, authProvider);
+ return Arrays.asList(mode(), useUnPrepared, api, useCompression, port, user, password, authProvider,
+ maxPendingPerConnection, connectionsPerHost);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0076e70/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
index 0040003..d0779ed 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
@@ -44,6 +44,8 @@ public class JavaDriverClient
public final String username;
public final String password;
public final AuthProvider authProvider;
+ public final int maxPendingPerConnection;
+ public final int connectionsPerHost;
private final EncryptionOptions.ClientEncryptionOptions encryptionOptions;
private Cluster cluster;
@@ -69,6 +71,19 @@ public class JavaDriverClient
whitelist = new WhiteListPolicy(new DCAwareRoundRobinPolicy(), settings.node.resolveAll(settings.port.nativePort));
else
whitelist = null;
+ connectionsPerHost = settings.mode.connectionsPerHost == null ? 8 : settings.mode.connectionsPerHost;
+
+ int maxThreadCount = 0;
+ if (settings.rate.auto)
+ maxThreadCount = settings.rate.maxThreads;
+ else
+ maxThreadCount = settings.rate.threadCount;
+
+ //Always allow enough pending requests so every thread can have a request pending
+ //See https://issues.apache.org/jira/browse/CASSANDRA-7217
+ int requestsPerConnection = (maxThreadCount / connectionsPerHost) + connectionsPerHost;
+
+ maxPendingPerConnection = settings.mode.maxPendingPerConnection == null ? Math.max(128, requestsPerConnection ) : settings.mode.maxPendingPerConnection;
}
public PreparedStatement prepare(String query)
@@ -91,8 +106,8 @@ public class JavaDriverClient
{
PoolingOptions poolingOpts = new PoolingOptions()
- .setConnectionsPerHost(HostDistance.LOCAL, 8, 8)
- .setMaxRequestsPerConnection(HostDistance.LOCAL, 128)
+ .setConnectionsPerHost(HostDistance.LOCAL, connectionsPerHost, connectionsPerHost)
+ .setMaxRequestsPerConnection(HostDistance.LOCAL, maxPendingPerConnection)
.setNewConnectionThreshold(HostDistance.LOCAL, 100);
Cluster.Builder clusterBuilder = Cluster.builder()
@@ -123,8 +138,11 @@ public class JavaDriverClient
cluster = clusterBuilder.build();
Metadata metadata = cluster.getMetadata();
- System.out.printf("Connected to cluster: %s%n",
- metadata.getClusterName());
+ System.out.printf(
+ "Connected to cluster: %s, max pending requests per connection %d, max connections per host %d%n",
+ metadata.getClusterName(),
+ maxPendingPerConnection,
+ connectionsPerHost);
for (Host host : metadata.getAllHosts())
{
System.out.printf("Datatacenter: %s; Host: %s; Rack: %s%n",