You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2014/09/07 16:38:46 UTC
[03/15] Improve stress workload realism
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPopulation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPopulation.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPopulation.java
new file mode 100644
index 0000000..da4c282
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsPopulation.java
@@ -0,0 +1,176 @@
+package org.apache.cassandra.stress.settings;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.stress.generate.DistributionFactory;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+
+public class SettingsPopulation implements Serializable
+{
+
+ public final DistributionFactory distribution;
+ public final DistributionFactory readlookback;
+ public final PartitionGenerator.Order order;
+ public final boolean wrap;
+ public final long[] sequence;
+
+ public static enum GenerateOrder
+ {
+ ARBITRARY, SHUFFLED, SORTED
+ }
+
+ private SettingsPopulation(GenerateOptions options, DistributionOptions dist, SequentialOptions pop)
+ {
+ this.order = !options.contents.setByUser() ? PartitionGenerator.Order.ARBITRARY : PartitionGenerator.Order.valueOf(options.contents.value().toUpperCase());
+ if (dist != null)
+ {
+ this.distribution = dist.seed.get();
+ this.sequence = null;
+ this.readlookback = null;
+ this.wrap = false;
+ }
+ else
+ {
+ this.distribution = null;
+ String[] bounds = pop.populate.value().split("\\.\\.+");
+ this.sequence = new long[] { OptionDistribution.parseLong(bounds[0]), OptionDistribution.parseLong(bounds[1]) };
+ this.readlookback = pop.lookback.get();
+ this.wrap = !pop.nowrap.setByUser();
+ }
+ }
+
+ public SettingsPopulation(DistributionOptions options)
+ {
+ this(options, options, null);
+ }
+
+ public SettingsPopulation(SequentialOptions options)
+ {
+ this(options, null, options);
+ }
+
+ // Option Declarations
+
+ private static class GenerateOptions extends GroupedOptions
+ {
+ final OptionSimple contents = new OptionSimple("contents=", "(sorted|shuffled)", null, "SORTED or SHUFFLED (intra-)partition order; if not specified, will be consistent but arbitrary order", false);
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return Arrays.asList(contents);
+ }
+ }
+
+ private static final class DistributionOptions extends GenerateOptions
+ {
+ final OptionDistribution seed;
+
+ public DistributionOptions(String defaultLimit)
+ {
+ seed = new OptionDistribution("dist=", "gaussian(1.." + defaultLimit + ")", "Seeds are selected from this distribution");
+ }
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return ImmutableList.<Option>builder().add(seed).addAll(super.options()).build();
+ }
+ }
+
+ private static final class SequentialOptions extends GenerateOptions
+ {
+ final OptionSimple populate;
+ final OptionDistribution lookback = new OptionDistribution("read-lookback=", "fixed(1)", "Select read seeds from the recently visited write seeds");
+ final OptionSimple nowrap = new OptionSimple("no-wrap", "", null, "Terminate the stress test once all seeds in the range have been visited", false);
+
+ public SequentialOptions(String defaultLimit)
+ {
+ populate = new OptionSimple("seq=", "[0-9]+\\.\\.+[0-9]+[MBK]?",
+ "1.." + defaultLimit,
+ "Generate all seeds in sequence", true);
+ }
+
+ @Override
+ public List<? extends Option> options()
+ {
+ return ImmutableList.<Option>builder().add(populate, nowrap, lookback).addAll(super.options()).build();
+ }
+ }
+
+ // CLI Utility Methods
+
+ public static SettingsPopulation get(Map<String, String[]> clArgs, SettingsCommand command)
+ {
+ // default the population size to the number of operations requested; if no count was given (count <= 0, e.g. running until error convergence), use 1M
+ String defaultLimit = command.count <= 0 ? "1000000" : Long.toString(command.count);
+
+ String[] params = clArgs.remove("-pop");
+ if (params == null)
+ {
+ // return defaults:
+ switch(command.type)
+ {
+ case WRITE:
+ case COUNTER_WRITE:
+ return new SettingsPopulation(new SequentialOptions(defaultLimit));
+ default:
+ return new SettingsPopulation(new DistributionOptions(defaultLimit));
+ }
+ }
+ GroupedOptions options = GroupedOptions.select(params, new SequentialOptions(defaultLimit), new DistributionOptions(defaultLimit));
+ if (options == null)
+ {
+ printHelp();
+ System.out.println("Invalid -pop options provided, see output for valid options");
+ System.exit(1);
+ }
+ return options instanceof SequentialOptions ?
+ new SettingsPopulation((SequentialOptions) options) :
+ new SettingsPopulation((DistributionOptions) options);
+ }
+
+ public static void printHelp()
+ {
+ GroupedOptions.printOptions(System.out, "-pop", new SequentialOptions("N"), new DistributionOptions("N"));
+ }
+
+ public static Runnable helpPrinter()
+ {
+ return new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ printHelp();
+ }
+ };
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java
index 5fb2bb2..6e3a02e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsSchema.java
@@ -44,12 +44,7 @@ public class SettingsSchema implements Serializable
public SettingsSchema(Options options, SettingsCommand command)
{
if (command instanceof SettingsCommandUser)
- {
- if (options.compaction.setByUser() || options.keyspace.setByUser() || options.compression.setByUser() || options.replication.setByUser())
- throw new IllegalArgumentException("Cannot provide command line schema settings if a user profile is provided");
-
keyspace = ((SettingsCommandUser) command).profile.keyspaceName;
- }
else
keyspace = options.keyspace.value();
@@ -62,14 +57,7 @@ public class SettingsSchema implements Serializable
public void createKeySpaces(StressSettings settings)
{
- if (!(settings.command instanceof SettingsCommandUser))
- {
- createKeySpacesThrift(settings);
- }
- else
- {
- ((SettingsCommandUser) settings.command).profile.maybeCreateSchema(settings);
- }
+ createKeySpacesThrift(settings);
}
@@ -189,6 +177,9 @@ public class SettingsSchema implements Serializable
if (params == null)
return new SettingsSchema(new Options(), command);
+ if (command instanceof SettingsCommandUser)
+ throw new IllegalArgumentException("-schema can only be provided with predefined operations insert, read, etc.; the 'user' command requires a schema yaml instead");
+
GroupedOptions options = GroupedOptions.select(params, new Options());
if (options == null)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
index ab57289..bdd10e5 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/StressSettings.java
@@ -40,8 +40,10 @@ public class StressSettings implements Serializable
{
public final SettingsCommand command;
public final SettingsRate rate;
- public final SettingsKey keys;
+ public final SettingsPopulation generate;
+ public final SettingsInsert insert;
public final SettingsColumn columns;
+ public final SettingsErrors errors;
public final SettingsLog log;
public final SettingsMode mode;
public final SettingsNode node;
@@ -50,12 +52,14 @@ public class StressSettings implements Serializable
public final SettingsPort port;
public final String sendToDaemon;
- public StressSettings(SettingsCommand command, SettingsRate rate, SettingsKey keys, SettingsColumn columns, SettingsLog log, SettingsMode mode, SettingsNode node, SettingsSchema schema, SettingsTransport transport, SettingsPort port, String sendToDaemon)
+ public StressSettings(SettingsCommand command, SettingsRate rate, SettingsPopulation generate, SettingsInsert insert, SettingsColumn columns, SettingsErrors errors, SettingsLog log, SettingsMode mode, SettingsNode node, SettingsSchema schema, SettingsTransport transport, SettingsPort port, String sendToDaemon)
{
this.command = command;
this.rate = rate;
- this.keys = keys;
+ this.insert = insert;
+ this.generate = generate;
this.columns = columns;
+ this.errors = errors;
this.log = log;
this.mode = mode;
this.node = node;
@@ -129,7 +133,7 @@ public class StressSettings implements Serializable
}
catch (Exception e)
{
- throw new RuntimeException(e.getMessage());
+ throw new RuntimeException(e);
}
return client;
@@ -189,9 +193,10 @@ public class StressSettings implements Serializable
public void maybeCreateKeyspaces()
{
- if (command.type == Command.WRITE || command.type == Command.COUNTER_WRITE || command.type == Command.USER)
+ if (command.type == Command.WRITE || command.type == Command.COUNTER_WRITE)
schema.createKeySpaces(this);
-
+ else if (command.type == Command.USER)
+ ((SettingsCommandUser) command).profile.maybeCreateSchema(this);
}
public static StressSettings parse(String[] args)
@@ -221,8 +226,10 @@ public class StressSettings implements Serializable
String sendToDaemon = SettingsMisc.getSendToDaemon(clArgs);
SettingsPort port = SettingsPort.get(clArgs);
SettingsRate rate = SettingsRate.get(clArgs, command);
- SettingsKey keys = SettingsKey.get(clArgs, command);
+ SettingsPopulation generate = SettingsPopulation.get(clArgs, command);
+ SettingsInsert insert = SettingsInsert.get(clArgs);
SettingsColumn columns = SettingsColumn.get(clArgs);
+ SettingsErrors errors = SettingsErrors.get(clArgs);
SettingsLog log = SettingsLog.get(clArgs);
SettingsMode mode = SettingsMode.get(clArgs);
SettingsNode node = SettingsNode.get(clArgs);
@@ -244,7 +251,7 @@ public class StressSettings implements Serializable
}
System.exit(1);
}
- return new StressSettings(command, rate, keys, columns, log, mode, node, schema, transport, port, sendToDaemon);
+ return new StressSettings(command, rate, generate, insert, columns, errors, log, mode, node, schema, transport, port, sendToDaemon);
}
private static Map<String, String[]> parseMap(String[] args)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java b/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java
new file mode 100644
index 0000000..2a38e7d
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java
@@ -0,0 +1,259 @@
+package org.apache.cassandra.stress.util;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.cassandra.stress.generate.FasterRandom;
+
+// simple skiplist that permits indexing/removal by position, insertion at the end
+// (though easily extended to insertion at any position, not necessary here); the structure itself
+// is not concurrent, so append/remove take the write lock and get takes the read lock
+// we use it for sampling items by position for visiting writes in the pool of pending writes
+public class DynamicList<E>
+{
+
+ // represents a value and an index simultaneously; each node maintains a list
+ // of next pointers for each height in the skip-list this node participates in
+ // (a contiguous range from [0..height))
+ public static class Node<E>
+ {
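+ // size[i] is the number of list entries (this node included) covered by this node at level i, i.e. the distance to its level-i successor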
+ private final int[] size;
+ // TODO: alternate links to save space
+ private final Node<E>[] links;
+ private final E value;
+
+ private Node(int height, E value)
+ {
+ this.value = value;
+ links = new Node[height * 2];
+ size = new int[height];
+ Arrays.fill(size, 1);
+ }
+
+ private int height()
+ {
+ return size.length;
+ }
+
+ private Node<E> next(int i)
+ {
+ return links[i * 2];
+ }
+
+ private Node<E> prev(int i)
+ {
+ return links[1 + i * 2];
+ }
+
+ private void setNext(int i, Node<E> next)
+ {
+ links[i * 2] = next;
+ }
+
+ private void setPrev(int i, Node<E> prev)
+ {
+ links[1 + i * 2] = prev;
+ }
+
+ private Node parent(int parentHeight)
+ {
+ Node prev = this;
+ while (true)
+ {
+ int height = prev.height();
+ if (parentHeight < height)
+ return prev;
+ prev = prev.prev(height - 1);
+ }
+ }
+ }
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final int maxHeight;
+ private final Node<E> head;
+ private int size;
+
+ public DynamicList(int maxExpectedSize)
+ {
+ this.maxHeight = 3 + (int) Math.ceil(Math.log(maxExpectedSize) / Math.log(2));
+ head = new Node<>(maxHeight, null);
+ }
+
+ // picks the height of a new node: 1 plus the number of set bits among (maxHeight - 1) random bits,
+ // so the result always lies in [1..maxHeight]
+ private int randomLevel()
+ {
+ // build the mask with a long shift: an int shift only uses the low 5 bits of the distance, so
+ // once maxHeight - 1 reaches 32 the int mask would collapse to 0 and every node would get height 1
+ int mask = (int) ((1L << (maxHeight - 1)) - 1);
+ return 1 + Integer.bitCount(ThreadLocalRandom.current().nextInt() & mask);
+ }
+
+ // add the value to the end of the list, and return the associated Node that permits efficient removal
+ // regardless of its future position in the list from other modifications
+ public Node<E> append(E value)
+ {
+ Node<E> newTail = new Node<>(randomLevel(), value);
+
+ lock.writeLock().lock();
+ try
+ {
+ size++;
+
+ Node<E> tail = head;
+ for (int i = maxHeight - 1 ; i >= newTail.height() ; i--)
+ {
+ Node<E> next;
+ while ((next = tail.next(i)) != null)
+ tail = next;
+ tail.size[i]++;
+ }
+
+ for (int i = newTail.height() - 1 ; i >= 0 ; i--)
+ {
+ Node<E> next;
+ while ((next = tail.next(i)) != null)
+ tail = next;
+ tail.setNext(i, newTail);
+ newTail.setPrev(i, tail);
+ }
+
+ return newTail;
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
+ // remove the provided node and its associated value from the list
+ public void remove(Node<E> node)
+ {
+ lock.writeLock().lock();
+ try
+ {
+ size--;
+
+ // go up through each level in the skip list, unlinking this node; this entails
+ // simply linking each neighbour to each other, and appending the size of the
+ // current level owned by this node's index to the preceding neighbour (since
+ // ownership is defined as any node that you must visit through the index,
+ // removal of ourselves from a level means the preceding index entry is the
+ // entry point to all of the removed node's descendants)
+ for (int i = 0 ; i < node.height() ; i++)
+ {
+ Node<E> prev = node.prev(i);
+ Node<E> next = node.next(i);
+ assert prev != null;
+ prev.setNext(i, next);
+ if (next != null)
+ next.setPrev(i, prev);
+ prev.size[i] += node.size[i] - 1;
+ }
+
+ // then go up the levels, removing 1 from the size at each height above ours
+ for (int i = node.height() ; i < maxHeight ; i++)
+ {
+ // if we're at our height limit, we backtrack at our top level until we
+ // hit a neighbour with a greater height
+ while (i == node.height())
+ node = node.prev(i - 1);
+ node.size[i]--;
+ }
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
+ // retrieve the item at the provided index, or return null if the index is past the end of the list
+ public E get(int index)
+ {
+ lock.readLock().lock();
+ try
+ {
+ if (index >= size)
+ return null;
+
+ index++;
+ int c = 0;
+ Node<E> finger = head;
+ for (int i = maxHeight - 1 ; i >= 0 ; i--)
+ {
+ while (c + finger.size[i] <= index)
+ {
+ c += finger.size[i];
+ finger = finger.next(i);
+ }
+ }
+
+ assert c == index;
+ return finger.value;
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
+ // some quick and dirty tests to confirm the skiplist works as intended
+ // don't create a separate unit test - tools tree doesn't currently warrant them
+
+ private boolean isWellFormed()
+ {
+ for (int i = 0 ; i < maxHeight ; i++)
+ {
+ int c = 0;
+ for (Node node = head ; node != null ; node = node.next(i))
+ {
+ if (node.prev(i) != null && node.prev(i).next(i) != node)
+ return false;
+ if (node.next(i) != null && node.next(i).prev(i) != node)
+ return false;
+ c += node.size[i];
+ if (i + 1 < maxHeight && node.parent(i + 1).next(i + 1) == node.next(i))
+ {
+ if (node.parent(i + 1).size[i + 1] != c)
+ return false;
+ c = 0;
+ }
+ }
+ if (i == maxHeight - 1 && c != size + 1)
+ return false;
+ }
+ return true;
+ }
+
+ public static void main(String[] args)
+ {
+ DynamicList<Integer> list = new DynamicList<>(20);
+ TreeSet<Integer> canon = new TreeSet<>();
+ HashMap<Integer, Node> nodes = new HashMap<>();
+ int c = 0;
+ for (int i = 0 ; i < 100000 ; i++)
+ {
+ nodes.put(c, list.append(c));
+ canon.add(c);
+ c++;
+ }
+ FasterRandom rand = new FasterRandom();
+ assert list.isWellFormed();
+ for (int loop = 0 ; loop < 100 ; loop++)
+ {
+ System.out.println(loop);
+ for (int i = 0 ; i < 100000 ; i++)
+ {
+ int index = rand.nextInt(100000);
+ Integer seed = list.get(index);
+// assert canon.headSet(seed, false).size() == index;
+ list.remove(nodes.remove(seed));
+ canon.remove(seed);
+ nodes.put(c, list.append(c));
+ canon.add(c);
+ c++;
+ }
+ assert list.isWellFormed();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timer.java b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
index 45e1ba7..4e2b0a3 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
@@ -30,7 +30,7 @@ import java.util.concurrent.CountDownLatch;
public final class Timer
{
- private static final int SAMPLE_SIZE_SHIFT = 10;
+ private static final int SAMPLE_SIZE_SHIFT = 14;
private static final int SAMPLE_SIZE_MASK = (1 << SAMPLE_SIZE_SHIFT) - 1;
private final Random rnd = new Random();
@@ -66,6 +66,11 @@ public final class Timer
return 1 + (index >>> SAMPLE_SIZE_SHIFT);
}
+ public boolean running()
+ {
+ return finalReport == null;
+ }
+
public void stop(long partitionCount, long rowCount)
{
maybeReport();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
index 2bdca82..b6d4e52 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
@@ -40,6 +40,7 @@ public class Timing
private final CopyOnWriteArrayList<Timer> timers = new CopyOnWriteArrayList<>();
private volatile TimingInterval history;
private final Random rnd = new Random();
+ private boolean done;
// TIMING
@@ -57,11 +58,16 @@ public class Timing
if (!ready.await(2L, TimeUnit.MINUTES))
throw new RuntimeException("Timed out waiting for a timer thread - seems one got stuck");
+ boolean done = true;
// reports have been filled in by timer threadCount, so merge
List<TimingInterval> intervals = new ArrayList<>();
for (Timer timer : timers)
+ {
intervals.add(timer.report);
+ done &= !timer.running();
+ }
+ this.done = done;
return TimingInterval.merge(rnd, intervals, Integer.MAX_VALUE, history.endNanos());
}
@@ -78,10 +84,15 @@ public class Timing
history = new TimingInterval(System.nanoTime());
}
+ public boolean done()
+ {
+ return done;
+ }
+
public TimingInterval snapInterval() throws InterruptedException
{
final TimingInterval interval = snapInterval(rnd);
- history = TimingInterval.merge(rnd, Arrays.asList(interval, history), 50000, history.startNanos());
+ history = TimingInterval.merge(rnd, Arrays.asList(interval, history), 200000, history.startNanos());
return interval;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0580fb2b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
index db3fef1..50ab608 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
@@ -97,14 +97,14 @@ public final class TimingInterval
}
- public double realOpRate()
+ public double opRate()
{
return operationCount / ((end - start) * 0.000000001d);
}
- public double adjustedOpRate()
+ public double adjustedRowRate()
{
- return operationCount / ((end - (start + pauseLength)) * 0.000000001d);
+ return rowCount / ((end - (start + pauseLength)) * 0.000000001d);
}
public double partitionRate()