You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2016/09/07 14:15:57 UTC
[2/2] cassandra git commit: Fix Cassandra Stress reporting thread model and precision
Fix Cassandra Stress reporting thread model and precision
Patch by Nitsan Wakart; reviewed by tjake for CASSANDRA-12585
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e73633cd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e73633cd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e73633cd
Branch: refs/heads/trunk
Commit: e73633cd8129b18e706e7ee8bdb05d802111c7e3
Parents: 1d74664
Author: nitsanw <ni...@yahoo.com>
Authored: Thu Sep 1 01:27:55 2016 +0200
Committer: T Jake Luciani <ja...@apache.org>
Committed: Wed Sep 7 10:14:20 2016 -0400
----------------------------------------------------------------------
CHANGES.txt | 1 +
NOTICE.txt | 3 +
build.xml | 2 +
lib/jctools-core-1.2.1.jar | Bin 0 -> 106714 bytes
lib/licenses/jctools-core-1.2.1.txt | 202 ++++++++
.../org/apache/cassandra/stress/Operation.java | 9 +-
.../apache/cassandra/stress/StressAction.java | 67 ++-
.../apache/cassandra/stress/StressGraph.java | 2 +-
.../apache/cassandra/stress/StressMetrics.java | 345 --------------
.../apache/cassandra/stress/StressProfile.java | 2 +-
.../stress/operations/FixedOpDistribution.java | 5 -
.../stress/operations/OpDistribution.java | 3 -
.../operations/OpDistributionFactory.java | 4 +-
.../stress/operations/PartitionOperation.java | 2 +-
.../operations/SampledOpDistribution.java | 8 -
.../SampledOpDistributionFactory.java | 24 +-
.../operations/predefined/CqlCounterAdder.java | 2 +-
.../operations/predefined/CqlCounterGetter.java | 2 +-
.../operations/predefined/CqlInserter.java | 2 +-
.../operations/predefined/CqlOperation.java | 2 +-
.../stress/operations/predefined/CqlReader.java | 2 +-
.../predefined/PredefinedOperation.java | 2 +-
.../predefined/ThriftCounterAdder.java | 2 +-
.../predefined/ThriftCounterGetter.java | 2 +-
.../operations/predefined/ThriftInserter.java | 2 +-
.../operations/predefined/ThriftReader.java | 2 +-
.../operations/userdefined/SchemaInsert.java | 2 +-
.../operations/userdefined/SchemaQuery.java | 2 +-
.../operations/userdefined/SchemaStatement.java | 2 +-
.../operations/userdefined/TokenRangeQuery.java | 2 +-
.../userdefined/ValidatingSchemaQuery.java | 2 +-
.../cassandra/stress/report/StressMetrics.java | 457 +++++++++++++++++++
.../apache/cassandra/stress/report/Timer.java | 63 +++
.../cassandra/stress/report/TimingInterval.java | 234 ++++++++++
.../stress/report/TimingIntervals.java | 128 ++++++
.../settings/SettingsCommandPreDefined.java | 9 +-
.../SettingsCommandPreDefinedMixed.java | 2 +-
.../stress/settings/SettingsCommandUser.java | 2 +-
.../org/apache/cassandra/stress/util/Timer.java | 167 -------
.../apache/cassandra/stress/util/Timing.java | 147 ------
.../cassandra/stress/util/TimingInterval.java | 234 ----------
.../cassandra/stress/util/TimingIntervals.java | 152 ------
42 files changed, 1186 insertions(+), 1118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2e8fa4e..54ffcf1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.10
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
* Add JMH benchmarks.jar (CASSANDRA-12586)
* Add row offset support to SASI (CASSANDRA-11990)
* Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/NOTICE.txt
----------------------------------------------------------------------
diff --git a/NOTICE.txt b/NOTICE.txt
index 1c552fc..dcdf03d 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -86,3 +86,6 @@ Copyright (c) 2000-2011 INRIA, France Telecom
HdrHistogram
http://hdrhistogram.org
+
+JCTools
+http://jctools.github.io/JCTools/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 858030a..6954e04 100644
--- a/build.xml
+++ b/build.xml
@@ -452,6 +452,7 @@
<dependency groupId="com.github.rholder" artifactId="snowball-stemmer" version="1.3.0.581.1" />
<dependency groupId="com.googlecode.concurrent-trees" artifactId="concurrent-trees" version="2.4.0" />
<dependency groupId="com.github.ben-manes.caffeine" artifactId="caffeine" version="2.2.6" />
+ <dependency groupId="org.jctools" artifactId="jctools-core" version="1.2.1"/>
</dependencyManagement>
<developer id="alakshman" name="Avinash Lakshman"/>
<developer id="aleksey" name="Aleksey Yeschenko"/>
@@ -618,6 +619,7 @@
<dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
<dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
<dependency groupId="com.github.ben-manes.caffeine" artifactId="caffeine" />
+ <dependency groupId="org.jctools" artifactId="jctools-core"/>
</artifact:pom>
<artifact:pom id="thrift-pom"
artifactId="cassandra-thrift"
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/lib/jctools-core-1.2.1.jar
----------------------------------------------------------------------
diff --git a/lib/jctools-core-1.2.1.jar b/lib/jctools-core-1.2.1.jar
new file mode 100644
index 0000000..fa1137b
Binary files /dev/null and b/lib/jctools-core-1.2.1.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/lib/licenses/jctools-core-1.2.1.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/jctools-core-1.2.1.txt b/lib/licenses/jctools-core-1.2.1.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/lib/licenses/jctools-core-1.2.1.txt
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 92a54f9..dc5bd2f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -20,11 +20,11 @@ package org.apache.cassandra.stress;
import java.io.IOException;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.SettingsLog;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.transport.SimpleClient;
@@ -106,7 +106,7 @@ public abstract class Operation
exceptionMessage = getExceptionMessage(e);
}
}
-
+
timer.stop(run.partitionCount(), run.rowCount(), !success);
if (!success)
@@ -138,11 +138,6 @@ public abstract class Operation
System.err.println(message);
}
- public void close()
- {
- timer.close();
- }
-
public void intendedStartNs(long intendedTime)
{
timer.intendedTimeNs(intendedTime);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 7c37ef8..f3d9cbd 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -22,6 +22,7 @@ import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -29,12 +30,15 @@ import java.util.concurrent.locks.LockSupport;
import org.apache.cassandra.stress.operations.OpDistribution;
import org.apache.cassandra.stress.operations.OpDistributionFactory;
+import org.apache.cassandra.stress.report.StressMetrics;
import org.apache.cassandra.stress.settings.ConnectionAPI;
import org.apache.cassandra.stress.settings.SettingsCommand;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.transport.SimpleClient;
+import org.jctools.queues.SpscArrayQueue;
+import org.jctools.queues.SpscUnboundedArrayQueue;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -183,8 +187,8 @@ public class StressAction implements Runnable
double improvement = 0;
for (int i = results.size() - count ; i < results.size() ; i++)
{
- double prev = results.get(i - 1).getTiming().getHistory().opRate();
- double cur = results.get(i).getTiming().getHistory().opRate();
+ double prev = results.get(i - 1).opRate();
+ double cur = results.get(i).opRate();
improvement += (cur - prev) / prev;
}
return improvement / count;
@@ -219,7 +223,7 @@ public class StressAction implements Runnable
final Consumer[] consumers = new Consumer[threadCount];
for (int i = 0; i < threadCount; i++)
{
- consumers[i] = new Consumer(operations.get(metrics.getTiming(), isWarmup),
+ consumers[i] = new Consumer(operations, isWarmup,
done, start, releaseConsumers, workManager, metrics, rateLimiter);
}
@@ -238,7 +242,9 @@ public class StressAction implements Runnable
}
// start counting from NOW!
if(rateLimiter != null)
+ {
rateLimiter.start();
+ }
// release the hounds!!!
releaseConsumers.countDown();
@@ -353,18 +359,28 @@ public class StressAction implements Runnable
return op;
}
- void close()
- {
- operations.closeTimers();
- }
-
void abort()
{
workManager.stop();
}
}
-
- private class Consumer extends Thread
+ public static class OpMeasurement
+ {
+ public String opType;
+ public long intended,started,ended,rowCnt,partitionCnt;
+ public boolean err;
+ @Override
+ public String toString()
+ {
+ return "OpMeasurement [opType=" + opType + ", intended=" + intended + ", started=" + started + ", ended="
+ + ended + ", rowCnt=" + rowCnt + ", partitionCnt=" + partitionCnt + ", err=" + err + "]";
+ }
+ }
+ public interface MeasurementSink
+ {
+ void record(String opType,long intended, long started, long ended, long rowCnt, long partitionCnt, boolean err);
+ }
+ public class Consumer extends Thread implements MeasurementSink
{
private final StreamOfOperations opStream;
private final StressMetrics metrics;
@@ -372,8 +388,10 @@ public class StressAction implements Runnable
private final CountDownLatch done;
private final CountDownLatch start;
private final CountDownLatch releaseConsumers;
-
- public Consumer(OpDistribution operations,
+ public final Queue<OpMeasurement> measurementsRecycling;
+ public final Queue<OpMeasurement> measurementsReporting;
+ public Consumer(OpDistributionFactory operations,
+ boolean isWarmup,
CountDownLatch done,
CountDownLatch start,
CountDownLatch releaseConsumers,
@@ -381,13 +399,18 @@ public class StressAction implements Runnable
StressMetrics metrics,
UniformRateLimiter rateLimiter)
{
+ OpDistribution opDistribution = operations.get(isWarmup, this);
this.done = done;
this.start = start;
this.releaseConsumers = releaseConsumers;
this.metrics = metrics;
- this.opStream = new StreamOfOperations(operations, rateLimiter, workManager);
+ this.opStream = new StreamOfOperations(opDistribution, rateLimiter, workManager);
+ this.measurementsRecycling = new SpscArrayQueue<OpMeasurement>(8*1024);
+ this.measurementsReporting = new SpscUnboundedArrayQueue<OpMeasurement>(2048);
+ metrics.add(this);
}
+
public void run()
{
try
@@ -464,8 +487,24 @@ public class StressAction implements Runnable
finally
{
done.countDown();
- opStream.close();
}
}
+
+ @Override
+ public void record(String opType, long intended, long started, long ended, long rowCnt, long partitionCnt, boolean err)
+ {
+ OpMeasurement opMeasurement = measurementsRecycling.poll();
+ if(opMeasurement == null) {
+ opMeasurement = new OpMeasurement();
+ }
+ opMeasurement.opType = opType;
+ opMeasurement.intended = intended;
+ opMeasurement.started = started;
+ opMeasurement.ended = ended;
+ opMeasurement.rowCnt = rowCnt;
+ opMeasurement.partitionCnt = partitionCnt;
+ opMeasurement.err = err;
+ measurementsReporting.offer(opMeasurement);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/StressGraph.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressGraph.java b/tools/stress/src/org/apache/cassandra/stress/StressGraph.java
index 3b383fa..196964c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressGraph.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressGraph.java
@@ -35,7 +35,7 @@ import java.util.regex.Pattern;
import com.google.common.io.ByteStreams;
import org.apache.commons.lang3.StringUtils;
-
+import org.apache.cassandra.stress.report.StressMetrics;
import org.apache.cassandra.stress.settings.StressSettings;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
deleted file mode 100644
index 86e9a7a..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ /dev/null
@@ -1,345 +0,0 @@
-package org.apache.cassandra.stress;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-
-import java.io.FileNotFoundException;
-import java.io.PrintStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-
-import org.HdrHistogram.Histogram;
-import org.HdrHistogram.HistogramLogWriter;
-import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.JmxCollector;
-import org.apache.cassandra.stress.util.Timing;
-import org.apache.cassandra.stress.util.TimingInterval;
-import org.apache.cassandra.stress.util.TimingIntervals;
-import org.apache.cassandra.stress.util.Uncertainty;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.commons.lang3.time.DurationFormatUtils;
-
-public class StressMetrics
-{
-
- private final PrintStream output;
- private final Thread thread;
- private final Uncertainty rowRateUncertainty = new Uncertainty();
- private final CountDownLatch stopped = new CountDownLatch(1);
- private final Timing timing;
- private final Callable<JmxCollector.GcStats> gcStatsCollector;
- private final HistogramLogWriter histogramWriter;
- private final long epochNs = System.nanoTime();
- private final long epochMs = System.currentTimeMillis();
-
- private volatile JmxCollector.GcStats totalGcStats;
-
- private volatile boolean stop = false;
- private volatile boolean cancelled = false;
-
- public StressMetrics(PrintStream output, final long logIntervalMillis, StressSettings settings)
- {
- this.output = output;
- if(settings.log.hdrFile != null)
- {
- try
- {
- histogramWriter = new HistogramLogWriter(settings.log.hdrFile);
- histogramWriter.outputComment("Logging op latencies for Cassandra Stress");
- histogramWriter.outputLogFormatVersion();
- histogramWriter.outputBaseTime(epochMs);
- histogramWriter.setBaseTime(epochMs);
- histogramWriter.outputStartTime(epochMs);
- histogramWriter.outputLegend();
- }
- catch (FileNotFoundException e)
- {
- throw new IllegalArgumentException(e);
- }
- }
- else
- {
- histogramWriter = null;
- }
- Callable<JmxCollector.GcStats> gcStatsCollector;
- totalGcStats = new JmxCollector.GcStats(0);
- try
- {
- gcStatsCollector = new JmxCollector(settings.node.resolveAllPermitted(settings), settings.port.jmxPort);
- }
- catch (Throwable t)
- {
- switch (settings.log.level)
- {
- case VERBOSE:
- t.printStackTrace();
- }
- System.err.println("Failed to connect over JMX; not collecting these stats");
- gcStatsCollector = new Callable<JmxCollector.GcStats>()
- {
- public JmxCollector.GcStats call() throws Exception
- {
- return totalGcStats;
- }
- };
- }
- this.gcStatsCollector = gcStatsCollector;
- this.timing = new Timing(settings.rate.isFixed);
-
- printHeader("", output);
- thread = new Thread(new Runnable()
- {
- @Override
- public void run()
- {
- timing.start();
- try {
-
- while (!stop)
- {
- try
- {
- long sleepNanos = timing.getHistory().endNanos() - System.nanoTime();
- long sleep = (sleepNanos / 1000000) + logIntervalMillis;
-
- if (sleep < logIntervalMillis >>> 3)
- // if had a major hiccup, sleep full interval
- Thread.sleep(logIntervalMillis);
- else
- Thread.sleep(sleep);
-
- update();
- } catch (InterruptedException e)
- {
- break;
- }
- }
-
- update();
- }
- catch (InterruptedException e)
- {}
- catch (Exception e)
- {
- cancel();
- e.printStackTrace(StressMetrics.this.output);
- }
- finally
- {
- rowRateUncertainty.wakeAll();
- stopped.countDown();
- }
- }
- });
- thread.setName("StressMetrics");
- }
-
- public void start()
- {
- thread.start();
- }
-
- public void waitUntilConverges(double targetUncertainty, int minMeasurements, int maxMeasurements) throws InterruptedException
- {
- rowRateUncertainty.await(targetUncertainty, minMeasurements, maxMeasurements);
- }
-
- public void cancel()
- {
- cancelled = true;
- stop = true;
- thread.interrupt();
- rowRateUncertainty.wakeAll();
- }
-
- public void stop() throws InterruptedException
- {
- stop = true;
- thread.interrupt();
- stopped.await();
- }
-
- private void update() throws InterruptedException
- {
- Timing.TimingResult<JmxCollector.GcStats> result = timing.snap(gcStatsCollector);
- totalGcStats = JmxCollector.GcStats.aggregate(Arrays.asList(totalGcStats, result.extra));
- TimingInterval current = result.intervals.combine();
- TimingInterval history = timing.getHistory().combine();
- rowRateUncertainty.update(current.adjustedRowRate());
- if (current.operationCount() != 0)
- {
- // if there's a single operation we only print the total
- final boolean logPerOpSummaryLine = result.intervals.intervals().size() > 1;
-
- for (Map.Entry<String, TimingInterval> type : result.intervals.intervals().entrySet())
- {
- final String opName = type.getKey();
- final TimingInterval opInterval = type.getValue();
- if (logPerOpSummaryLine)
- {
- printRow("", opName, opInterval, timing.getHistory().get(type.getKey()), result.extra, rowRateUncertainty, output);
- }
- logHistograms(opName, opInterval);
- }
-
- printRow("", "total", current, history, result.extra, rowRateUncertainty, output);
- }
- if (timing.done())
- stop = true;
- }
-
-
- private void logHistograms(String opName, TimingInterval opInterval)
- {
- if (histogramWriter == null)
- return;
- final long startNs = opInterval.startNanos();
- final long endNs = opInterval.endNanos();
-
- logHistogram(opName + "-st", startNs, endNs, opInterval.serviceTime());
- logHistogram(opName + "-rt", startNs, endNs, opInterval.responseTime());
- logHistogram(opName + "-wt", startNs, endNs, opInterval.waitTime());
- }
-
- private void logHistogram(String opName, final long startNs, final long endNs, final Histogram histogram)
- {
- if (histogram.getTotalCount() != 0)
- {
- histogram.setTag(opName);
- histogram.setStartTimeStamp(epochMs + NANOSECONDS.toMillis(startNs - epochNs));
- histogram.setEndTimeStamp(epochMs + NANOSECONDS.toMillis(endNs - epochNs));
- histogramWriter.outputIntervalHistogram(histogram);
- }
- }
-
-
- // PRINT FORMATTING
-
- public static final String HEADFORMAT = "%-10s%10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s,%7s,%7s,%8s,%8s,%8s,%8s";
- public static final String ROWFORMAT = "%-10s%10d,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f,%7d,%7.0f,%8.0f,%8.0f,%8.0f,%8.0f";
- public static final String[] HEADMETRICS = new String[]{"type", "total ops","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr", "errors", "gc: #", "max ms", "sum ms", "sdv ms", "mb"};
- public static final String HEAD = String.format(HEADFORMAT, (Object[]) HEADMETRICS);
-
- private static void printHeader(String prefix, PrintStream output)
- {
- output.println(prefix + HEAD);
- }
-
- private static void printRow(String prefix, String type, TimingInterval interval, TimingInterval total, JmxCollector.GcStats gcStats, Uncertainty opRateUncertainty, PrintStream output)
- {
- output.println(prefix + String.format(ROWFORMAT,
- type + ",",
- total.operationCount(),
- interval.opRate(),
- interval.partitionRate(),
- interval.rowRate(),
- interval.meanLatencyMs(),
- interval.medianLatencyMs(),
- interval.latencyAtPercentileMs(95.0),
- interval.latencyAtPercentileMs(99.0),
- interval.latencyAtPercentileMs(99.9),
- interval.maxLatencyMs(),
- total.runTimeMs() / 1000f,
- opRateUncertainty.getUncertainty(),
- interval.errorCount,
- gcStats.count,
- gcStats.maxms,
- gcStats.summs,
- gcStats.sdvms,
- gcStats.bytes / (1 << 20)
- ));
- }
-
- public void summarise()
- {
- output.println("\n");
- output.println("Results:");
-
- TimingIntervals opHistory = timing.getHistory();
- TimingInterval history = opHistory.combine();
- output.println(String.format("Op rate : %,8.0f op/s %s", history.opRate(), opHistory.opRates()));
- output.println(String.format("Partition rate : %,8.0f pk/s %s", history.partitionRate(), opHistory.partitionRates()));
- output.println(String.format("Row rate : %,8.0f row/s %s", history.rowRate(), opHistory.rowRates()));
- output.println(String.format("Latency mean : %6.1f ms %s", history.meanLatencyMs(), opHistory.meanLatencies()));
- output.println(String.format("Latency median : %6.1f ms %s", history.medianLatencyMs(), opHistory.medianLatencies()));
- output.println(String.format("Latency 95th percentile : %6.1f ms %s", history.latencyAtPercentileMs(95.0), opHistory.latenciesAtPercentile(95.0)));
- output.println(String.format("Latency 99th percentile : %6.1f ms %s", history.latencyAtPercentileMs(99.0), opHistory.latenciesAtPercentile(99.0)));
- output.println(String.format("Latency 99.9th percentile : %6.1f ms %s", history.latencyAtPercentileMs(99.9), opHistory.latenciesAtPercentile(99.9)));
- output.println(String.format("Latency max : %6.1f ms %s", history.maxLatencyMs(), opHistory.maxLatencies()));
- output.println(String.format("Total partitions : %,10d %s", history.partitionCount, opHistory.partitionCounts()));
- output.println(String.format("Total errors : %,10d %s", history.errorCount, opHistory.errorCounts()));
- output.println(String.format("Total GC count : %,1.0f", totalGcStats.count));
- output.println(String.format("Total GC memory : %s", FBUtilities.prettyPrintMemory((long)totalGcStats.bytes, true)));
- output.println(String.format("Total GC time : %,6.1f seconds", totalGcStats.summs / 1000));
- output.println(String.format("Avg GC time : %,6.1f ms", totalGcStats.summs / totalGcStats.count));
- output.println(String.format("StdDev GC time : %,6.1f ms", totalGcStats.sdvms));
- output.println("Total operation time : " + DurationFormatUtils.formatDuration(
- history.runTimeMs(), "HH:mm:ss", true));
- output.println(""); // Newline is important here to separate the aggregates section from the END or the next stress iteration
- }
-
- public static void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out)
- {
- int idLen = 0;
- for (String id : ids)
- idLen = Math.max(id.length(), idLen);
- String formatstr = "%" + idLen + "s, ";
- printHeader(String.format(formatstr, "id"), out);
- for (int i = 0 ; i < ids.size() ; i++)
- {
- for (Map.Entry<String, TimingInterval> type : summarise.get(i).timing.getHistory().intervals().entrySet())
- {
- printRow(String.format(formatstr, ids.get(i)),
- type.getKey(),
- type.getValue(),
- type.getValue(),
- summarise.get(i).totalGcStats,
- summarise.get(i).rowRateUncertainty,
- out);
- }
- TimingInterval hist = summarise.get(i).timing.getHistory().combine();
- printRow(String.format(formatstr, ids.get(i)),
- "total",
- hist,
- hist,
- summarise.get(i).totalGcStats,
- summarise.get(i).rowRateUncertainty,
- out
- );
- }
- }
-
- public Timing getTiming()
- {
- return timing;
- }
-
- public boolean wasCancelled()
- {
- return cancelled;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 1343cf1..b45462f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -50,11 +50,11 @@ import org.apache.cassandra.stress.operations.userdefined.TokenRangeQuery;
import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
import org.apache.cassandra.stress.operations.userdefined.SchemaQuery;
import org.apache.cassandra.stress.operations.userdefined.ValidatingSchemaQuery;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.*;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.MultiPrintStream;
import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.ThriftConversion;
import org.apache.thrift.TException;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
index 9a3522c..aa5ddfd 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
@@ -36,9 +36,4 @@ public class FixedOpDistribution implements OpDistribution
{
return operation;
}
-
- public void closeTimers()
- {
- operation.close();
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
index 33a0c93..bef8954 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
@@ -25,8 +25,5 @@ import org.apache.cassandra.stress.Operation;
public interface OpDistribution
{
-
Operation next();
-
- public void closeTimers();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
index 14e6dfb..2477c36 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
@@ -21,11 +21,11 @@ package org.apache.cassandra.stress.operations;
*/
-import org.apache.cassandra.stress.util.Timing;
+import org.apache.cassandra.stress.StressAction.MeasurementSink;
public interface OpDistributionFactory
{
- public OpDistribution get(Timing timing, boolean isWarmup);
+ public OpDistribution get(boolean isWarmup, MeasurementSink sink);
public String desc();
Iterable<OpDistributionFactory> each();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
index 93290fc..bad0a94 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
@@ -29,9 +29,9 @@ import org.apache.cassandra.stress.generate.PartitionIterator;
import org.apache.cassandra.stress.generate.RatioDistribution;
import org.apache.cassandra.stress.generate.Seed;
import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.OptionRatioDistribution;
import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.Timer;
public abstract class PartitionOperation extends Operation
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
index fc0229e..6d7f9e4 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
@@ -51,12 +51,4 @@ public class SampledOpDistribution implements OpDistribution
remaining--;
return cur;
}
-
- public void closeTimers()
- {
- for (Pair<Operation, Double> op : operations.getPmf())
- {
- op.getFirst().close();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
index 0b206f9..1800039 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -21,17 +21,19 @@ package org.apache.cassandra.stress.operations;
*/
-import java.util.*;
-
-import org.apache.cassandra.stress.generate.*;
-import org.apache.cassandra.stress.util.Timing;
-import org.apache.commons.math3.distribution.EnumeratedDistribution;
-import org.apache.commons.math3.util.Pair;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.StressAction.MeasurementSink;
import org.apache.cassandra.stress.generate.DistributionFactory;
+import org.apache.cassandra.stress.generate.DistributionFixed;
import org.apache.cassandra.stress.generate.PartitionGenerator;
-import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.stress.report.Timer;
+import org.apache.commons.math3.distribution.EnumeratedDistribution;
+import org.apache.commons.math3.util.Pair;
public abstract class SampledOpDistributionFactory<T> implements OpDistributionFactory
{
@@ -47,13 +49,13 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
protected abstract List<? extends Operation> get(Timer timer, PartitionGenerator generator, T key, boolean isWarmup);
protected abstract PartitionGenerator newGenerator();
- public OpDistribution get(Timing timing, boolean isWarmup)
+ public OpDistribution get(boolean isWarmup, MeasurementSink sink)
{
PartitionGenerator generator = newGenerator();
List<Pair<Operation, Double>> operations = new ArrayList<>();
for (Map.Entry<T, Double> ratio : ratios.entrySet())
{
- List<? extends Operation> ops = get(timing.newTimer(ratio.getKey().toString()),
+ List<? extends Operation> ops = get(new Timer(ratio.getKey().toString(), sink),
generator, ratio.getKey(), isWarmup);
for (Operation op : ops)
operations.add(new Pair<>(op, ratio.getValue() / ops.size()));
@@ -76,9 +78,9 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
{
out.add(new OpDistributionFactory()
{
- public OpDistribution get(Timing timing, boolean isWarmup)
+ public OpDistribution get(boolean isWarmup, MeasurementSink sink)
{
- List<? extends Operation> ops = SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString()),
+ List<? extends Operation> ops = SampledOpDistributionFactory.this.get(new Timer(ratio.getKey().toString(), sink),
newGenerator(),
ratio.getKey(),
isWarmup);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
index bb8135c..2874b4e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
@@ -29,9 +29,9 @@ import org.apache.cassandra.stress.generate.Distribution;
import org.apache.cassandra.stress.generate.DistributionFactory;
import org.apache.cassandra.stress.generate.PartitionGenerator;
import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.Timer;
public class CqlCounterAdder extends CqlOperation<Integer>
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
index d91ab37..eb908b0 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
@@ -27,9 +27,9 @@ import java.util.List;
import org.apache.cassandra.stress.generate.PartitionGenerator;
import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.Timer;
public class CqlCounterGetter extends CqlOperation<Integer>
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
index fdf5007..3fda6c2 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
@@ -27,9 +27,9 @@ import java.util.List;
import org.apache.cassandra.stress.generate.PartitionGenerator;
import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.Timer;
public class CqlInserter extends CqlOperation<Integer>
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
index 097c1a0..647ba87 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
@@ -31,12 +31,12 @@ import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import org.apache.cassandra.stress.generate.PartitionGenerator;
import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.ConnectionStyle;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.CqlRow;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
index 0ee17e9..61c6553 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
@@ -28,9 +28,9 @@ import java.util.List;
import org.apache.cassandra.stress.generate.PartitionGenerator;
import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.Timer;
public class CqlReader extends CqlOperation<ByteBuffer[][]>
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
index 1f9a2c8..db35504 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
@@ -26,10 +26,10 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.generate.*;
import org.apache.cassandra.stress.operations.PartitionOperation;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.CqlVersion;
import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
index be34a07..42f8bc9 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
@@ -28,10 +28,10 @@ import org.apache.cassandra.stress.generate.Distribution;
import org.apache.cassandra.stress.generate.DistributionFactory;
import org.apache.cassandra.stress.generate.PartitionGenerator;
import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.CounterColumn;
import org.apache.cassandra.thrift.Mutation;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
index ca81fe9..4bec3b2 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
@@ -23,10 +23,10 @@ import java.util.List;
import org.apache.cassandra.stress.generate.PartitionGenerator;
import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.SlicePredicate;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
index 1827c06..ecaa140 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
@@ -26,10 +26,10 @@ import java.util.Map;
import org.apache.cassandra.stress.generate.PartitionGenerator;
import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
index d77dc6a..4d530b9 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
@@ -23,10 +23,10 @@ import java.util.List;
import org.apache.cassandra.stress.generate.PartitionGenerator;
import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.Command;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
index 9eebce2..96b3392 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
@@ -43,10 +43,10 @@ import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.cassandra.stress.WorkManager;
import org.apache.cassandra.stress.generate.*;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
public class SchemaInsert extends SchemaStatement
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
index 0d8e756..2764704 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
@@ -33,10 +33,10 @@ import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.stress.generate.*;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.ThriftConversion;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
index c83787b..ca1f5fa 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
@@ -32,9 +32,9 @@ import com.datastax.driver.core.PreparedStatement;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.stress.generate.Row;
import org.apache.cassandra.stress.operations.PartitionOperation;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
-import org.apache.cassandra.stress.util.Timer;
public abstract class SchemaStatement extends PartitionOperation
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
index 7a0f02d..f561f61 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
@@ -39,10 +39,10 @@ import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.StressYaml;
import org.apache.cassandra.stress.WorkManager;
import org.apache.cassandra.stress.generate.TokenRangeIterator;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
public class TokenRangeQuery extends Operation
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
index 4547a37..a731b99 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
@@ -33,10 +33,10 @@ import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.stress.generate.*;
import org.apache.cassandra.stress.generate.Row;
import org.apache.cassandra.stress.operations.PartitionOperation;
+import org.apache.cassandra.stress.report.Timer;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
import org.apache.cassandra.thrift.Compression;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.CqlRow;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/report/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/report/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/report/StressMetrics.java
new file mode 100644
index 0000000..a596547
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/report/StressMetrics.java
@@ -0,0 +1,457 @@
+package org.apache.cassandra.stress.report;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+import org.apache.cassandra.stress.StressAction.Consumer;
+import org.apache.cassandra.stress.StressAction.MeasurementSink;
+import org.apache.cassandra.stress.StressAction.OpMeasurement;
+import org.apache.cassandra.stress.settings.SettingsLog.Level;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.JmxCollector;
+import org.apache.cassandra.stress.util.JmxCollector.GcStats;
+import org.apache.cassandra.stress.util.Uncertainty;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.lang3.time.DurationFormatUtils;
+
+public class StressMetrics implements MeasurementSink
+{
+ private final List<Consumer> consumers = new ArrayList<>(); // registered via add(Consumer); their queues are drained every interval
+ private final PrintStream output; // receives the header, the per-interval rows and the summary
+ private final Thread thread; // runs reportingLoop(); named "StressMetrics"
+ private final Uncertainty rowRateUncertainty = new Uncertainty(); // updated with the adjusted row rate once per interval
+ private final CountDownLatch stopped = new CountDownLatch(1); // counted down when reportingLoop() exits; awaited by stop()
+ private final Callable<JmxCollector.GcStats> gcStatsCollector; // a JmxCollector, or a fallback that returns totalGcStats
+ private final HistogramLogWriter histogramWriter; // null when settings.log.hdrFile is not set
+ private final long epochNs = System.nanoTime(); // nanoTime origin, paired with epochMs in logHistogram()
+ private final long epochMs = System.currentTimeMillis(); // wall-clock origin, paired with epochNs in logHistogram()
+
+ private volatile JmxCollector.GcStats totalGcStats = new GcStats(0); // running aggregate, replaced in recordInterval()
+
+ private volatile boolean stop = false; // set by stop() and cancel(); polled by the reporting thread
+ private volatile boolean cancelled = false; // set by cancel(); exposed through wasCancelled()
+
+
+ // collected data for intervals and summary
+ private final Map<String, TimingInterval> opTypeToCurrentTimingInterval = new TreeMap<>(); // per-op data for the interval being built
+ private final Map<String, TimingInterval> opTypeToSummaryTimingInterval = new TreeMap<>(); // per-op data accumulated over all intervals
+ private final Queue<OpMeasurement> leftovers = new ArrayDeque<>(); // measurements that ended after the interval end; retried next interval
+ private final TimingInterval totalCurrentInterval; // all op types, current interval
+ private final TimingInterval totalSummaryInterval; // all op types, accumulated over all intervals
+
+ public StressMetrics(PrintStream output, final long logIntervalMillis, StressSettings settings) // sets up histogram logging and GC collection, prints the header, creates (but does not start) the reporting thread
+ {
+ this.output = output;
+ if(settings.log.hdrFile != null)
+ {
+ try
+ {
+ histogramWriter = new HistogramLogWriter(settings.log.hdrFile);
+ histogramWriter.outputComment("Logging op latencies for Cassandra Stress");
+ histogramWriter.outputLogFormatVersion();
+ final long roundedEpoch = epochMs - (epochMs%1000); // wall-clock epoch truncated to a whole second
+ histogramWriter.outputBaseTime(roundedEpoch);
+ histogramWriter.setBaseTime(roundedEpoch);
+ histogramWriter.outputStartTime(roundedEpoch);
+ histogramWriter.outputLegend();
+ }
+ catch (FileNotFoundException e)
+ {
+ throw new IllegalArgumentException(e); // rethrown unchecked with the original exception as cause
+ }
+ }
+ else
+ {
+ histogramWriter = null; // logHistograms() returns immediately when this is null
+ }
+ Callable<JmxCollector.GcStats> gcStatsCollector; // local shadows the field so it can be assigned in try and catch
+ totalGcStats = new JmxCollector.GcStats(0); // NOTE(review): repeats the field initializer; looks redundant
+ try
+ {
+ gcStatsCollector = new JmxCollector(settings.node.resolveAllPermitted(settings), settings.port.jmxPort);
+ }
+ catch (Throwable t)
+ {
+ if (settings.log.level == Level.VERBOSE)
+ {
+ t.printStackTrace(); // stack trace only at VERBOSE; the one-line notice below is always printed
+ }
+ System.err.println("Failed to connect over JMX; not collecting these stats");
+ gcStatsCollector = new Callable<JmxCollector.GcStats>() // fallback: hand back the running total instead of querying JMX
+ {
+ public JmxCollector.GcStats call() throws Exception
+ {
+ return totalGcStats;
+ }
+ };
+ }
+ this.gcStatsCollector = gcStatsCollector;
+ this.totalCurrentInterval = new TimingInterval(settings.rate.isFixed);
+ this.totalSummaryInterval = new TimingInterval(settings.rate.isFixed);
+ printHeader("", output); // column header is written at construction time
+ thread = new Thread(() -> {
+ reportingLoop(logIntervalMillis);
+ });
+ thread.setName("StressMetrics");
+ }
+ public void start() // starts the reporting thread created in the constructor
+ {
+ thread.start();
+ }
+
+ public void waitUntilConverges(double targetUncertainty, int minMeasurements, int maxMeasurements) throws InterruptedException // delegates to rowRateUncertainty.await with the same three arguments
+ {
+ rowRateUncertainty.await(targetUncertainty, minMeasurements, maxMeasurements);
+ }
+
+ public void cancel() // flags the run as cancelled and asks the reporting thread to stop; does not wait for it to exit
+ {
+ cancelled = true;
+ stop = true;
+ thread.interrupt(); // makes a parked reporting thread return from parkNanos so it can observe stop
+ rowRateUncertainty.wakeAll(); // presumably releases callers blocked in waitUntilConverges() - confirm against Uncertainty
+ }
+
+ public void stop() throws InterruptedException // requests a stop and blocks until reportingLoop() has counted down the stopped latch
+ {
+ stop = true;
+ thread.interrupt(); // makes a parked reporting thread return from parkNanos so it can observe stop
+ stopped.await(); // NOTE(review): the latch is only counted down inside reportingLoop(), so this relies on start() having been called
+ }
+
+
+ private void reportingLoop(final long logIntervalMillis) // body of the reporting thread: records one interval every logIntervalMillis until stop is set, then one final interval
+ {
+ // align report timing to the start of the current wall-clock second (rounds down)
+ final long currentTimeMs = System.currentTimeMillis();
+ final long startTimeMs = currentTimeMs - (currentTimeMs % 1000);
+ // reporting interval starts rounded to the second: shift the nanoTime origin back by the same sub-second offset
+ long reportingStartNs = (System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(currentTimeMs - startTimeMs));
+ final long parkIntervalNs = TimeUnit.MILLISECONDS.toNanos(logIntervalMillis);
+ try
+ {
+ while (!stop)
+ {
+ final long wakupTarget = reportingStartNs + parkIntervalNs; // nanoTime at which this interval ends
+ sleepUntil(wakupTarget);
+ if (stop)
+ {
+ break; // stopped while sleeping: the partial interval is recorded after the loop
+ }
+ recordInterval(wakupTarget, parkIntervalNs);
+ reportingStartNs += parkIntervalNs; // advance by the nominal interval, not by the observed wake-up time
+ }
+
+ final long end = System.nanoTime();
+ recordInterval(end, end - reportingStartNs); // final interval, from the last boundary to now
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ cancel(); // any failure while reporting cancels the run
+ }
+ finally
+ {
+ rowRateUncertainty.wakeAll();
+ stopped.countDown(); // lets stop() return
+ }
+ }
+
+
+ private void sleepUntil(final long until) // parks the calling thread until System.nanoTime() reaches until, or until stop is set
+ {
+ long parkFor;
+ while (!stop &&
+ (parkFor = until - System.nanoTime()) > 0) // remaining time, recomputed on every pass
+ {
+ LockSupport.parkNanos(parkFor); // can return before the deadline (e.g. on interrupt), hence the loop
+ }
+ }
+
+ @Override
+ public void record(String opType, long intended, long started, long ended, long rowCnt, long partitionCnt, boolean err) // adds one measurement to the current-interval data for opType
+ {
+ TimingInterval current = opTypeToCurrentTimingInterval.computeIfAbsent(opType, k -> new TimingInterval(totalCurrentInterval.isFixed)); // interval is created the first time an op type is seen
+ record(current, intended, started, ended, rowCnt, partitionCnt, err);
+ }
+
+ private void record(TimingInterval t, long intended, long started, long ended, long rowCnt, long partitionCnt, boolean err) // accumulates one measurement's counts and latencies into t
+ {
+ t.rowCount += rowCnt;
+ t.partitionCount += partitionCnt;
+ if (err)
+ t.errorCount++;
+ if (intended != 0) { // response and wait time are only recorded when intended is non-zero
+ t.responseTime().recordValue(ended-intended);
+ t.waitTime().recordValue(started-intended);
+ }
+ final long sTime = ended-started;
+ t.serviceTime().recordValue(sTime); // service time is recorded for every measurement
+ }
+
+ private void recordInterval(long intervalEnd, long parkIntervalNs) throws InterruptedException // closes one interval: drains measurements, samples GC stats, updates the uncertainty, prints rows and resets
+ {
+
+ drainConsumerMeasurements(intervalEnd, parkIntervalNs);
+
+ GcStats gcStats = null;
+ try
+ {
+ gcStats = gcStatsCollector.call();
+ }
+ catch (Exception e)
+ {
+ gcStats = new GcStats(0); // collection failed: use a GcStats built with 0; the exception itself is not reported
+ }
+ totalGcStats = JmxCollector.GcStats.aggregate(Arrays.asList(totalGcStats, gcStats)); // fold this sample into the running total
+
+ rowRateUncertainty.update(totalCurrentInterval.adjustedRowRate()); // updated every interval, including intervals with no operations
+ if (totalCurrentInterval.operationCount() != 0) // nothing is printed, logged or reset for an interval without operations
+ {
+ // if there's a single operation we only print the total
+ final boolean logPerOpSummaryLine = opTypeToCurrentTimingInterval.size() > 1;
+
+ for (Map.Entry<String, TimingInterval> type : opTypeToCurrentTimingInterval.entrySet())
+ {
+ final String opName = type.getKey();
+ final TimingInterval opInterval = type.getValue();
+ if (logPerOpSummaryLine)
+ {
+ printRow("", opName, opInterval, opTypeToSummaryTimingInterval.get(opName), gcStats, rowRateUncertainty, output);
+ }
+ logHistograms(opName, opInterval); // histograms are logged per op even when the per-op row is suppressed
+ opInterval.reset(); // start the next interval from empty
+ }
+
+ printRow("", "total", totalCurrentInterval, totalSummaryInterval, gcStats, rowRateUncertainty, output);
+ totalCurrentInterval.reset();
+ }
+ }
+
+ private void drainConsumerMeasurements(long intervalEnd, long parkIntervalNs) // records every queued measurement that ended by intervalEnd, then folds the current intervals into the summaries
+ {
+ // record leftover measurements if any
+ int leftoversSize = leftovers.size(); // snapshot: entries re-queued below must not be revisited in this pass
+ for (int i=0;i<leftoversSize;i++)
+ {
+ OpMeasurement last = leftovers.poll();
+ if (last.ended <= intervalEnd)
+ {
+ record(last.opType, last.intended, last.started, last.ended, last.rowCnt, last.partitionCnt, last.err);
+ // round robin-ish redistribution of leftovers
+ consumers.get(i%consumers.size()).measurementsRecycling.offer(last); // hand the measurement object back to some consumer's recycling queue
+ }
+ else
+ {
+ // no record for you! wait one interval!
+ leftovers.offer(last);
+ }
+ }
+ // record interval collected measurements
+ for (Consumer c: consumers) {
+ Queue<OpMeasurement> in = c.measurementsReporting;
+ Queue<OpMeasurement> out = c.measurementsRecycling;
+ OpMeasurement last;
+ while ((last = in.poll()) != null)
+ {
+ if (last.ended > intervalEnd)
+ {
+ // measurements for any given consumer are ordered, we stop when we stop.
+ leftovers.add(last); // already polled, so it is parked in leftovers for the next interval
+ break;
+ }
+ record(last.opType, last.intended, last.started, last.ended, last.rowCnt, last.partitionCnt, last.err);
+ out.offer(last); // return the measurement object to the same consumer
+ }
+ }
+ // set timestamps and summarize
+ for (Entry<String, TimingInterval> currPerOp : opTypeToCurrentTimingInterval.entrySet()) {
+ currPerOp.getValue().endNanos(intervalEnd);
+ currPerOp.getValue().startNanos(intervalEnd-parkIntervalNs); // interval start is derived from its end and length
+ TimingInterval summaryPerOp = opTypeToSummaryTimingInterval.computeIfAbsent(currPerOp.getKey(), k -> new TimingInterval(totalCurrentInterval.isFixed));
+ summaryPerOp.add(currPerOp.getValue());
+ totalCurrentInterval.add(currPerOp.getValue());
+ }
+ totalCurrentInterval.endNanos(intervalEnd);
+ totalCurrentInterval.startNanos(intervalEnd-parkIntervalNs);
+
+ totalSummaryInterval.add(totalCurrentInterval);
+ }
+
+
+ private void logHistograms(String opName, TimingInterval opInterval) // writes the interval's three histograms to the hdr log, tagged opName-st / -rt / -wt
+ {
+ if (histogramWriter == null)
+ return; // histogram logging is disabled
+ final long startNs = opInterval.startNanos();
+ final long endNs = opInterval.endNanos();
+
+ logHistogram(opName + "-st", startNs, endNs, opInterval.serviceTime());
+ logHistogram(opName + "-rt", startNs, endNs, opInterval.responseTime());
+ logHistogram(opName + "-wt", startNs, endNs, opInterval.waitTime());
+ }
+
+ private void logHistogram(String opName, final long startNs, final long endNs, final Histogram histogram) // tags and timestamps one histogram and writes it; empty histograms are skipped
+ {
+ if (histogram.getTotalCount() != 0)
+ {
+ histogram.setTag(opName);
+ final long relativeStartNs = startNs - epochNs; // nanoTime offset from construction
+ final long startMs = (long) (1000 *((epochMs + NANOSECONDS.toMillis(relativeStartNs))/1000.0)); // NOTE(review): the /1000.0 then *1000 round trip looks like a no-op apart from floating-point rounding - confirm
+ histogram.setStartTimeStamp(startMs);
+ final long relativeEndNs = endNs - epochNs;
+ final long endMs = (long) (1000 *((epochMs + NANOSECONDS.toMillis(relativeEndNs))/1000.0)); // same conversion as startMs
+ histogram.setEndTimeStamp(endMs);
+ histogramWriter.outputIntervalHistogram(histogram); // callers guarantee histogramWriter is non-null (checked in logHistograms)
+ }
+ }
+
+
+ // PRINT FORMATTING
+
+ public static final String HEADFORMAT = "%-10s%10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s,%7s,%7s,%8s,%8s,%8s,%8s"; // 19 string columns, one per HEADMETRICS entry
+ public static final String ROWFORMAT = "%-10s%10d,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f,%7d,%7.0f,%8.0f,%8.0f,%8.0f,%8.0f"; // same 19 columns and widths as HEADFORMAT, typed for the values passed by printRow
+ public static final String[] HEADMETRICS = new String[]{"type", "total ops","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr", "errors", "gc: #", "max ms", "sum ms", "sdv ms", "mb"}; // column titles, in print order
+ public static final String HEAD = String.format(HEADFORMAT, (Object[]) HEADMETRICS); // pre-rendered header line
+
+ private static void printHeader(String prefix, PrintStream output) // prints prefix followed by the pre-rendered HEAD line
+ {
+ output.println(prefix + HEAD);
+ }
+
+ private static void printRow(String prefix, String type, TimingInterval interval, TimingInterval total, JmxCollector.GcStats gcStats, Uncertainty opRateUncertainty, PrintStream output) // prints one ROWFORMAT line; "total ops" and "time" come from total, the rate/latency/error columns from interval
+ {
+ output.println(prefix + String.format(ROWFORMAT,
+ type + ",", // the comma after the type column is supplied here, not by ROWFORMAT
+ total.operationCount(), // "total ops"
+ interval.opRate(),
+ interval.partitionRate(),
+ interval.rowRate(),
+ interval.meanLatencyMs(),
+ interval.medianLatencyMs(),
+ interval.latencyAtPercentileMs(95.0),
+ interval.latencyAtPercentileMs(99.0),
+ interval.latencyAtPercentileMs(99.9),
+ interval.maxLatencyMs(),
+ total.runTimeMs() / 1000f, // "time": milliseconds divided by 1000
+ opRateUncertainty.getUncertainty(), // "stderr"
+ interval.errorCount,
+ gcStats.count,
+ gcStats.maxms,
+ gcStats.summs,
+ gcStats.sdvms,
+ gcStats.bytes / (1 << 20) // "mb": bytes divided by 2^20
+ ));
+ }
+
+ public void summarise() // prints the end-of-run "Results:" section: totals from totalSummaryInterval, per-op breakdowns from opTypeToSummaryTimingInterval, then GC totals
+ {
+ output.println("\n");
+ output.println("Results:");
+
+ TimingIntervals opHistory = new TimingIntervals(opTypeToSummaryTimingInterval); // per-op view used for the trailing %s breakdown on each line
+ TimingInterval history = this.totalSummaryInterval; // all op types combined
+ output.println(String.format("Op rate : %,8.0f op/s %s", history.opRate(), opHistory.opRates()));
+ output.println(String.format("Partition rate : %,8.0f pk/s %s", history.partitionRate(), opHistory.partitionRates()));
+ output.println(String.format("Row rate : %,8.0f row/s %s", history.rowRate(), opHistory.rowRates()));
+ output.println(String.format("Latency mean : %6.1f ms %s", history.meanLatencyMs(), opHistory.meanLatencies()));
+ output.println(String.format("Latency median : %6.1f ms %s", history.medianLatencyMs(), opHistory.medianLatencies()));
+ output.println(String.format("Latency 95th percentile : %6.1f ms %s", history.latencyAtPercentileMs(95.0), opHistory.latenciesAtPercentile(95.0)));
+ output.println(String.format("Latency 99th percentile : %6.1f ms %s", history.latencyAtPercentileMs(99.0), opHistory.latenciesAtPercentile(99.0)));
+ output.println(String.format("Latency 99.9th percentile : %6.1f ms %s", history.latencyAtPercentileMs(99.9), opHistory.latenciesAtPercentile(99.9)));
+ output.println(String.format("Latency max : %6.1f ms %s", history.maxLatencyMs(), opHistory.maxLatencies()));
+ output.println(String.format("Total partitions : %,10d %s", history.partitionCount, opHistory.partitionCounts()));
+ output.println(String.format("Total errors : %,10d %s", history.errorCount, opHistory.errorCounts()));
+ output.println(String.format("Total GC count : %,1.0f", totalGcStats.count));
+ output.println(String.format("Total GC memory : %s", FBUtilities.prettyPrintMemory((long)totalGcStats.bytes, true)));
+ output.println(String.format("Total GC time : %,6.1f seconds", totalGcStats.summs / 1000)); // summs is in milliseconds
+ output.println(String.format("Avg GC time : %,6.1f ms", totalGcStats.count == 0 ? 0.0 : totalGcStats.summs / totalGcStats.count)); // guard: 0/0 would print NaN when no collections were recorded
+ output.println(String.format("StdDev GC time : %,6.1f ms", totalGcStats.sdvms));
+ output.println("Total operation time : " + DurationFormatUtils.formatDuration(
+ history.runTimeMs(), "HH:mm:ss", true));
+ output.println(""); // Newline is important here to separate the aggregates section from the END or the next stress iteration
+ }
+
+ public static void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out) // prints one table for several runs: per-op rows and a "total" row for each, prefixed by the run id at the same index in ids
+ {
+ int idLen = 0;
+ for (String id : ids)
+ idLen = Math.max(id.length(), idLen); // widest id sets the prefix column width
+ String formatstr = "%" + idLen + "s, ";
+ printHeader(String.format(formatstr, "id"), out);
+ for (int i = 0 ; i < ids.size() ; i++) // ids and summarise are indexed in parallel
+ {
+ for (Map.Entry<String, TimingInterval> type : summarise.get(i).opTypeToSummaryTimingInterval.entrySet())
+ {
+ printRow(String.format(formatstr, ids.get(i)),
+ type.getKey(),
+ type.getValue(), // the summary interval is passed as both interval and total
+ type.getValue(),
+ summarise.get(i).totalGcStats,
+ summarise.get(i).rowRateUncertainty,
+ out);
+ }
+ TimingInterval hist = summarise.get(i).totalSummaryInterval;
+ printRow(String.format(formatstr, ids.get(i)),
+ "total",
+ hist, // again used as both interval and total
+ hist,
+ summarise.get(i).totalGcStats,
+ summarise.get(i).rowRateUncertainty,
+ out
+ );
+ }
+ }
+
+ public boolean wasCancelled() // true once cancel() has been called
+ {
+ return cancelled;
+ }
+
+ public void add(Consumer consumer) // registers a consumer whose measurement queues will be drained each interval
+ {
+ consumers.add(consumer); // NOTE(review): plain ArrayList also read by the reporting thread - presumably populated before start(); confirm
+ }
+
+ public double opRate() // op rate over the whole run so far, from totalSummaryInterval
+ {
+ return totalSummaryInterval.opRate();
+ }
+}