You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/03/10 15:26:42 UTC
[1/2] cassandra git commit: cassandra-stress prints per operation
statistics, plus misc other stress updates
Repository: cassandra
Updated Branches:
refs/heads/trunk bf9c50313 -> a0586f692
cassandra-stress prints per operation statistics, plus misc other stress updates
patch by anthony cozzie, reviewed by benedict for CASSANDRA-8769
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6bbfb557
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6bbfb557
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6bbfb557
Branch: refs/heads/trunk
Commit: 6bbfb5574e0487d490eb5872c70853c4c56d2940
Parents: ef6fa37
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Tue Mar 10 14:19:23 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Mar 10 14:19:23 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/stress/Operation.java | 2 +-
.../apache/cassandra/stress/StressAction.java | 46 +++++----
.../apache/cassandra/stress/StressMetrics.java | 83 ++++++++++-----
.../apache/cassandra/stress/StressProfile.java | 15 ++-
.../stress/operations/FixedOpDistribution.java | 10 ++
.../stress/operations/OpDistribution.java | 2 +
.../operations/OpDistributionFactory.java | 6 +-
.../operations/SampledOpDistribution.java | 17 ++++
.../SampledOpDistributionFactory.java | 10 +-
.../cassandra/stress/settings/Command.java | 27 +++--
.../cassandra/stress/settings/Option.java | 1 +
.../stress/settings/OptionAnyProbabilities.java | 5 +
.../stress/settings/OptionDistribution.java | 17 ++--
.../cassandra/stress/settings/OptionMulti.java | 15 ++-
.../settings/OptionRatioDistribution.java | 5 +
.../stress/settings/OptionReplication.java | 2 +-
.../stress/settings/SettingsCommand.java | 48 ++++++---
.../settings/SettingsCommandPreDefined.java | 12 ++-
.../stress/settings/SettingsCommandUser.java | 5 +
.../org/apache/cassandra/stress/util/Timer.java | 10 +-
.../apache/cassandra/stress/util/Timing.java | 75 +++++++++-----
.../cassandra/stress/util/TimingInterval.java | 102 +++++++++++++------
23 files changed, 355 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b2ac1aa..af5206b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.4
+ * cassandra-stress reports per-operation statistics, plus misc (CASSANDRA-8769)
* Add SimpleDate (cql date) and Time (cql time) types (CASSANDRA-7523)
* Use long for key count in cfstats (CASSANDRA-8913)
* Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 05045f8..f4ac5ee 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -176,7 +176,7 @@ public abstract class Operation
}
}
- timer.stop(run.partitionCount(), run.rowCount());
+ timer.stop(run.partitionCount(), run.rowCount(), !success);
if (!success)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 1433742..f906a55 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -30,10 +30,11 @@ import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.stress.operations.OpDistribution;
import org.apache.cassandra.stress.operations.OpDistributionFactory;
+import org.apache.cassandra.stress.settings.SettingsCommand;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.stress.util.TimingInterval;
import org.apache.cassandra.transport.SimpleClient;
public class StressAction implements Runnable
@@ -53,13 +54,14 @@ public class StressAction implements Runnable
// creating keyspace and column families
settings.maybeCreateKeyspaces();
- // TODO: warmup should operate configurably over op/pk/row, and be of configurable length
- if (!settings.command.noWarmup)
- warmup(settings.command.getFactory(settings));
-
output.println("Sleeping 2s...");
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+ if (!settings.command.noWarmup)
+ warmup(settings.command.getFactory(settings));
+ if (settings.command.truncate == SettingsCommand.TruncateWhen.ONCE)
+ settings.command.truncateTables(settings);
+
// TODO : move this to a new queue wrapper that gates progress based on a poisson (or configurable) distribution
RateLimiter rateLimiter = null;
if (settings.rate.opRateTargetPerSecond > 0)
@@ -86,12 +88,19 @@ public class StressAction implements Runnable
// warmup - do 50k iterations; by default hotspot compiles methods after 10k invocations
PrintStream warmupOutput = new PrintStream(new OutputStream() { @Override public void write(int b) throws IOException { } } );
int iterations = 50000 * settings.node.nodes.size();
+ int threads = 20;
+
+ if (settings.rate.maxThreads > 0)
+ threads = Math.min(threads, settings.rate.maxThreads);
+ if (settings.rate.threadCount > 0)
+ threads = Math.min(threads, settings.rate.threadCount);
+
for (OpDistributionFactory single : operations.each())
{
// we need to warm up all the nodes in the cluster ideally, but we may not be the only stress instance;
// so warm up all the nodes we're speaking to only.
output.println(String.format("Warming up %s with %d iterations...", single.desc(), iterations));
- run(single, 20, iterations, 0, null, null, warmupOutput);
+ run(single, threads, iterations, 0, null, null, warmupOutput);
}
}
@@ -109,6 +118,9 @@ public class StressAction implements Runnable
{
output.println(String.format("Running with %d threadCount", threadCount));
+ if (settings.command.truncate == SettingsCommand.TruncateWhen.ALWAYS)
+ settings.command.truncateTables(settings);
+
StressMetrics result = run(settings.command.getFactory(settings), threadCount, settings.command.count,
settings.command.duration, rateLimiter, settings.command.durationUnits, output);
if (result == null)
@@ -146,7 +158,7 @@ public class StressAction implements Runnable
} while (!auto || (hasAverageImprovement(results, 3, 0) && hasAverageImprovement(results, 5, settings.command.targetUncertainty)));
// summarise all results
- StressMetrics.summarise(runIds, results, output);
+ StressMetrics.summarise(runIds, results, output, settings.samples.historyCount);
return true;
}
@@ -187,8 +199,8 @@ public class StressAction implements Runnable
final Consumer[] consumers = new Consumer[threadCount];
for (int i = 0; i < threadCount; i++)
{
- Timer timer = metrics.getTiming().newTimer(settings.samples.liveCount / threadCount);
- consumers[i] = new Consumer(operations, done, workManager, timer, metrics, rateLimiter);
+ consumers[i] = new Consumer(operations, done, workManager, metrics, rateLimiter,
+ settings.samples.liveCount / threadCount);
}
// starting worker threadCount
@@ -240,28 +252,27 @@ public class StressAction implements Runnable
private final OpDistribution operations;
private final StressMetrics metrics;
- private final Timer timer;
private final RateLimiter rateLimiter;
private volatile boolean success = true;
private final WorkManager workManager;
private final CountDownLatch done;
- public Consumer(OpDistributionFactory operations, CountDownLatch done, WorkManager workManager, Timer timer, StressMetrics metrics, RateLimiter rateLimiter)
+ public Consumer(OpDistributionFactory operations, CountDownLatch done, WorkManager workManager, StressMetrics metrics,
+ RateLimiter rateLimiter, int sampleCount)
{
this.done = done;
this.rateLimiter = rateLimiter;
this.workManager = workManager;
this.metrics = metrics;
- this.timer = timer;
- this.operations = operations.get(timer);
+ this.operations = operations.get(metrics.getTiming(), sampleCount);
}
public void run()
{
- timer.init();
+ operations.initTimers();
+
try
{
-
SimpleClient sclient = null;
ThriftClient tclient = null;
JavaDriverClient jclient = null;
@@ -324,11 +335,8 @@ public class StressAction implements Runnable
finally
{
done.countDown();
- timer.close();
+ operations.closeTimers();
}
-
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
index d1cc0d4..46ca488 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@ -24,18 +24,16 @@ package org.apache.cassandra.stress;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
+import org.apache.cassandra.stress.util.*;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.JmxCollector;
-import org.apache.cassandra.stress.util.Timing;
-import org.apache.cassandra.stress.util.TimingInterval;
-import org.apache.cassandra.stress.util.Uncertainty;
public class StressMetrics
{
@@ -51,10 +49,12 @@ public class StressMetrics
private final Timing timing;
private final Callable<JmxCollector.GcStats> gcStatsCollector;
private volatile JmxCollector.GcStats totalGcStats;
+ private final StressSettings settings;
public StressMetrics(PrintStream output, final long logIntervalMillis, StressSettings settings)
{
this.output = output;
+ this.settings = settings;
Callable<JmxCollector.GcStats> gcStatsCollector;
totalGcStats = new JmxCollector.GcStats(0);
try
@@ -93,7 +93,9 @@ public class StressMetrics
{
try
{
- long sleep = timing.getHistory().endMillis() + logIntervalMillis - System.currentTimeMillis();
+ long sleepNanos = timing.getHistory().endNanos() - System.nanoTime();
+ long sleep = (sleepNanos / 1000000) + logIntervalMillis;
+
if (sleep < logIntervalMillis >>> 3)
// if had a major hiccup, sleep full interval
Thread.sleep(logIntervalMillis);
@@ -154,9 +156,19 @@ public class StressMetrics
{
Timing.TimingResult<JmxCollector.GcStats> result = timing.snap(gcStatsCollector);
totalGcStats = JmxCollector.GcStats.aggregate(Arrays.asList(totalGcStats, result.extra));
- if (result.timing.partitionCount != 0)
- printRow("", result.timing, timing.getHistory(), result.extra, rowRateUncertainty, output);
- rowRateUncertainty.update(result.timing.adjustedRowRate());
+ TimingInterval current = result.intervals.combine(settings.samples.reportCount);
+ TimingInterval history = timing.getHistory().combine(settings.samples.historyCount);
+ rowRateUncertainty.update(current.adjustedRowRate());
+ if (current.partitionCount != 0)
+ {
+ if (result.intervals.intervals().size() > 1)
+ {
+ for (Map.Entry<String, TimingInterval> type : result.intervals.intervals().entrySet())
+ printRow("", type.getKey(), type.getValue(), timing.getHistory().get(type.getKey()), result.extra, rowRateUncertainty, output);
+ }
+
+ printRow("", "total", current, history, result.extra, rowRateUncertainty, output);
+ }
if (timing.done())
stop = true;
}
@@ -164,19 +176,19 @@ public class StressMetrics
// PRINT FORMATTING
- public static final String HEADFORMAT = "%-10s,%10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s,%7s,%8s,%8s,%8s,%8s";
- public static final String ROWFORMAT = "%-10d,%10.0f,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f,%7.0f,%8.0f,%8.0f,%8.0f,%8.0f";
+ public static final String HEADFORMAT = "%-10s%10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s,%7s,%7s,%8s,%8s,%8s,%8s";
+ public static final String ROWFORMAT = "%-10s%10d,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f,%7d,%7.0f,%8.0f,%8.0f,%8.0f,%8.0f";
private static void printHeader(String prefix, PrintStream output)
{
- output.println(prefix + String.format(HEADFORMAT, "total ops","adj row/s","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr", "gc: #", "max ms", "sum ms", "sdv ms", "mb"));
+ output.println(prefix + String.format(HEADFORMAT, "type,", "total ops","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr", "errors", "gc: #", "max ms", "sum ms", "sdv ms", "mb"));
}
- private static void printRow(String prefix, TimingInterval interval, TimingInterval total, JmxCollector.GcStats gcStats, Uncertainty opRateUncertainty, PrintStream output)
+ private static void printRow(String prefix, String type, TimingInterval interval, TimingInterval total, JmxCollector.GcStats gcStats, Uncertainty opRateUncertainty, PrintStream output)
{
output.println(prefix + String.format(ROWFORMAT,
+ type + ",",
total.operationCount,
- interval.adjustedRowRate(),
interval.opRate(),
interval.partitionRate(),
interval.rowRate(),
@@ -188,6 +200,7 @@ public class StressMetrics
interval.maxLatency(),
total.runTime() / 1000f,
opRateUncertainty.getUncertainty(),
+ interval.errorCount,
gcStats.count,
gcStats.maxms,
gcStats.summs,
@@ -200,16 +213,20 @@ public class StressMetrics
{
output.println("\n");
output.println("Results:");
- TimingInterval history = timing.getHistory();
- output.println(String.format("op rate : %.0f", history.opRate()));
- output.println(String.format("partition rate : %.0f", history.partitionRate()));
- output.println(String.format("row rate : %.0f", history.rowRate()));
- output.println(String.format("latency mean : %.1f", history.meanLatency()));
- output.println(String.format("latency median : %.1f", history.medianLatency()));
- output.println(String.format("latency 95th percentile : %.1f", history.rankLatency(.95f)));
- output.println(String.format("latency 99th percentile : %.1f", history.rankLatency(0.99f)));
- output.println(String.format("latency 99.9th percentile : %.1f", history.rankLatency(0.999f)));
- output.println(String.format("latency max : %.1f", history.maxLatency()));
+
+ TimingIntervals opHistory = timing.getHistory();
+ TimingInterval history = opHistory.combine(settings.samples.historyCount);
+ output.println(String.format("op rate : %.0f %s", history.opRate(), opHistory.opRates()));
+ output.println(String.format("partition rate : %.0f %s", history.partitionRate(), opHistory.partitionRates()));
+ output.println(String.format("row rate : %.0f %s", history.rowRate(), opHistory.rowRates()));
+ output.println(String.format("latency mean : %.1f %s", history.meanLatency(), opHistory.meanLatencies()));
+ output.println(String.format("latency median : %.1f %s", history.medianLatency(), opHistory.medianLatencies()));
+ output.println(String.format("latency 95th percentile : %.1f %s", history.rankLatency(.95f), opHistory.rankLatencies(0.95f)));
+ output.println(String.format("latency 99th percentile : %.1f %s", history.rankLatency(0.99f), opHistory.rankLatencies(0.99f)));
+ output.println(String.format("latency 99.9th percentile : %.1f %s", history.rankLatency(0.999f), opHistory.rankLatencies(0.999f)));
+ output.println(String.format("latency max : %.1f %s", history.maxLatency(), opHistory.maxLatencies()));
+ output.println(String.format("Total partitions : %d %s", history.partitionCount, opHistory.partitionCounts()));
+ output.println(String.format("Total errors : %d %s", history.errorCount, opHistory.errorCounts()));
output.println(String.format("total gc count : %.0f", totalGcStats.count));
output.println(String.format("total gc mb : %.0f", totalGcStats.bytes / (1 << 20)));
output.println(String.format("total gc time (s) : %.0f", totalGcStats.summs / 1000));
@@ -219,7 +236,7 @@ public class StressMetrics
history.runTime(), "HH:mm:ss", true));
}
- public static void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out)
+ public static void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out, int historySampleCount)
{
int idLen = 0;
for (String id : ids)
@@ -227,13 +244,27 @@ public class StressMetrics
String formatstr = "%" + idLen + "s, ";
printHeader(String.format(formatstr, "id"), out);
for (int i = 0 ; i < ids.size() ; i++)
+ {
+ for (Map.Entry<String, TimingInterval> type : summarise.get(i).timing.getHistory().intervals().entrySet())
+ {
+ printRow(String.format(formatstr, ids.get(i)),
+ type.getKey(),
+ type.getValue(),
+ type.getValue(),
+ summarise.get(i).totalGcStats,
+ summarise.get(i).rowRateUncertainty,
+ out);
+ }
+ TimingInterval hist = summarise.get(i).timing.getHistory().combine(historySampleCount);
printRow(String.format(formatstr, ids.get(i)),
- summarise.get(i).timing.getHistory(),
- summarise.get(i).timing.getHistory(),
+ "total",
+ hist,
+ hist,
summarise.get(i).totalGcStats,
summarise.get(i).rowRateUncertainty,
out
);
+ }
}
public Timing getTiming()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 687b3ae..6c73214 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -43,10 +43,7 @@ import org.apache.cassandra.stress.generate.*;
import org.apache.cassandra.stress.generate.values.*;
import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
import org.apache.cassandra.stress.operations.userdefined.SchemaQuery;
-import org.apache.cassandra.stress.settings.OptionDistribution;
-import org.apache.cassandra.stress.settings.OptionRatioDistribution;
-import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.settings.ValidationType;
+import org.apache.cassandra.stress.settings.*;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.stress.util.Timer;
@@ -188,6 +185,16 @@ public class StressProfile implements Serializable
maybeLoadSchemaInfo(settings);
}
+ public void truncateTable(StressSettings settings)
+ {
+ JavaDriverClient client = settings.getJavaDriverClient(false);
+ assert settings.command.truncate != SettingsCommand.TruncateWhen.NEVER;
+ String cql = String.format("TRUNCATE %s.%s", keyspaceName, tableName);
+ client.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ONE);
+ System.out.println(String.format("Truncated %s.%s. Sleeping %ss for propagation.",
+ keyspaceName, tableName, settings.node.nodes.size()));
+ Uninterruptibles.sleepUninterruptibly(settings.node.nodes.size(), TimeUnit.SECONDS);
+ }
private void maybeLoadSchemaInfo(StressSettings settings)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
index 533b630..f2616cf 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
@@ -36,4 +36,14 @@ public class FixedOpDistribution implements OpDistribution
{
return operation;
}
+
+ public void initTimers()
+ {
+ operation.timer.init();
+ }
+
+ public void closeTimers()
+ {
+ operation.timer.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
index 0fc15a6..e09300a 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
@@ -28,4 +28,6 @@ public interface OpDistribution
Operation next();
+ public void initTimers();
+ public void closeTimers();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
index afbae7d..7e13fcd 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
@@ -21,13 +21,11 @@ package org.apache.cassandra.stress.operations;
*/
-import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.stress.util.Timing;
public interface OpDistributionFactory
{
-
- public OpDistribution get(Timer timer);
+ public OpDistribution get(Timing timing, int sampleCount);
public String desc();
Iterable<OpDistributionFactory> each();
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
index 432e991..9698421 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
@@ -25,6 +25,7 @@ import org.apache.commons.math3.distribution.EnumeratedDistribution;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.generate.Distribution;
+import org.apache.commons.math3.util.Pair;
public class SampledOpDistribution implements OpDistribution
{
@@ -50,4 +51,20 @@ public class SampledOpDistribution implements OpDistribution
remaining--;
return cur;
}
+
+ public void initTimers()
+ {
+ for (Pair<Operation, Double> op : operations.getPmf())
+ {
+ op.getFirst().timer.init();
+ }
+ }
+
+ public void closeTimers()
+ {
+ for (Pair<Operation, Double> op : operations.getPmf())
+ {
+ op.getFirst().timer.close();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
index 9713e93..10191a6 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.cassandra.stress.util.Timing;
import org.apache.commons.math3.distribution.EnumeratedDistribution;
import org.apache.commons.math3.util.Pair;
@@ -49,12 +50,13 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
protected abstract Operation get(Timer timer, PartitionGenerator generator, T key);
protected abstract PartitionGenerator newGenerator();
- public OpDistribution get(Timer timer)
+ public OpDistribution get(Timing timing, int sampleCount)
{
PartitionGenerator generator = newGenerator();
List<Pair<Operation, Double>> operations = new ArrayList<>();
for (Map.Entry<T, Double> ratio : ratios.entrySet())
- operations.add(new Pair<>(get(timer, generator, ratio.getKey()), ratio.getValue()));
+ operations.add(new Pair<>(get(timing.newTimer(ratio.getKey().toString(), sampleCount), generator, ratio.getKey()),
+ ratio.getValue()));
return new SampledOpDistribution(new EnumeratedDistribution<>(operations), clustering.get());
}
@@ -73,9 +75,9 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
{
out.add(new OpDistributionFactory()
{
- public OpDistribution get(Timer timer)
+ public OpDistribution get(Timing timing, int sampleCount)
{
- return new FixedOpDistribution(SampledOpDistributionFactory.this.get(timer, newGenerator(), ratio.getKey()));
+ return new FixedOpDistribution(SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString(), sampleCount), newGenerator(), ratio.getKey()));
}
public String desc()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/Command.java b/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
index 9a93e34..c47c5d2 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/Command.java
@@ -31,38 +31,37 @@ import com.google.common.collect.ImmutableList;
public enum Command
{
- READ(false, "standard1", "Super1",
+ READ(false, "standard1",
"Multiple concurrent reads - the cluster must first be populated by a write test",
CommandCategory.BASIC
),
- WRITE(true, "standard1", "Super1",
+ WRITE(true, "standard1",
"insert",
"Multiple concurrent writes against the cluster",
CommandCategory.BASIC
),
- MIXED(true, null, null,
+ MIXED(true, null,
"Interleaving of any basic commands, with configurable ratio and distribution - the cluster must first be populated by a write test",
CommandCategory.MIXED
),
- COUNTER_WRITE(true, "counter1", "SuperCounter1",
+ COUNTER_WRITE(true, "counter1",
"counter_add",
"Multiple concurrent updates of counters.",
CommandCategory.BASIC
),
- COUNTER_READ(false, "counter1", "SuperCounter1",
+ COUNTER_READ(false, "counter1",
"counter_get",
"Multiple concurrent reads of counters. The cluster must first be populated by a counterwrite test.",
CommandCategory.BASIC
),
- USER(true, null, null,
+ USER(true, null,
"Interleaving of user provided queries, with configurable ratio and distribution",
CommandCategory.USER
),
- HELP(false, null, null, "-?", "Print help for a command or option", null),
- PRINT(false, null, null, "Inspect the output of a distribution definition", null),
- LEGACY(false, null, null, "Legacy support mode", null)
-
+ HELP(false, null, "-?", "Print help for a command or option", null),
+ PRINT(false, null, "Inspect the output of a distribution definition", null),
+ LEGACY(false, null, "Legacy support mode", null)
;
private static final Map<String, Command> LOOKUP;
@@ -87,17 +86,15 @@ public enum Command
public final List<String> names;
public final String description;
public final String table;
- public final String supertable;
- Command(boolean updates, String table, String supertable, String description, CommandCategory category)
+ Command(boolean updates, String table, String description, CommandCategory category)
{
- this(updates, table, supertable, null, description, category);
+ this(updates, table, null, description, category);
}
- Command(boolean updates, String table, String supertable, String extra, String description, CommandCategory category)
+ Command(boolean updates, String table, String extra, String description, CommandCategory category)
{
this.table = table;
- this.supertable = supertable;
this.updates = updates;
this.category = category;
List<String> names = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/Option.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/Option.java b/tools/stress/src/org/apache/cassandra/stress/settings/Option.java
index a9e669c..b9e402e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/Option.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/Option.java
@@ -32,6 +32,7 @@ abstract class Option
abstract String longDisplay();
abstract List<String> multiLineDisplay();
abstract boolean setByUser();
+ abstract boolean present();
public int hashCode()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java
index 9c2f367..28d8f96 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java
@@ -73,6 +73,11 @@ public final class OptionAnyProbabilities extends OptionMulti
{
return !options.isEmpty();
}
+
+ boolean present()
+ {
+ return setByUser();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
index 45e832a..7186efb 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
@@ -134,6 +134,11 @@ public class OptionDistribution extends Option
return spec != null;
}
+ boolean present()
+ {
+ return setByUser() || defaultSpec != null;
+ }
+
@Override
public String shortDisplay()
{
@@ -209,7 +214,7 @@ public class OptionDistribution extends Option
stdev = ((max - min) / 2d) / stdevsToEdge;
}
return new GaussianFactory(min, max, mean, stdev);
- } catch (Exception _)
+ } catch (Exception ignore)
{
throw new IllegalArgumentException("Invalid parameter list for uniform distribution: " + params);
}
@@ -233,7 +238,7 @@ public class OptionDistribution extends Option
// over entire range, but this results in overly skewed distribution, so take sqrt
final double mean = (max - min) / findBounds.inverseCumulativeProbability(1d - Math.sqrt(1d/(max-min)));
return new ExpFactory(min, max, mean);
- } catch (Exception _)
+ } catch (Exception ignore)
{
throw new IllegalArgumentException("Invalid parameter list for uniform distribution: " + params);
}
@@ -258,7 +263,7 @@ public class OptionDistribution extends Option
// over entire range, but this results in overly skewed distribution, so take sqrt
final double scale = (max - min) / findBounds.inverseCumulativeProbability(1d - Math.sqrt(1d/(max-min)));
return new ExtremeFactory(min, max, shape, scale);
- } catch (Exception _)
+ } catch (Exception ignore)
{
throw new IllegalArgumentException("Invalid parameter list for extreme (Weibull) distribution: " + params);
}
@@ -284,7 +289,7 @@ public class OptionDistribution extends Option
// over entire range, but this results in overly skewed distribution, so take sqrt
final double scale = (max - min) / findBounds.inverseCumulativeProbability(1d - Math.sqrt(1d/(max-min)));
return new QuantizedExtremeFactory(min, max, shape, scale, quantas);
- } catch (Exception _)
+ } catch (Exception ignore)
{
throw new IllegalArgumentException("Invalid parameter list for quantized extreme (Weibull) distribution: " + params);
}
@@ -305,7 +310,7 @@ public class OptionDistribution extends Option
final long min = parseLong(bounds[0]);
final long max = parseLong(bounds[1]);
return new UniformFactory(min, max);
- } catch (Exception _)
+ } catch (Exception ignore)
{
throw new IllegalArgumentException("Invalid parameter list for uniform distribution: " + params);
}
@@ -324,7 +329,7 @@ public class OptionDistribution extends Option
{
final long key = parseLong(params.get(0));
return new FixedFactory(key);
- } catch (Exception _)
+ } catch (Exception ignore)
{
throw new IllegalArgumentException("Invalid parameter list for uniform distribution: " + params);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java
index 6d11012..ad89a5b 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionMulti.java
@@ -182,6 +182,11 @@ abstract class OptionMulti extends Option
{
return !options.isEmpty();
}
+
+ boolean present()
+ {
+ return !options.isEmpty();
+ }
}
List<Option> optionsSetByUser()
@@ -197,7 +202,7 @@ abstract class OptionMulti extends Option
{
List<Option> r = new ArrayList<>();
for (Option option : delegate.options())
- if (!option.setByUser() && option.happy())
+ if (!option.setByUser() && option.present())
r.add(option);
return r;
}
@@ -210,4 +215,12 @@ abstract class OptionMulti extends Option
return false;
}
+ boolean present()
+ {
+ for (Option option : delegate.options())
+ if (option.present())
+ return true;
+ return false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java
index 416f045..67cd1a9 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionRatioDistribution.java
@@ -124,6 +124,11 @@ public class OptionRatioDistribution extends Option
return delegate.setByUser();
}
+ boolean present()
+ {
+ return delegate.present();
+ }
+
@Override
public String shortDisplay()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java b/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java
index 8b65587..8d1a6fc 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/OptionReplication.java
@@ -81,7 +81,7 @@ class OptionReplication extends OptionMulti
throw new IllegalArgumentException(clazz + " is not a replication strategy");
strategy = fullname;
break;
- } catch (Exception _)
+ } catch (Exception ignore)
{
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
index e8b45ec..60b4c09 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommand.java
@@ -27,18 +27,27 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.Uninterruptibles;
+
import org.apache.cassandra.stress.operations.OpDistributionFactory;
+import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.thrift.ConsistencyLevel;
// Generic command settings - common to read/write/etc
public abstract class SettingsCommand implements Serializable
{
+ public static enum TruncateWhen
+ {
+ NEVER, ONCE, ALWAYS
+ }
+
public final Command type;
public final long count;
public final long duration;
public final TimeUnit durationUnits;
public final boolean noWarmup;
+ public final TruncateWhen truncate;
public final ConsistencyLevel consistencyLevel;
public final double targetUncertainty;
public final int minimumUncertaintyMeasurements;
@@ -60,6 +69,8 @@ public abstract class SettingsCommand implements Serializable
this.type = type;
this.consistencyLevel = ConsistencyLevel.valueOf(options.consistencyLevel.value().toUpperCase());
this.noWarmup = options.noWarmup.setByUser();
+ this.truncate = TruncateWhen.valueOf(options.truncate.value().toUpperCase());
+
if (count != null)
{
this.count = OptionDistribution.parseLong(count.count.value());
@@ -107,7 +118,8 @@ public abstract class SettingsCommand implements Serializable
static abstract class Options extends GroupedOptions
{
final OptionSimple noWarmup = new OptionSimple("no-warmup", "", null, "Do not warmup the process", false);
- final OptionSimple consistencyLevel = new OptionSimple("cl=", "ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY|TWO|THREE|SERIAL|LOCAL_SERIAL|LOCAL_ONE", "ONE", "Consistency level to use", false);
+ final OptionSimple truncate = new OptionSimple("truncate=", "never|once|always", "never", "Truncate the table: never, before performing any work, or before each iteration", false);
+ final OptionSimple consistencyLevel = new OptionSimple("cl=", "ONE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|ALL|ANY", "ONE", "Consistency level to use", false);
}
static class Count extends Options
@@ -116,7 +128,7 @@ public abstract class SettingsCommand implements Serializable
@Override
public List<? extends Option> options()
{
- return Arrays.asList(count, noWarmup, consistencyLevel);
+ return Arrays.asList(count, noWarmup, truncate, consistencyLevel);
}
}
@@ -126,7 +138,7 @@ public abstract class SettingsCommand implements Serializable
@Override
public List<? extends Option> options()
{
- return Arrays.asList(duration, noWarmup, consistencyLevel);
+ return Arrays.asList(duration, noWarmup, truncate, consistencyLevel);
}
}
@@ -138,10 +150,26 @@ public abstract class SettingsCommand implements Serializable
@Override
public List<? extends Option> options()
{
- return Arrays.asList(uncertainty, minMeasurements, maxMeasurements, noWarmup, consistencyLevel);
+ return Arrays.asList(uncertainty, minMeasurements, maxMeasurements, noWarmup, truncate, consistencyLevel);
}
}
+ public abstract void truncateTables(StressSettings settings);
+
+ protected void truncateTables(StressSettings settings, String ks, String ... tables)
+ {
+ JavaDriverClient client = settings.getJavaDriverClient(false);
+ assert settings.command.truncate != SettingsCommand.TruncateWhen.NEVER;
+ for (String table : tables)
+ {
+ String cql = String.format("TRUNCATE %s.%s", ks, table);
+ client.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ONE);
+ }
+ System.out.println(String.format("Truncated %s.%s. Sleeping %ss for propagation.",
+ ks, Arrays.toString(tables), settings.node.nodes.size()));
+ Uninterruptibles.sleepUninterruptibly(settings.node.nodes.size(), TimeUnit.SECONDS);
+ }
+
// CLI Utility Methods
static SettingsCommand get(Map<String, String[]> clArgs)
@@ -171,18 +199,6 @@ public abstract class SettingsCommand implements Serializable
return null;
}
-/* static SettingsCommand build(Command type, String[] params)
- {
- GroupedOptions options = GroupedOptions.select(params, new Count(), new Duration(), new Uncertainty());
- if (options == null)
- {
- printHelp(type);
- System.out.println("Invalid " + type + " options provided, see output for valid options");
- System.exit(1);
- }
- return new SettingsCommand(type, options);
- }*/
-
static void printHelp(Command type)
{
printHelp(type.toString().toLowerCase());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
index ee1958b..83f444c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandPreDefined.java
@@ -37,7 +37,7 @@ import org.apache.cassandra.stress.operations.FixedOpDistribution;
import org.apache.cassandra.stress.operations.OpDistribution;
import org.apache.cassandra.stress.operations.OpDistributionFactory;
import org.apache.cassandra.stress.operations.predefined.PredefinedOperation;
-import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.stress.util.Timing;
// Settings unique to the mixed command type
public class SettingsCommandPreDefined extends SettingsCommand
@@ -51,9 +51,10 @@ public class SettingsCommandPreDefined extends SettingsCommand
final SeedManager seeds = new SeedManager(settings);
return new OpDistributionFactory()
{
- public OpDistribution get(Timer timer)
+ public OpDistribution get(Timing timing, int sampleCount)
{
- return new FixedOpDistribution(PredefinedOperation.operation(type, timer, newGenerator(settings), seeds, settings, add));
+ return new FixedOpDistribution(PredefinedOperation.operation(type, timing.newTimer(type.toString(), sampleCount),
+ newGenerator(settings), seeds, settings, add));
}
public String desc()
@@ -108,6 +109,11 @@ public class SettingsCommandPreDefined extends SettingsCommand
}
+ public void truncateTables(StressSettings settings)
+ {
+ truncateTables(settings, settings.schema.keyspace, "standard1", "counter1", "counter3");
+ }
+
// CLI utility methods
public static SettingsCommandPreDefined build(Command type, String[] params)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
index d5b221c..d4e43cf 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsCommandUser.java
@@ -89,6 +89,11 @@ public class SettingsCommandUser extends SettingsCommand
};
}
+ public void truncateTables(StressSettings settings)
+ {
+ profile.truncateTable(settings);
+ }
+
static final class Options extends GroupedOptions
{
final SettingsCommand.Options parent;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timer.java b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
index ff625a8..88e8020 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Timer.java
@@ -39,6 +39,7 @@ public final class Timer
private int opCount;
// aggregate info
+ private long errorCount;
private long partitionCount;
private long rowCount;
private long total;
@@ -78,7 +79,7 @@ public final class Timer
return finalReport == null;
}
- public void stop(long partitionCount, long rowCount)
+ public void stop(long partitionCount, long rowCount, boolean error)
{
maybeReport();
long now = System.nanoTime();
@@ -94,6 +95,8 @@ public final class Timer
opCount += 1;
this.partitionCount += partitionCount;
this.rowCount += rowCount;
+ if (error)
+ this.errorCount++;
upToDateAsOf = now;
}
@@ -108,14 +111,15 @@ public final class Timer
( new SampleOfLongs(Arrays.copyOf(sample, index(opCount)), p(opCount)),
new SampleOfLongs(Arrays.copyOfRange(sample, index(opCount), Math.min(opCount, sample.length)), p(opCount) - 1)
);
- final TimingInterval report = new TimingInterval(lastSnap, upToDateAsOf, max, maxStart, max, partitionCount, rowCount, total, opCount,
- SampleOfLongs.merge(rnd, sampleLatencies, Integer.MAX_VALUE));
+ final TimingInterval report = new TimingInterval(lastSnap, upToDateAsOf, max, maxStart, max, partitionCount,
+ rowCount, total, opCount, errorCount, SampleOfLongs.merge(rnd, sampleLatencies, Integer.MAX_VALUE));
// reset counters
opCount = 0;
partitionCount = 0;
rowCount = 0;
total = 0;
max = 0;
+ errorCount = 0;
lastSnap = upToDateAsOf;
return report;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
index 9464b19..403bee0 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/Timing.java
@@ -21,9 +21,7 @@ package org.apache.cassandra.stress.util;
*/
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -36,9 +34,10 @@ import java.util.concurrent.TimeUnit;
// metrics calculated over the interval are accurate
public class Timing
{
-
- private final CopyOnWriteArrayList<Timer> timers = new CopyOnWriteArrayList<>();
- private volatile TimingInterval history;
+ // concurrency: this should be ok as the consumers are created serially by StressAction.run / warmup
+ // Probably the CopyOnWriteArrayList could be changed to an ordinary list as well.
+ private final Map<String, List<Timer>> timers = new TreeMap<>();
+ private volatile TimingIntervals history;
private final int historySampleCount;
private final int reportSampleCount;
private boolean done;
@@ -54,22 +53,31 @@ public class Timing
public static class TimingResult<E>
{
public final E extra;
- public final TimingInterval timing;
- public TimingResult(E extra, TimingInterval timing)
+ public final TimingIntervals intervals;
+ public TimingResult(E extra, TimingIntervals intervals)
{
this.extra = extra;
- this.timing = timing;
+ this.intervals = intervals;
}
}
public <E> TimingResult<E> snap(Callable<E> call) throws InterruptedException
{
- final Timer[] timers = this.timers.toArray(new Timer[0]);
- final CountDownLatch ready = new CountDownLatch(timers.length);
- for (int i = 0 ; i < timers.length ; i++)
+ // Count up total # of timers
+ int timerCount = 0;
+ for (List<Timer> timersForOperation : timers.values())
{
- final Timer timer = timers[i];
- timer.requestReport(ready);
+ timerCount += timersForOperation.size();
+ }
+ final CountDownLatch ready = new CountDownLatch(timerCount);
+
+ // request reports
+ for (List <Timer> timersForOperation : timers.values())
+ {
+ for(Timer timer : timersForOperation)
+ {
+ timer.requestReport(ready);
+ }
}
E extra;
@@ -86,34 +94,48 @@ public class Timing
// TODO fail gracefully after timeout if a thread is stuck
if (!ready.await(5L, TimeUnit.MINUTES))
+ {
throw new RuntimeException("Timed out waiting for a timer thread - seems one got stuck. Check GC/Heap size");
+ }
boolean done = true;
+
// reports have been filled in by timer threadCount, so merge
- List<TimingInterval> intervals = new ArrayList<>();
- for (Timer timer : timers)
+ Map<String, TimingInterval> intervals = new TreeMap<>();
+ for (Map.Entry<String, List<Timer>> entry : timers.entrySet())
{
- intervals.add(timer.report);
- done &= !timer.running();
+ List<TimingInterval> operationIntervals = new ArrayList<>();
+ for (Timer timer : entry.getValue())
+ {
+ operationIntervals.add(timer.report);
+ done &= !timer.running();
+ }
+
+ intervals.put(entry.getKey(), TimingInterval.merge(operationIntervals, reportSampleCount,
+ history.get(entry.getKey()).endNanos()));
}
+ TimingIntervals result = new TimingIntervals(intervals);
this.done = done;
- TimingResult<E> result = new TimingResult<>(extra, TimingInterval.merge(intervals, reportSampleCount, history.endNanos()));
- history = TimingInterval.merge(Arrays.asList(result.timing, history), historySampleCount, history.startNanos());
- return result;
+ history = history.merge(result, historySampleCount, history.startNanos());
+ return new TimingResult<>(extra, result);
}
- // build a new timer and add it to the set of running timers
- public Timer newTimer(int sampleCount)
+ // build a new timer and add it to the set of running timers.
+ public Timer newTimer(String opType, int sampleCount)
{
final Timer timer = new Timer(sampleCount);
- timers.add(timer);
+
+ if (!timers.containsKey(opType))
+ timers.put(opType, new ArrayList<Timer>());
+
+ timers.get(opType).add(timer);
return timer;
}
public void start()
{
- history = new TimingInterval(System.nanoTime());
+ history = new TimingIntervals(timers.keySet());
}
public boolean done()
@@ -121,9 +143,8 @@ public class Timing
return done;
}
- public TimingInterval getHistory()
+ public TimingIntervals getHistory()
{
return history;
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6bbfb557/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
index 065ea52..6be71c8 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/TimingInterval.java
@@ -20,10 +20,7 @@ package org.apache.cassandra.stress.util;
*
*/
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
// represents measurements taken over an interval of time
@@ -42,18 +39,28 @@ public final class TimingInterval
public final long partitionCount;
public final long rowCount;
public final long operationCount;
+ public final long errorCount;
final SampleOfLongs sample;
+ public String toString()
+ {
+ return String.format("Start: %d end: %d maxLatency: %d pauseLength: %d pauseStart: %d totalLatency: %d" +
+ " pCount: %d rcount: %d opCount: %d errors: %d", start, end, maxLatency, pauseLength,
+ pauseStart, totalLatency, partitionCount, rowCount, operationCount, errorCount);
+ }
+
TimingInterval(long time)
{
start = end = time;
maxLatency = totalLatency = 0;
- partitionCount = rowCount = operationCount = 0;
+ partitionCount = rowCount = operationCount = errorCount = 0;
pauseStart = pauseLength = 0;
sample = new SampleOfLongs(new long[0], 1d);
}
- TimingInterval(long start, long end, long maxLatency, long pauseStart, long pauseLength, long partitionCount, long rowCount, long totalLatency, long operationCount, SampleOfLongs sample)
+
+ TimingInterval(long start, long end, long maxLatency, long pauseStart, long pauseLength, long partitionCount,
+ long rowCount, long totalLatency, long operationCount, long errorCount, SampleOfLongs sample)
{
this.start = start;
this.end = Math.max(end, start);
@@ -61,6 +68,7 @@ public final class TimingInterval
this.partitionCount = partitionCount;
this.rowCount = rowCount;
this.totalLatency = totalLatency;
+ this.errorCount = errorCount;
this.operationCount = operationCount;
this.pauseStart = pauseStart;
this.pauseLength = pauseLength;
@@ -68,33 +76,41 @@ public final class TimingInterval
}
// merge multiple timer intervals together
- static TimingInterval merge(List<TimingInterval> intervals, int maxSamples, long start)
+ static TimingInterval merge(Iterable<TimingInterval> intervals, int maxSamples, long start)
{
ThreadLocalRandom rnd = ThreadLocalRandom.current();
- long operationCount = 0, partitionCount = 0, rowCount = 0;
+ long operationCount = 0, partitionCount = 0, rowCount = 0, errorCount = 0;
long maxLatency = 0, totalLatency = 0;
List<SampleOfLongs> latencies = new ArrayList<>();
long end = 0;
long pauseStart = 0, pauseEnd = Long.MAX_VALUE;
for (TimingInterval interval : intervals)
{
- end = Math.max(end, interval.end);
- operationCount += interval.operationCount;
- maxLatency = Math.max(interval.maxLatency, maxLatency);
- totalLatency += interval.totalLatency;
- partitionCount += interval.partitionCount;
- rowCount += interval.rowCount;
- latencies.addAll(Arrays.asList(interval.sample));
- if (interval.pauseLength > 0)
+ if (interval != null)
{
- pauseStart = Math.max(pauseStart, interval.pauseStart);
- pauseEnd = Math.min(pauseEnd, interval.pauseStart + interval.pauseLength);
+ end = Math.max(end, interval.end);
+ operationCount += interval.operationCount;
+ maxLatency = Math.max(interval.maxLatency, maxLatency);
+ totalLatency += interval.totalLatency;
+ partitionCount += interval.partitionCount;
+ rowCount += interval.rowCount;
+ errorCount += interval.errorCount;
+ latencies.addAll(Arrays.asList(interval.sample));
+ if (interval.pauseLength > 0)
+ {
+ pauseStart = Math.max(pauseStart, interval.pauseStart);
+ pauseEnd = Math.min(pauseEnd, interval.pauseStart + interval.pauseLength);
+ }
}
}
- if (pauseEnd < pauseStart)
+
+ if (pauseEnd < pauseStart || pauseStart <= 0)
+ {
pauseEnd = pauseStart = 0;
- return new TimingInterval(start, end, maxLatency, pauseStart, pauseEnd - pauseStart, partitionCount, rowCount, totalLatency, operationCount,
- SampleOfLongs.merge(rnd, latencies, maxSamples));
+ }
+
+ return new TimingInterval(start, end, maxLatency, pauseStart, pauseEnd - pauseStart, partitionCount, rowCount,
+ totalLatency, operationCount, errorCount, SampleOfLongs.merge(rnd, latencies, maxSamples));
}
@@ -128,11 +144,6 @@ public final class TimingInterval
return maxLatency * 0.000001d;
}
- public long runTime()
- {
- return (end - start) / 1000000;
- }
-
public double medianLatency()
{
return sample.medianLatency();
@@ -144,19 +155,48 @@ public final class TimingInterval
return sample.rankLatency(rank);
}
- public final long endNanos()
+ public long runTime()
{
- return end;
+ return (end - start) / 1000000;
}
- public final long endMillis()
+ public final long endNanos()
{
- return end / 1000000;
+ return end;
}
public long startNanos()
{
return start;
}
-}
+
+ public static enum TimingParameter
+ {
+ OPRATE, ROWRATE, ADJROWRATE, PARTITIONRATE, MEANLATENCY, MAXLATENCY, MEDIANLATENCY, RANKLATENCY,
+ ERRORCOUNT, PARTITIONCOUNT
+ }
+
+ String getStringValue(TimingParameter value)
+ {
+ return getStringValue(value, Float.NaN);
+ }
+
+ String getStringValue(TimingParameter value, float rank)
+ {
+ switch (value)
+ {
+ case OPRATE: return String.format("%.0f", opRate());
+ case ROWRATE: return String.format("%.0f", rowRate());
+ case ADJROWRATE: return String.format("%.0f", adjustedRowRate());
+ case PARTITIONRATE: return String.format("%.0f", partitionRate());
+ case MEANLATENCY: return String.format("%.1f", meanLatency());
+ case MAXLATENCY: return String.format("%.1f", maxLatency());
+ case MEDIANLATENCY: return String.format("%.1f", medianLatency());
+ case RANKLATENCY: return String.format("%.1f", rankLatency(rank));
+ case ERRORCOUNT: return String.format("%d", errorCount);
+ case PARTITIONCOUNT: return String.format("%d", partitionCount);
+ default: throw new IllegalStateException();
+ }
+ }
+ }
[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Conflicts:
tools/stress/src/org/apache/cassandra/stress/settings/OptionDistribution.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a0586f69
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a0586f69
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a0586f69
Branch: refs/heads/trunk
Commit: a0586f69291af79079db0f5cc495806a28fca7a6
Parents: bf9c503 6bbfb55
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Tue Mar 10 14:26:30 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Mar 10 14:26:30 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/stress/Operation.java | 2 +-
.../apache/cassandra/stress/StressAction.java | 46 +++++----
.../apache/cassandra/stress/StressMetrics.java | 83 ++++++++++-----
.../apache/cassandra/stress/StressProfile.java | 15 ++-
.../stress/operations/FixedOpDistribution.java | 10 ++
.../stress/operations/OpDistribution.java | 2 +
.../operations/OpDistributionFactory.java | 6 +-
.../operations/SampledOpDistribution.java | 17 ++++
.../SampledOpDistributionFactory.java | 10 +-
.../cassandra/stress/settings/Command.java | 27 +++--
.../cassandra/stress/settings/Option.java | 1 +
.../stress/settings/OptionAnyProbabilities.java | 5 +
.../stress/settings/OptionDistribution.java | 17 ++--
.../cassandra/stress/settings/OptionMulti.java | 15 ++-
.../settings/OptionRatioDistribution.java | 5 +
.../stress/settings/SettingsCommand.java | 48 ++++++---
.../settings/SettingsCommandPreDefined.java | 12 ++-
.../stress/settings/SettingsCommandUser.java | 5 +
.../org/apache/cassandra/stress/util/Timer.java | 10 +-
.../apache/cassandra/stress/util/Timing.java | 75 +++++++++-----
.../cassandra/stress/util/TimingInterval.java | 102 +++++++++++++------
22 files changed, 354 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0586f69/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a6adfe0,af5206b..b2b33b0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,68 -1,5 +1,69 @@@
+3.0
+ * Avoid memory allocation when searching index summary (CASSANDRA-8793)
+ * Optimise (Time)?UUIDType Comparisons (CASSANDRA-8730)
+ * Make CRC32Ex into a separate maven dependency (CASSANDRA-8836)
+ * Use preloaded jemalloc w/ Unsafe (CASSANDRA-8714)
+ * Add role based access control (CASSANDRA-7653, 8650, 7216, 8760, 8849, 8761)
+ * Avoid accessing partitioner through StorageProxy (CASSANDRA-8244, 8268)
+ * Upgrade Metrics library and remove deprecated metrics (CASSANDRA-5657)
+ * Serializing Row cache alternative, fully off heap (CASSANDRA-7438)
+ * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707)
+ * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560)
+ * Support direct buffer decompression for reads (CASSANDRA-8464)
+ * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039)
+ * Group sstables for anticompaction correctly (CASSANDRA-8578)
+ * Add ReadFailureException to native protocol, respond
+ immediately when replicas encounter errors while handling
+ a read request (CASSANDRA-7886)
+ * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308)
+ * Allow mixing token and partition key restrictions (CASSANDRA-7016)
+ * Support index key/value entries on map collections (CASSANDRA-8473)
+ * Modernize schema tables (CASSANDRA-8261)
+ * Support for user-defined aggregation functions (CASSANDRA-8053)
+ * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
+ * Refactor SelectStatement, return IN results in natural order instead
+ of IN value list order and ignore duplicate values in partition key IN restrictions (CASSANDRA-7981)
+ * Support UDTs, tuples, and collections in user-defined
+ functions (CASSANDRA-7563)
+ * Fix aggregate fn results on empty selection, result column name,
+ and cqlsh parsing (CASSANDRA-8229)
+ * Mark sstables as repaired after full repair (CASSANDRA-7586)
+ * Extend Descriptor to include a format value and refactor reader/writer
+ APIs (CASSANDRA-7443)
+ * Integrate JMH for microbenchmarks (CASSANDRA-8151)
+ * Keep sstable levels when bootstrapping (CASSANDRA-7460)
+ * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
+ * Support for aggregation functions (CASSANDRA-4914)
+ * Remove cassandra-cli (CASSANDRA-7920)
+ * Accept dollar quoted strings in CQL (CASSANDRA-7769)
+ * Make assassinate a first class command (CASSANDRA-7935)
+ * Support IN clause on any partition key column (CASSANDRA-7855)
+ * Support IN clause on any clustering column (CASSANDRA-4762)
+ * Improve compaction logging (CASSANDRA-7818)
+ * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
+ * Do anticompaction in groups (CASSANDRA-6851)
+ * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
+ 7924, 7812, 8063, 7813, 7708)
+ * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7028)
+ * Make incremental repair default (CASSANDRA-7250)
+ * Enable code coverage thru JaCoCo (CASSANDRA-7226)
+ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
+ * Shorten SSTable path (CASSANDRA-6962)
+ * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
+ * Fail on very large batch sizes (CASSANDRA-8011)
+ * Improve concurrency of repair (CASSANDRA-6455, 8208)
+ * Select optimal CRC32 implementation at runtime (CASSANDRA-8614)
+ * Evaluate MurmurHash of Token once per query (CASSANDRA-7096)
+
2.1.4
+ * cassandra-stress reports per-operation statistics, plus misc (CASSANDRA-8769)
* Add SimpleDate (cql date) and Time (cql time) types (CASSANDRA-7523)
* Use long for key count in cfstats (CASSANDRA-8913)
* Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0586f69/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --cc tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
index 12bdc3e,46ca488..a640058
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
@@@ -28,13 -29,11 +29,10 @@@ import java.util.concurrent.Callable
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
+ import org.apache.cassandra.stress.util.*;
import org.apache.commons.lang3.time.DurationFormatUtils;
-
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.stress.settings.StressSettings;
- import org.apache.cassandra.stress.util.JmxCollector;
- import org.apache.cassandra.stress.util.Timing;
- import org.apache.cassandra.stress.util.TimingInterval;
- import org.apache.cassandra.stress.util.Uncertainty;
public class StressMetrics
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0586f69/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0586f69/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0586f69/tools/stress/src/org/apache/cassandra/stress/settings/OptionAnyProbabilities.java
----------------------------------------------------------------------