You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/09/14 10:13:58 UTC
[2/3] git commit: cassandra-stress supports whitelist mode for node
config
cassandra-stress supports whitelist mode for node config
patch by benedict; reviewed by tjake for CASSANDRA-7658
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eea042b0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eea042b0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eea042b0
Branch: refs/heads/trunk
Commit: eea042b0b0abfb09f60b672c8930a924c5d7f25b
Parents: 0e652e7
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Sun Sep 14 09:13:18 2014 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Sun Sep 14 09:13:18 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/stress/settings/SettingsNode.java | 47 ++++++++++++++-
.../stress/settings/StressSettings.java | 5 +-
.../cassandra/stress/util/JavaDriverClient.java | 16 ++++-
.../stress/util/SmartThriftClient.java | 62 ++++++++++++++------
5 files changed, 106 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea042b0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4c39f5c..7e18719 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.1
+ * cassandra-stress supports whitelist mode for node config
* GCInspector more closely tracks GC; cassandra-stress and nodetool report it
* nodetool won't output bogus ownership info without a keyspace (CASSANDRA-7173)
* Add human readable option to nodetool commands (CASSANDRA-5433)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea042b0/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
index 4fd7d34..30fe908 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
@@ -22,15 +22,20 @@ package org.apache.cassandra.stress.settings;
import java.io.*;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class SettingsNode implements Serializable
{
-
public final List<String> nodes;
+ public final boolean isWhiteList;
public SettingsNode(Options options)
{
@@ -63,6 +68,41 @@ public class SettingsNode implements Serializable
}
else
nodes = Arrays.asList(options.list.value().split(","));
+ isWhiteList = options.whitelist.setByUser();
+ }
+
+ public Set<InetAddress> resolveAll()
+ {
+ Set<InetAddress> r = new HashSet<>();
+ for (String node : nodes)
+ {
+ try
+ {
+ r.add(InetAddress.getByName(node));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ return r;
+ }
+
+ public Set<InetSocketAddress> resolveAll(int port)
+ {
+ Set<InetSocketAddress> r = new HashSet<>();
+ for (String node : nodes)
+ {
+ try
+ {
+ r.add(new InetSocketAddress(InetAddress.getByName(node), port));
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ return r;
}
public String randomNode()
@@ -77,13 +117,14 @@ public class SettingsNode implements Serializable
public static final class Options extends GroupedOptions
{
+ final OptionSimple whitelist = new OptionSimple("whitelist", "", null, "Limit communications to the provided nodes", false);
final OptionSimple file = new OptionSimple("file=", ".*", null, "Node file (one per line)", false);
- final OptionSimple list = new OptionSimple("", "[^=,]+(,[^=,]+)*", "localhost", "comma delimited list of hosts", false);
+ final OptionSimple list = new OptionSimple("", "[^=,]+(,[^=,]+)*", "localhost", "comma delimited list of nodes", false);
@Override
public List<? extends Option> options()
{
- return Arrays.asList(file, list);
+ return Arrays.asList(whitelist, file, list);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea042b0/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
index bdd10e5..ba72821 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
@@ -25,6 +25,9 @@ import java.io.Serializable;
import java.util.*;
import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.WhiteListPolicy;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.SimpleThriftClient;
@@ -177,7 +180,7 @@ public class StressSettings implements Serializable
return client;
EncryptionOptions.ClientEncryptionOptions encOptions = transport.getEncryptionOptions();
- JavaDriverClient c = new JavaDriverClient(currentNode, port.nativePort, encOptions);
+ JavaDriverClient c = new JavaDriverClient(this, currentNode, port.nativePort, encOptions);
c.connect(mode.compression());
if (setKeyspace)
c.execute("USE \"" + schema.keyspace + "\";", org.apache.cassandra.db.ConsistencyLevel.ONE);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea042b0/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
index c901461..2105179 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
@@ -23,10 +23,13 @@ import java.util.concurrent.ConcurrentMap;
import javax.net.ssl.SSLContext;
import com.datastax.driver.core.*;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.WhiteListPolicy;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.security.SSLFactory;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
+import org.apache.cassandra.stress.settings.StressSettings;
public class JavaDriverClient
{
@@ -41,19 +44,24 @@ public class JavaDriverClient
private final EncryptionOptions.ClientEncryptionOptions encryptionOptions;
private Cluster cluster;
private Session session;
+ private final WhiteListPolicy whitelist;
private static final ConcurrentMap<String, PreparedStatement> stmts = new ConcurrentHashMap<>();
- public JavaDriverClient(String host, int port)
+ public JavaDriverClient(StressSettings settings, String host, int port)
{
- this(host, port, new EncryptionOptions.ClientEncryptionOptions());
+ this(settings, host, port, new EncryptionOptions.ClientEncryptionOptions());
}
- public JavaDriverClient(String host, int port, EncryptionOptions.ClientEncryptionOptions encryptionOptions)
+ public JavaDriverClient(StressSettings settings, String host, int port, EncryptionOptions.ClientEncryptionOptions encryptionOptions)
{
this.host = host;
this.port = port;
this.encryptionOptions = encryptionOptions;
+ if (settings.node.isWhiteList)
+ whitelist = new WhiteListPolicy(new DCAwareRoundRobinPolicy(), settings.node.resolveAll(settings.port.nativePort));
+ else
+ whitelist = null;
}
public PreparedStatement prepare(String query)
@@ -78,6 +86,8 @@ public class JavaDriverClient
.addContactPoint(host)
.withPort(port)
.withoutMetrics(); // The driver uses metrics 3 with conflict with our version
+ if (whitelist != null)
+ clusterBuilder.withLoadBalancingPolicy(whitelist);
clusterBuilder.withCompression(compression);
if (encryptionOptions.enabled)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eea042b0/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java b/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java
index 7ede496..b880283 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/SmartThriftClient.java
@@ -21,10 +21,13 @@ package org.apache.cassandra.stress.util;
*/
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import com.datastax.driver.core.Host;
@@ -41,17 +44,29 @@ public class SmartThriftClient implements ThriftClient
final String keyspace;
final Metadata metadata;
final StressSettings settings;
- final ConcurrentHashMap<Host, ConcurrentLinkedQueue<Client>> cache = new ConcurrentHashMap<>();
+ final ConcurrentHashMap<InetAddress, ConcurrentLinkedQueue<Client>> cache = new ConcurrentHashMap<>();
final AtomicInteger queryIdCounter = new AtomicInteger();
final ConcurrentHashMap<Integer, String> queryStrings = new ConcurrentHashMap<>();
final ConcurrentHashMap<String, Integer> queryIds = new ConcurrentHashMap<>();
+ final Set<InetAddress> whiteset;
+ final List<InetAddress> whitelist;
public SmartThriftClient(StressSettings settings, String keyspace, Metadata metadata)
{
this.metadata = metadata;
this.keyspace = keyspace;
this.settings = settings;
+ if (!settings.node.isWhiteList)
+ {
+ whiteset = null;
+ whitelist = null;
+ }
+ else
+ {
+ whiteset = settings.node.resolveAll();
+ whitelist = Arrays.asList(whiteset.toArray(new InetAddress[0]));
+ }
}
private final AtomicInteger roundrobin = new AtomicInteger();
@@ -73,13 +88,13 @@ public class SmartThriftClient implements ThriftClient
final class Client
{
final Cassandra.Client client;
- final Host host;
+ final InetAddress server;
final Map<Integer, Integer> queryMap = new HashMap<>();
- Client(Cassandra.Client client, Host host)
+ Client(Cassandra.Client client, InetAddress server)
{
this.client = client;
- this.host = host;
+ this.server = server;
}
Integer get(Integer id, boolean cql3) throws TException
@@ -111,22 +126,33 @@ public class SmartThriftClient implements ThriftClient
private Client get(ByteBuffer pk)
{
Set<Host> hosts = metadata.getReplicas(metadata.quote(keyspace), pk);
- int pos = roundrobin.incrementAndGet() % hosts.size();
- if (pos < 0)
- pos = -pos;
- Host host = Iterators.get(hosts.iterator(), pos);
- ConcurrentLinkedQueue<Client> q = cache.get(host);
+ InetAddress address = null;
+ if (hosts.size() > 0)
+ {
+ int pos = roundrobin.incrementAndGet() % hosts.size();
+ for (int i = 0 ; address == null && i < hosts.size() ; i++)
+ {
+ if (pos < 0)
+ pos = -pos;
+ Host host = Iterators.get(hosts.iterator(), (pos + i) % hosts.size());
+ if (whiteset == null || whiteset.contains(host.getAddress()))
+ address = host.getAddress();
+ }
+ }
+ if (address == null)
+ address = whitelist.get(ThreadLocalRandom.current().nextInt(whitelist.size()));
+ ConcurrentLinkedQueue<Client> q = cache.get(address);
if (q == null)
{
ConcurrentLinkedQueue<Client> newQ = new ConcurrentLinkedQueue<Client>();
- q = cache.putIfAbsent(host, newQ);
+ q = cache.putIfAbsent(address, newQ);
if (q == null)
q = newQ;
}
Client tclient = q.poll();
if (tclient != null)
return tclient;
- return new Client(settings.getRawThriftClient(host.getAddress().getHostAddress()), host);
+ return new Client(settings.getRawThriftClient(address.getHostAddress()), address);
}
@Override
@@ -140,7 +166,7 @@ public class SmartThriftClient implements ThriftClient
client.client.batch_mutate(Collections.singletonMap(e.getKey(), e.getValue()), consistencyLevel);
} finally
{
- cache.get(client.host).add(client);
+ cache.get(client.server).add(client);
}
}
}
@@ -154,7 +180,7 @@ public class SmartThriftClient implements ThriftClient
return client.client.get_slice(key, parent, predicate, consistencyLevel);
} finally
{
- cache.get(client.host).add(client);
+ cache.get(client.server).add(client);
}
}
@@ -167,7 +193,7 @@ public class SmartThriftClient implements ThriftClient
client.client.insert(key, column_parent, column, consistency_level);
} finally
{
- cache.get(client.host).add(client);
+ cache.get(client.server).add(client);
}
}
@@ -180,7 +206,7 @@ public class SmartThriftClient implements ThriftClient
return client.client.execute_cql_query(ByteBufferUtil.bytes(query), compression);
} finally
{
- cache.get(client.host).add(client);
+ cache.get(client.server).add(client);
}
}
@@ -193,7 +219,7 @@ public class SmartThriftClient implements ThriftClient
return client.client.execute_cql3_query(ByteBufferUtil.bytes(query), compression, consistency);
} finally
{
- cache.get(client.host).add(client);
+ cache.get(client.server).add(client);
}
}
@@ -212,7 +238,7 @@ public class SmartThriftClient implements ThriftClient
return client.client.execute_prepared_cql3_query(client.get(queryId, true), values, consistency);
} finally
{
- cache.get(client.host).add(client);
+ cache.get(client.server).add(client);
}
}
@@ -231,7 +257,7 @@ public class SmartThriftClient implements ThriftClient
return client.client.execute_prepared_cql_query(client.get(queryId, true), values);
} finally
{
- cache.get(client.host).add(client);
+ cache.get(client.server).add(client);
}
}