You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2016/09/07 14:15:57 UTC

[2/2] cassandra git commit: Fix Cassandra Stress reporting thread model and precision

Fix Cassandra Stress reporting thread model and precision

Patch by Nitsan Wakart; reviewed by tjake for CASSANDRA-12585


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e73633cd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e73633cd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e73633cd

Branch: refs/heads/trunk
Commit: e73633cd8129b18e706e7ee8bdb05d802111c7e3
Parents: 1d74664
Author: nitsanw <ni...@yahoo.com>
Authored: Thu Sep 1 01:27:55 2016 +0200
Committer: T Jake Luciani <ja...@apache.org>
Committed: Wed Sep 7 10:14:20 2016 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NOTICE.txt                                      |   3 +
 build.xml                                       |   2 +
 lib/jctools-core-1.2.1.jar                      | Bin 0 -> 106714 bytes
 lib/licenses/jctools-core-1.2.1.txt             | 202 ++++++++
 .../org/apache/cassandra/stress/Operation.java  |   9 +-
 .../apache/cassandra/stress/StressAction.java   |  67 ++-
 .../apache/cassandra/stress/StressGraph.java    |   2 +-
 .../apache/cassandra/stress/StressMetrics.java  | 345 --------------
 .../apache/cassandra/stress/StressProfile.java  |   2 +-
 .../stress/operations/FixedOpDistribution.java  |   5 -
 .../stress/operations/OpDistribution.java       |   3 -
 .../operations/OpDistributionFactory.java       |   4 +-
 .../stress/operations/PartitionOperation.java   |   2 +-
 .../operations/SampledOpDistribution.java       |   8 -
 .../SampledOpDistributionFactory.java           |  24 +-
 .../operations/predefined/CqlCounterAdder.java  |   2 +-
 .../operations/predefined/CqlCounterGetter.java |   2 +-
 .../operations/predefined/CqlInserter.java      |   2 +-
 .../operations/predefined/CqlOperation.java     |   2 +-
 .../stress/operations/predefined/CqlReader.java |   2 +-
 .../predefined/PredefinedOperation.java         |   2 +-
 .../predefined/ThriftCounterAdder.java          |   2 +-
 .../predefined/ThriftCounterGetter.java         |   2 +-
 .../operations/predefined/ThriftInserter.java   |   2 +-
 .../operations/predefined/ThriftReader.java     |   2 +-
 .../operations/userdefined/SchemaInsert.java    |   2 +-
 .../operations/userdefined/SchemaQuery.java     |   2 +-
 .../operations/userdefined/SchemaStatement.java |   2 +-
 .../operations/userdefined/TokenRangeQuery.java |   2 +-
 .../userdefined/ValidatingSchemaQuery.java      |   2 +-
 .../cassandra/stress/report/StressMetrics.java  | 457 +++++++++++++++++++
 .../apache/cassandra/stress/report/Timer.java   |  63 +++
 .../cassandra/stress/report/TimingInterval.java | 234 ++++++++++
 .../stress/report/TimingIntervals.java          | 128 ++++++
 .../settings/SettingsCommandPreDefined.java     |   9 +-
 .../SettingsCommandPreDefinedMixed.java         |   2 +-
 .../stress/settings/SettingsCommandUser.java    |   2 +-
 .../org/apache/cassandra/stress/util/Timer.java | 167 -------
 .../apache/cassandra/stress/util/Timing.java    | 147 ------
 .../cassandra/stress/util/TimingInterval.java   | 234 ----------
 .../cassandra/stress/util/TimingIntervals.java  | 152 ------
 42 files changed, 1186 insertions(+), 1118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2e8fa4e..54ffcf1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
  * Add JMH benchmarks.jar (CASSANDRA-12586)
  * Add row offset support to SASI (CASSANDRA-11990)
  * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/NOTICE.txt
----------------------------------------------------------------------
diff --git a/NOTICE.txt b/NOTICE.txt
index 1c552fc..dcdf03d 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -86,3 +86,6 @@ Copyright (c) 2000-2011 INRIA, France Telecom
 
 HdrHistogram
 http://hdrhistogram.org
+
+JCTools
+http://jctools.github.io/JCTools/

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 858030a..6954e04 100644
--- a/build.xml
+++ b/build.xml
@@ -452,6 +452,7 @@
           <dependency groupId="com.github.rholder" artifactId="snowball-stemmer" version="1.3.0.581.1" />
           <dependency groupId="com.googlecode.concurrent-trees" artifactId="concurrent-trees" version="2.4.0" />
 	  <dependency groupId="com.github.ben-manes.caffeine" artifactId="caffeine" version="2.2.6" />
+	  <dependency groupId="org.jctools" artifactId="jctools-core" version="1.2.1"/>
         </dependencyManagement>
         <developer id="alakshman" name="Avinash Lakshman"/>
         <developer id="aleksey" name="Aleksey Yeschenko"/>
@@ -618,6 +619,7 @@
         <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
         <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
 	<dependency groupId="com.github.ben-manes.caffeine" artifactId="caffeine" />
+	<dependency groupId="org.jctools" artifactId="jctools-core"/>
       </artifact:pom>
       <artifact:pom id="thrift-pom"
                     artifactId="cassandra-thrift"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/lib/jctools-core-1.2.1.jar
----------------------------------------------------------------------
diff --git a/lib/jctools-core-1.2.1.jar b/lib/jctools-core-1.2.1.jar
new file mode 100644
index 0000000..fa1137b
Binary files /dev/null and b/lib/jctools-core-1.2.1.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/lib/licenses/jctools-core-1.2.1.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/jctools-core-1.2.1.txt b/lib/licenses/jctools-core-1.2.1.txt
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/lib/licenses/jctools-core-1.2.1.txt
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/Operation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/Operation.java b/tools/stress/src/org/apache/cassandra/stress/Operation.java
index 92a54f9..dc5bd2f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/Operation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/Operation.java
@@ -20,11 +20,11 @@ package org.apache.cassandra.stress;
 
 import java.io.IOException;
 
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.SettingsLog;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.transport.SimpleClient;
 
@@ -106,7 +106,7 @@ public abstract class Operation
                 exceptionMessage = getExceptionMessage(e);
             }
         }
-        
+
         timer.stop(run.partitionCount(), run.rowCount(), !success);
 
         if (!success)
@@ -138,11 +138,6 @@ public abstract class Operation
             System.err.println(message);
     }
 
-    public void close()
-    {
-        timer.close();
-    }
-
     public void intendedStartNs(long intendedTime)
     {
         timer.intendedTimeNs(intendedTime);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/StressAction.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressAction.java b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
index 7c37ef8..f3d9cbd 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressAction.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressAction.java
@@ -22,6 +22,7 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -29,12 +30,15 @@ import java.util.concurrent.locks.LockSupport;
 
 import org.apache.cassandra.stress.operations.OpDistribution;
 import org.apache.cassandra.stress.operations.OpDistributionFactory;
+import org.apache.cassandra.stress.report.StressMetrics;
 import org.apache.cassandra.stress.settings.ConnectionAPI;
 import org.apache.cassandra.stress.settings.SettingsCommand;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
 import org.apache.cassandra.transport.SimpleClient;
+import org.jctools.queues.SpscArrayQueue;
+import org.jctools.queues.SpscUnboundedArrayQueue;
 
 import com.google.common.util.concurrent.Uninterruptibles;
 
@@ -183,8 +187,8 @@ public class StressAction implements Runnable
         double improvement = 0;
         for (int i = results.size() - count ; i < results.size() ; i++)
         {
-            double prev = results.get(i - 1).getTiming().getHistory().opRate();
-            double cur = results.get(i).getTiming().getHistory().opRate();
+            double prev = results.get(i - 1).opRate();
+            double cur = results.get(i).opRate();
             improvement += (cur - prev) / prev;
         }
         return improvement / count;
@@ -219,7 +223,7 @@ public class StressAction implements Runnable
         final Consumer[] consumers = new Consumer[threadCount];
         for (int i = 0; i < threadCount; i++)
         {
-            consumers[i] = new Consumer(operations.get(metrics.getTiming(), isWarmup),
+            consumers[i] = new Consumer(operations, isWarmup,
                                         done, start, releaseConsumers, workManager, metrics, rateLimiter);
         }
 
@@ -238,7 +242,9 @@ public class StressAction implements Runnable
         }
         // start counting from NOW!
         if(rateLimiter != null)
+        {
             rateLimiter.start();
+        }
         // release the hounds!!!
         releaseConsumers.countDown();
 
@@ -353,18 +359,28 @@ public class StressAction implements Runnable
             return op;
         }
 
-        void close()
-        {
-            operations.closeTimers();
-        }
-
         void abort()
         {
             workManager.stop();
         }
     }
-
-    private class Consumer extends Thread
+    public static class OpMeasurement
+    {
+        public String opType;
+        public long intended,started,ended,rowCnt,partitionCnt;
+        public boolean err;
+        @Override
+        public String toString()
+        {
+            return "OpMeasurement [opType=" + opType + ", intended=" + intended + ", started=" + started + ", ended="
+                    + ended + ", rowCnt=" + rowCnt + ", partitionCnt=" + partitionCnt + ", err=" + err + "]";
+        }
+    }
+    public interface MeasurementSink
+    {
+        void record(String opType,long intended, long started, long ended, long rowCnt, long partitionCnt, boolean err);
+    }
+    public class Consumer extends Thread implements MeasurementSink
     {
         private final StreamOfOperations opStream;
         private final StressMetrics metrics;
@@ -372,8 +388,10 @@ public class StressAction implements Runnable
         private final CountDownLatch done;
         private final CountDownLatch start;
         private final CountDownLatch releaseConsumers;
-
-        public Consumer(OpDistribution operations,
+        public final Queue<OpMeasurement> measurementsRecycling;
+        public final Queue<OpMeasurement> measurementsReporting;
+        public Consumer(OpDistributionFactory operations,
+                        boolean isWarmup,
                         CountDownLatch done,
                         CountDownLatch start,
                         CountDownLatch releaseConsumers,
@@ -381,13 +399,18 @@ public class StressAction implements Runnable
                         StressMetrics metrics,
                         UniformRateLimiter rateLimiter)
         {
+            OpDistribution opDistribution = operations.get(isWarmup, this);
             this.done = done;
             this.start = start;
             this.releaseConsumers = releaseConsumers;
             this.metrics = metrics;
-            this.opStream = new StreamOfOperations(operations, rateLimiter, workManager);
+            this.opStream = new StreamOfOperations(opDistribution, rateLimiter, workManager);
+            this.measurementsRecycling =  new SpscArrayQueue<OpMeasurement>(8*1024);
+            this.measurementsReporting =  new SpscUnboundedArrayQueue<OpMeasurement>(2048);
+            metrics.add(this);
         }
 
+
         public void run()
         {
             try
@@ -464,8 +487,24 @@ public class StressAction implements Runnable
             finally
             {
                 done.countDown();
-                opStream.close();
             }
         }
+
+        @Override
+        public void record(String opType, long intended, long started, long ended, long rowCnt, long partitionCnt, boolean err)
+        {
+            OpMeasurement opMeasurement = measurementsRecycling.poll();
+            if(opMeasurement == null) {
+                opMeasurement = new OpMeasurement();
+            }
+            opMeasurement.opType = opType;
+            opMeasurement.intended = intended;
+            opMeasurement.started = started;
+            opMeasurement.ended = ended;
+            opMeasurement.rowCnt = rowCnt;
+            opMeasurement.partitionCnt = partitionCnt;
+            opMeasurement.err = err;
+            measurementsReporting.offer(opMeasurement);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/StressGraph.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressGraph.java b/tools/stress/src/org/apache/cassandra/stress/StressGraph.java
index 3b383fa..196964c 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressGraph.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressGraph.java
@@ -35,7 +35,7 @@ import java.util.regex.Pattern;
 
 import com.google.common.io.ByteStreams;
 import org.apache.commons.lang3.StringUtils;
-
+import org.apache.cassandra.stress.report.StressMetrics;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
deleted file mode 100644
index 86e9a7a..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/StressMetrics.java
+++ /dev/null
@@ -1,345 +0,0 @@
-package org.apache.cassandra.stress;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-
-import java.io.FileNotFoundException;
-import java.io.PrintStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-
-import org.HdrHistogram.Histogram;
-import org.HdrHistogram.HistogramLogWriter;
-import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.JmxCollector;
-import org.apache.cassandra.stress.util.Timing;
-import org.apache.cassandra.stress.util.TimingInterval;
-import org.apache.cassandra.stress.util.TimingIntervals;
-import org.apache.cassandra.stress.util.Uncertainty;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.commons.lang3.time.DurationFormatUtils;
-
-public class StressMetrics
-{
-
-    private final PrintStream output;
-    private final Thread thread;
-    private final Uncertainty rowRateUncertainty = new Uncertainty();
-    private final CountDownLatch stopped = new CountDownLatch(1);
-    private final Timing timing;
-    private final Callable<JmxCollector.GcStats> gcStatsCollector;
-    private final HistogramLogWriter histogramWriter;
-    private final long epochNs = System.nanoTime();
-    private final long epochMs = System.currentTimeMillis();
-
-    private volatile JmxCollector.GcStats totalGcStats;
-
-    private volatile boolean stop = false;
-    private volatile boolean cancelled = false;
-
-    public StressMetrics(PrintStream output, final long logIntervalMillis, StressSettings settings)
-    {
-        this.output = output;
-        if(settings.log.hdrFile != null)
-        {
-            try
-            {
-                histogramWriter = new HistogramLogWriter(settings.log.hdrFile);
-                histogramWriter.outputComment("Logging op latencies for Cassandra Stress");
-                histogramWriter.outputLogFormatVersion();
-                histogramWriter.outputBaseTime(epochMs);
-                histogramWriter.setBaseTime(epochMs);
-                histogramWriter.outputStartTime(epochMs);
-                histogramWriter.outputLegend();
-            }
-            catch (FileNotFoundException e)
-            {
-                throw new IllegalArgumentException(e);
-            }
-        }
-        else
-        {
-            histogramWriter = null;
-        }
-        Callable<JmxCollector.GcStats> gcStatsCollector;
-        totalGcStats = new JmxCollector.GcStats(0);
-        try
-        {
-            gcStatsCollector = new JmxCollector(settings.node.resolveAllPermitted(settings), settings.port.jmxPort);
-        }
-        catch (Throwable t)
-        {
-            switch (settings.log.level)
-            {
-                case VERBOSE:
-                    t.printStackTrace();
-            }
-            System.err.println("Failed to connect over JMX; not collecting these stats");
-            gcStatsCollector = new Callable<JmxCollector.GcStats>()
-            {
-                public JmxCollector.GcStats call() throws Exception
-                {
-                    return totalGcStats;
-                }
-            };
-        }
-        this.gcStatsCollector = gcStatsCollector;
-        this.timing = new Timing(settings.rate.isFixed);
-
-        printHeader("", output);
-        thread = new Thread(new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                timing.start();
-                try {
-
-                    while (!stop)
-                    {
-                        try
-                        {
-                            long sleepNanos = timing.getHistory().endNanos() - System.nanoTime();
-                            long sleep = (sleepNanos / 1000000) + logIntervalMillis;
-
-                            if (sleep < logIntervalMillis >>> 3)
-                                // if had a major hiccup, sleep full interval
-                                Thread.sleep(logIntervalMillis);
-                            else
-                                Thread.sleep(sleep);
-
-                            update();
-                        } catch (InterruptedException e)
-                        {
-                            break;
-                        }
-                    }
-
-                    update();
-                }
-                catch (InterruptedException e)
-                {}
-                catch (Exception e)
-                {
-                    cancel();
-                    e.printStackTrace(StressMetrics.this.output);
-                }
-                finally
-                {
-                    rowRateUncertainty.wakeAll();
-                    stopped.countDown();
-                }
-            }
-        });
-        thread.setName("StressMetrics");
-    }
-
-    public void start()
-    {
-        thread.start();
-    }
-
-    public void waitUntilConverges(double targetUncertainty, int minMeasurements, int maxMeasurements) throws InterruptedException
-    {
-        rowRateUncertainty.await(targetUncertainty, minMeasurements, maxMeasurements);
-    }
-
-    public void cancel()
-    {
-        cancelled = true;
-        stop = true;
-        thread.interrupt();
-        rowRateUncertainty.wakeAll();
-    }
-
-    public void stop() throws InterruptedException
-    {
-        stop = true;
-        thread.interrupt();
-        stopped.await();
-    }
-
-    private void update() throws InterruptedException
-    {
-        Timing.TimingResult<JmxCollector.GcStats> result = timing.snap(gcStatsCollector);
-        totalGcStats = JmxCollector.GcStats.aggregate(Arrays.asList(totalGcStats, result.extra));
-        TimingInterval current = result.intervals.combine();
-        TimingInterval history = timing.getHistory().combine();
-        rowRateUncertainty.update(current.adjustedRowRate());
-        if (current.operationCount() != 0)
-        {
-            // if there's a single operation we only print the total
-            final boolean logPerOpSummaryLine = result.intervals.intervals().size() > 1;
-
-            for (Map.Entry<String, TimingInterval> type : result.intervals.intervals().entrySet())
-            {
-                final String opName = type.getKey();
-                final TimingInterval opInterval = type.getValue();
-                if (logPerOpSummaryLine)
-                {
-                    printRow("", opName, opInterval, timing.getHistory().get(type.getKey()), result.extra, rowRateUncertainty, output);
-                }
-                logHistograms(opName, opInterval);
-            }
-
-            printRow("", "total", current, history, result.extra, rowRateUncertainty, output);
-        }
-        if (timing.done())
-            stop = true;
-    }
-
-
-    private void logHistograms(String opName, TimingInterval opInterval)
-    {
-        if (histogramWriter == null)
-            return;
-        final long startNs = opInterval.startNanos();
-        final long endNs = opInterval.endNanos();
-
-        logHistogram(opName + "-st", startNs, endNs, opInterval.serviceTime());
-        logHistogram(opName + "-rt", startNs, endNs, opInterval.responseTime());
-        logHistogram(opName + "-wt", startNs, endNs, opInterval.waitTime());
-    }
-
-    private void logHistogram(String opName, final long startNs, final long endNs, final Histogram histogram)
-    {
-        if (histogram.getTotalCount() != 0)
-        {
-            histogram.setTag(opName);
-            histogram.setStartTimeStamp(epochMs + NANOSECONDS.toMillis(startNs - epochNs));
-            histogram.setEndTimeStamp(epochMs + NANOSECONDS.toMillis(endNs - epochNs));
-            histogramWriter.outputIntervalHistogram(histogram);
-        }
-    }
-
-
-    // PRINT FORMATTING
-
-    public static final String HEADFORMAT = "%-10s%10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s,%7s,%7s,%8s,%8s,%8s,%8s";
-    public static final String ROWFORMAT =  "%-10s%10d,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f,%7d,%7.0f,%8.0f,%8.0f,%8.0f,%8.0f";
-    public static final String[] HEADMETRICS = new String[]{"type", "total ops","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr", "errors", "gc: #", "max ms", "sum ms", "sdv ms", "mb"};
-    public static final String HEAD = String.format(HEADFORMAT, (Object[]) HEADMETRICS);
-
-    private static void printHeader(String prefix, PrintStream output)
-    {
-        output.println(prefix + HEAD);
-    }
-
-    private static void printRow(String prefix, String type, TimingInterval interval, TimingInterval total, JmxCollector.GcStats gcStats, Uncertainty opRateUncertainty, PrintStream output)
-    {
-        output.println(prefix + String.format(ROWFORMAT,
-                type + ",",
-                total.operationCount(),
-                interval.opRate(),
-                interval.partitionRate(),
-                interval.rowRate(),
-                interval.meanLatencyMs(),
-                interval.medianLatencyMs(),
-                interval.latencyAtPercentileMs(95.0),
-                interval.latencyAtPercentileMs(99.0),
-                interval.latencyAtPercentileMs(99.9),
-                interval.maxLatencyMs(),
-                total.runTimeMs() / 1000f,
-                opRateUncertainty.getUncertainty(),
-                interval.errorCount,
-                gcStats.count,
-                gcStats.maxms,
-                gcStats.summs,
-                gcStats.sdvms,
-                gcStats.bytes / (1 << 20)
-        ));
-    }
-
-    public void summarise()
-    {
-        output.println("\n");
-        output.println("Results:");
-
-        TimingIntervals opHistory = timing.getHistory();
-        TimingInterval history = opHistory.combine();
-        output.println(String.format("Op rate                   : %,8.0f op/s  %s", history.opRate(), opHistory.opRates()));
-        output.println(String.format("Partition rate            : %,8.0f pk/s  %s", history.partitionRate(), opHistory.partitionRates()));
-        output.println(String.format("Row rate                  : %,8.0f row/s %s", history.rowRate(), opHistory.rowRates()));
-        output.println(String.format("Latency mean              : %6.1f ms %s", history.meanLatencyMs(), opHistory.meanLatencies()));
-        output.println(String.format("Latency median            : %6.1f ms %s", history.medianLatencyMs(), opHistory.medianLatencies()));
-        output.println(String.format("Latency 95th percentile   : %6.1f ms %s", history.latencyAtPercentileMs(95.0), opHistory.latenciesAtPercentile(95.0)));
-        output.println(String.format("Latency 99th percentile   : %6.1f ms %s", history.latencyAtPercentileMs(99.0), opHistory.latenciesAtPercentile(99.0)));
-        output.println(String.format("Latency 99.9th percentile : %6.1f ms %s", history.latencyAtPercentileMs(99.9), opHistory.latenciesAtPercentile(99.9)));
-        output.println(String.format("Latency max               : %6.1f ms %s", history.maxLatencyMs(), opHistory.maxLatencies()));
-        output.println(String.format("Total partitions          : %,10d %s",   history.partitionCount, opHistory.partitionCounts()));
-        output.println(String.format("Total errors              : %,10d %s",   history.errorCount, opHistory.errorCounts()));
-        output.println(String.format("Total GC count            : %,1.0f", totalGcStats.count));
-        output.println(String.format("Total GC memory           : %s", FBUtilities.prettyPrintMemory((long)totalGcStats.bytes, true)));
-        output.println(String.format("Total GC time             : %,6.1f seconds", totalGcStats.summs / 1000));
-        output.println(String.format("Avg GC time               : %,6.1f ms", totalGcStats.summs / totalGcStats.count));
-        output.println(String.format("StdDev GC time            : %,6.1f ms", totalGcStats.sdvms));
-        output.println("Total operation time      : " + DurationFormatUtils.formatDuration(
-                history.runTimeMs(), "HH:mm:ss", true));
-        output.println(""); // Newline is important here to separate the aggregates section from the END or the next stress iteration
-    }
-
-    public static void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out)
-    {
-        int idLen = 0;
-        for (String id : ids)
-            idLen = Math.max(id.length(), idLen);
-        String formatstr = "%" + idLen + "s, ";
-        printHeader(String.format(formatstr, "id"), out);
-        for (int i = 0 ; i < ids.size() ; i++)
-        {
-            for (Map.Entry<String, TimingInterval> type : summarise.get(i).timing.getHistory().intervals().entrySet())
-            {
-                printRow(String.format(formatstr, ids.get(i)),
-                         type.getKey(),
-                         type.getValue(),
-                         type.getValue(),
-                         summarise.get(i).totalGcStats,
-                         summarise.get(i).rowRateUncertainty,
-                         out);
-            }
-            TimingInterval hist = summarise.get(i).timing.getHistory().combine();
-            printRow(String.format(formatstr, ids.get(i)),
-                    "total",
-                    hist,
-                    hist,
-                    summarise.get(i).totalGcStats,
-                    summarise.get(i).rowRateUncertainty,
-                    out
-            );
-        }
-    }
-
-    public Timing getTiming()
-    {
-        return timing;
-    }
-
-    public boolean wasCancelled()
-    {
-        return cancelled;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 1343cf1..b45462f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -50,11 +50,11 @@ import org.apache.cassandra.stress.operations.userdefined.TokenRangeQuery;
 import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
 import org.apache.cassandra.stress.operations.userdefined.SchemaQuery;
 import org.apache.cassandra.stress.operations.userdefined.ValidatingSchemaQuery;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.*;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.MultiPrintStream;
 import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.Compression;
 import org.apache.cassandra.thrift.ThriftConversion;
 import org.apache.thrift.TException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
index 9a3522c..aa5ddfd 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/FixedOpDistribution.java
@@ -36,9 +36,4 @@ public class FixedOpDistribution implements OpDistribution
     {
         return operation;
     }
-
-    public void closeTimers()
-    {
-        operation.close();
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
index 33a0c93..bef8954 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistribution.java
@@ -25,8 +25,5 @@ import org.apache.cassandra.stress.Operation;
 
 public interface OpDistribution
 {
-
     Operation next();
-
-    public void closeTimers();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
index 14e6dfb..2477c36 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/OpDistributionFactory.java
@@ -21,11 +21,11 @@ package org.apache.cassandra.stress.operations;
  */
 
 
-import org.apache.cassandra.stress.util.Timing;
+import org.apache.cassandra.stress.StressAction.MeasurementSink;
 
 public interface OpDistributionFactory
 {
-    public OpDistribution get(Timing timing, boolean isWarmup);
+    public OpDistribution get(boolean isWarmup, MeasurementSink sink);
     public String desc();
     Iterable<OpDistributionFactory> each();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
index 93290fc..bad0a94 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
@@ -29,9 +29,9 @@ import org.apache.cassandra.stress.generate.PartitionIterator;
 import org.apache.cassandra.stress.generate.RatioDistribution;
 import org.apache.cassandra.stress.generate.Seed;
 import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.OptionRatioDistribution;
 import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.Timer;
 
 public abstract class PartitionOperation extends Operation
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
index fc0229e..6d7f9e4 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistribution.java
@@ -51,12 +51,4 @@ public class SampledOpDistribution implements OpDistribution
         remaining--;
         return cur;
     }
-
-    public void closeTimers()
-    {
-        for (Pair<Operation, Double> op : operations.getPmf())
-        {
-            op.getFirst().close();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
index 0b206f9..1800039 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/SampledOpDistributionFactory.java
@@ -21,17 +21,19 @@ package org.apache.cassandra.stress.operations;
  */
 
 
-import java.util.*;
-
-import org.apache.cassandra.stress.generate.*;
-import org.apache.cassandra.stress.util.Timing;
-import org.apache.commons.math3.distribution.EnumeratedDistribution;
-import org.apache.commons.math3.util.Pair;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.cassandra.stress.Operation;
+import org.apache.cassandra.stress.StressAction.MeasurementSink;
 import org.apache.cassandra.stress.generate.DistributionFactory;
+import org.apache.cassandra.stress.generate.DistributionFixed;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
-import org.apache.cassandra.stress.util.Timer;
+import org.apache.cassandra.stress.report.Timer;
+import org.apache.commons.math3.distribution.EnumeratedDistribution;
+import org.apache.commons.math3.util.Pair;
 
 public abstract class SampledOpDistributionFactory<T> implements OpDistributionFactory
 {
@@ -47,13 +49,13 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
     protected abstract List<? extends Operation> get(Timer timer, PartitionGenerator generator, T key, boolean isWarmup);
     protected abstract PartitionGenerator newGenerator();
 
-    public OpDistribution get(Timing timing, boolean isWarmup)
+    public OpDistribution get(boolean isWarmup, MeasurementSink sink)
     {
         PartitionGenerator generator = newGenerator();
         List<Pair<Operation, Double>> operations = new ArrayList<>();
         for (Map.Entry<T, Double> ratio : ratios.entrySet())
         {
-            List<? extends Operation> ops = get(timing.newTimer(ratio.getKey().toString()),
+            List<? extends Operation> ops = get(new Timer(ratio.getKey().toString(), sink),
                                                 generator, ratio.getKey(), isWarmup);
             for (Operation op : ops)
                 operations.add(new Pair<>(op, ratio.getValue() / ops.size()));
@@ -76,9 +78,9 @@ public abstract class SampledOpDistributionFactory<T> implements OpDistributionF
         {
             out.add(new OpDistributionFactory()
             {
-                public OpDistribution get(Timing timing, boolean isWarmup)
+                public OpDistribution get(boolean isWarmup, MeasurementSink sink)
                 {
-                    List<? extends Operation> ops = SampledOpDistributionFactory.this.get(timing.newTimer(ratio.getKey().toString()),
+                    List<? extends Operation> ops = SampledOpDistributionFactory.this.get(new Timer(ratio.getKey().toString(), sink),
                                                                                           newGenerator(),
                                                                                           ratio.getKey(),
                                                                                           isWarmup);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
index bb8135c..2874b4e 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterAdder.java
@@ -29,9 +29,9 @@ import org.apache.cassandra.stress.generate.Distribution;
 import org.apache.cassandra.stress.generate.DistributionFactory;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.Timer;
 
 public class CqlCounterAdder extends CqlOperation<Integer>
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
index d91ab37..eb908b0 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlCounterGetter.java
@@ -27,9 +27,9 @@ import java.util.List;
 
 import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.Timer;
 
 public class CqlCounterGetter extends CqlOperation<Integer>
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
index fdf5007..3fda6c2 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlInserter.java
@@ -27,9 +27,9 @@ import java.util.List;
 
 import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.Timer;
 
 public class CqlInserter extends CqlOperation<Integer>
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
index 097c1a0..647ba87 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlOperation.java
@@ -31,12 +31,12 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.ConnectionStyle;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.Compression;
 import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.thrift.CqlRow;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
index 0ee17e9..61c6553 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/CqlReader.java
@@ -28,9 +28,9 @@ import java.util.List;
 
 import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.Timer;
 
 public class CqlReader extends CqlOperation<ByteBuffer[][]>
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
index 1f9a2c8..db35504 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/PredefinedOperation.java
@@ -26,10 +26,10 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.operations.PartitionOperation;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.CqlVersion;
 import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
index be34a07..42f8bc9 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterAdder.java
@@ -28,10 +28,10 @@ import org.apache.cassandra.stress.generate.Distribution;
 import org.apache.cassandra.stress.generate.DistributionFactory;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 import org.apache.cassandra.thrift.CounterColumn;
 import org.apache.cassandra.thrift.Mutation;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
index ca81fe9..4bec3b2 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftCounterGetter.java
@@ -23,10 +23,10 @@ import java.util.List;
 
 import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.thrift.SlicePredicate;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
index 1827c06..ecaa140 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftInserter.java
@@ -26,10 +26,10 @@ import java.util.Map;
 
 import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.Column;
 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 import org.apache.cassandra.thrift.Mutation;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
index d77dc6a..4d530b9 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/predefined/ThriftReader.java
@@ -23,10 +23,10 @@ import java.util.List;
 
 import org.apache.cassandra.stress.generate.PartitionGenerator;
 import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.Command;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 import org.apache.cassandra.thrift.ColumnParent;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
index 9eebce2..96b3392 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaInsert.java
@@ -43,10 +43,10 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.io.sstable.CQLSSTableWriter;
 import org.apache.cassandra.stress.WorkManager;
 import org.apache.cassandra.stress.generate.*;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
 
 public class SchemaInsert extends SchemaStatement
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
index 0d8e756..2764704 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
@@ -33,10 +33,10 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.stress.generate.*;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.thrift.ThriftConversion;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
index c83787b..ca1f5fa 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
@@ -32,9 +32,9 @@ import com.datastax.driver.core.PreparedStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.stress.generate.Row;
 import org.apache.cassandra.stress.operations.PartitionOperation;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
-import org.apache.cassandra.stress.util.Timer;
 
 public abstract class SchemaStatement extends PartitionOperation
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
index 7a0f02d..f561f61 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
@@ -39,10 +39,10 @@ import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.StressYaml;
 import org.apache.cassandra.stress.WorkManager;
 import org.apache.cassandra.stress.generate.TokenRangeIterator;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
 
 public class TokenRangeQuery extends Operation
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
index 4547a37..a731b99 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/ValidatingSchemaQuery.java
@@ -33,10 +33,10 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.generate.Row;
 import org.apache.cassandra.stress.operations.PartitionOperation;
+import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.JavaDriverClient;
 import org.apache.cassandra.stress.util.ThriftClient;
-import org.apache.cassandra.stress.util.Timer;
 import org.apache.cassandra.thrift.Compression;
 import org.apache.cassandra.thrift.CqlResult;
 import org.apache.cassandra.thrift.CqlRow;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e73633cd/tools/stress/src/org/apache/cassandra/stress/report/StressMetrics.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/report/StressMetrics.java b/tools/stress/src/org/apache/cassandra/stress/report/StressMetrics.java
new file mode 100644
index 0000000..a596547
--- /dev/null
+++ b/tools/stress/src/org/apache/cassandra/stress/report/StressMetrics.java
@@ -0,0 +1,457 @@
+package org.apache.cassandra.stress.report;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramLogWriter;
+import org.apache.cassandra.stress.StressAction.Consumer;
+import org.apache.cassandra.stress.StressAction.MeasurementSink;
+import org.apache.cassandra.stress.StressAction.OpMeasurement;
+import org.apache.cassandra.stress.settings.SettingsLog.Level;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.JmxCollector;
+import org.apache.cassandra.stress.util.JmxCollector.GcStats;
+import org.apache.cassandra.stress.util.Uncertainty;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.lang3.time.DurationFormatUtils;
+
+public class StressMetrics implements MeasurementSink
+{
+    // measurement producers registered through add(); their queues are drained once per reporting interval
+    private final List<Consumer> consumers = new ArrayList<>();
+    // destination for the header, the per-interval rows and the final summary
+    private final PrintStream output;
+    // background thread that runs reportingLoop(); created in the constructor, started by start()
+    private final Thread thread;
+    // fed with the adjusted row rate of every interval; waitUntilConverges() blocks on it
+    private final Uncertainty rowRateUncertainty = new Uncertainty();
+    // counted down when the reporting loop exits; stop() waits on it
+    private final CountDownLatch stopped = new CountDownLatch(1);
+    // invoked once per interval to obtain GC statistics
+    private final Callable<JmxCollector.GcStats> gcStatsCollector;
+    // HDR histogram log writer; null when no hdrFile is configured
+    private final HistogramLogWriter histogramWriter;
+    // nanoTime / wall-clock pair captured at construction, used to translate nanoTime values to epoch millis
+    private final long epochNs = System.nanoTime();
+    private final long epochMs = System.currentTimeMillis();
+
+    // running aggregate of the GC statistics of all intervals recorded so far
+    private volatile JmxCollector.GcStats totalGcStats = new GcStats(0);
+
+    // set by stop() and cancel(); makes the reporting loop exit
+    private volatile boolean stop = false;
+    // set only by cancel(); exposed through wasCancelled()
+    private volatile boolean cancelled = false;
+
+
+    // collected data for intervals and summary
+    // per operation type: statistics of the interval currently being collected
+    private final Map<String, TimingInterval> opTypeToCurrentTimingInterval = new TreeMap<>();
+    // per operation type: statistics accumulated over all intervals
+    private final Map<String, TimingInterval> opTypeToSummaryTimingInterval = new TreeMap<>();
+    // measurements that ended after the interval being drained; reconsidered on the next drain
+    private final Queue<OpMeasurement> leftovers = new ArrayDeque<>();
+    // all operation types combined: current interval and whole-run summary
+    private final TimingInterval totalCurrentInterval;
+    private final TimingInterval totalSummaryInterval;
+
+    /**
+     * Creates the metrics collector, prints the column header to {@code output} and prepares (but does not
+     * start) the reporting thread; call {@link #start()} to begin reporting.
+     *
+     * @param output            stream that receives the header now and the interval rows later
+     * @param logIntervalMillis reporting interval in milliseconds, handed to the reporting loop
+     * @param settings          stress settings; this constructor reads {@code log.hdrFile}, {@code log.level},
+     *                          {@code node}, {@code port.jmxPort} and {@code rate.isFixed}
+     * @throws IllegalArgumentException if the HDR histogram log file cannot be opened
+     */
+    public StressMetrics(PrintStream output, final long logIntervalMillis, StressSettings settings)
+    {
+        this.output = output;
+        // the HDR histogram log is optional: it is only written when an hdrFile was configured
+        if(settings.log.hdrFile != null)
+        {
+            try
+            {
+                histogramWriter = new HistogramLogWriter(settings.log.hdrFile);
+                histogramWriter.outputComment("Logging op latencies for Cassandra Stress");
+                histogramWriter.outputLogFormatVersion();
+                // base and start time of the log are the construction time truncated to a whole second
+                final long roundedEpoch = epochMs - (epochMs%1000);
+                histogramWriter.outputBaseTime(roundedEpoch);
+                histogramWriter.setBaseTime(roundedEpoch);
+                histogramWriter.outputStartTime(roundedEpoch);
+                histogramWriter.outputLegend();
+            }
+            catch (FileNotFoundException e)
+            {
+                throw new IllegalArgumentException(e);
+            }
+        }
+        else
+        {
+            histogramWriter = null;
+        }
+        // local variable so the final field can be assigned exactly once after the try/catch
+        Callable<JmxCollector.GcStats> gcStatsCollector;
+        // re-assigns the same empty value the field initialiser already set
+        totalGcStats = new JmxCollector.GcStats(0);
+        try
+        {
+            gcStatsCollector = new JmxCollector(settings.node.resolveAllPermitted(settings), settings.port.jmxPort);
+        }
+        catch (Throwable t)
+        {
+            // failing to set up JMX is not fatal: report it and carry on without real GC statistics
+            if (settings.log.level == Level.VERBOSE)
+            {
+                t.printStackTrace();
+            }
+            System.err.println("Failed to connect over JMX; not collecting these stats");
+            // fallback collector: hands back whatever totalGcStats holds at the time it is called
+            gcStatsCollector = new Callable<JmxCollector.GcStats>()
+            {
+                public JmxCollector.GcStats call() throws Exception
+                {
+                    return totalGcStats;
+                }
+            };
+        }
+        this.gcStatsCollector = gcStatsCollector;
+        this.totalCurrentInterval = new TimingInterval(settings.rate.isFixed);
+        this.totalSummaryInterval = new TimingInterval(settings.rate.isFixed);
+        printHeader("", output);
+        // the thread is only created and named here; it does not run until start() is called
+        thread = new Thread(() -> {
+            reportingLoop(logIntervalMillis);
+        });
+        thread.setName("StressMetrics");
+    }
+    /**
+     * Launches the background reporting thread that was prepared by the constructor.
+     */
+    public void start()
+    {
+        this.thread.start();
+    }
+
+    /**
+     * Blocks the caller by delegating to the row-rate {@link Uncertainty} tracker, which is updated once per
+     * reporting interval.
+     *
+     * @param targetUncertainty forwarded unchanged to {@code Uncertainty.await}
+     * @param minMeasurements   forwarded unchanged to {@code Uncertainty.await}
+     * @param maxMeasurements   forwarded unchanged to {@code Uncertainty.await}
+     * @throws InterruptedException if the wait is interrupted
+     */
+    public void waitUntilConverges(double targetUncertainty, int minMeasurements, int maxMeasurements) throws InterruptedException
+    {
+        this.rowRateUncertainty.await(targetUncertainty, minMeasurements, maxMeasurements);
+    }
+
+    /**
+     * Aborts reporting: flags the run as cancelled, asks the reporting loop to exit, interrupts the reporting
+     * thread and wakes every caller waiting on the row-rate uncertainty. Does not wait for the thread to exit.
+     */
+    public void cancel()
+    {
+        // flag order matters to concurrent readers: cancelled is published before stop
+        this.cancelled = true;
+        this.stop = true;
+        this.thread.interrupt();
+        this.rowRateUncertainty.wakeAll();
+    }
+
+    /**
+     * Asks the reporting loop to exit, interrupts the reporting thread and then blocks until the loop has
+     * signalled completion through the {@code stopped} latch.
+     *
+     * @throws InterruptedException if the caller is interrupted while waiting for the loop to finish
+     */
+    public void stop() throws InterruptedException
+    {
+        this.stop = true;
+        this.thread.interrupt();
+        this.stopped.await();
+    }
+
+
+    /**
+     * Body of the reporting thread: records one interval every {@code logIntervalMillis} until a stop is
+     * requested, then records a final, possibly shorter, interval ending at the current time.
+     *
+     * Any exception is printed and turns into a cancellation. On exit, waiters on the row-rate uncertainty are
+     * woken and the {@code stopped} latch is released.
+     *
+     * @param logIntervalMillis length of a reporting interval in milliseconds
+     */
+    private void reportingLoop(final long logIntervalMillis)
+    {
+        // the first interval is back-dated to the start of the current wall-clock second
+        final long nowMs = System.currentTimeMillis();
+        final long truncatedNowMs = nowMs - (nowMs % 1000);
+        final long backDateNs = TimeUnit.MILLISECONDS.toNanos(nowMs - truncatedNowMs);
+        long intervalStartNs = System.nanoTime() - backDateNs;
+        final long intervalLengthNs = TimeUnit.MILLISECONDS.toNanos(logIntervalMillis);
+        try
+        {
+            while (!stop)
+            {
+                final long intervalEndNs = intervalStartNs + intervalLengthNs;
+                sleepUntil(intervalEndNs);
+                // a stop requested while sleeping is handled by the final recordInterval below
+                if (stop)
+                    break;
+                recordInterval(intervalEndNs, intervalLengthNs);
+                intervalStartNs = intervalEndNs;
+            }
+
+            // closing interval: from the last completed interval boundary up to now
+            final long finishNs = System.nanoTime();
+            recordInterval(finishNs, finishNs - intervalStartNs);
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+            cancel();
+        }
+        finally
+        {
+            rowRateUncertainty.wakeAll();
+            stopped.countDown();
+        }
+    }
+
+
+    /**
+     * Parks the calling thread until {@link System#nanoTime()} reaches {@code until} or a stop is requested,
+     * whichever comes first. Early returns from parking simply lead to another park for the remaining time.
+     *
+     * @param until wake-up deadline in {@link System#nanoTime()} units
+     */
+    private void sleepUntil(final long until)
+    {
+        while (!stop)
+        {
+            final long remaining = until - System.nanoTime();
+            if (remaining <= 0)
+            {
+                return;
+            }
+            LockSupport.parkNanos(remaining);
+        }
+    }
+
+    /**
+     * Adds one measurement to the current-interval statistics of its operation type, creating the statistics
+     * entry on first use.
+     *
+     * @param opType       operation type name used as the statistics key
+     * @param intended     intended start time; 0 suppresses response/wait time recording
+     * @param started      actual start time
+     * @param ended        completion time
+     * @param rowCnt       rows touched by the operation
+     * @param partitionCnt partitions touched by the operation
+     * @param err          whether the operation failed
+     */
+    @Override
+    public void record(String opType, long intended, long started, long ended, long rowCnt, long partitionCnt, boolean err)
+    {
+        TimingInterval interval = opTypeToCurrentTimingInterval.get(opType);
+        if (interval == null)
+        {
+            interval = new TimingInterval(totalCurrentInterval.isFixed);
+            opTypeToCurrentTimingInterval.put(opType, interval);
+        }
+        record(interval, intended, started, ended, rowCnt, partitionCnt, err);
+    }
+
+    /**
+     * Folds a single measurement into {@code t}: counters first, then the latency histograms.
+     *
+     * Response time ({@code ended - intended}) and wait time ({@code started - intended}) are only recorded
+     * when {@code intended} is non-zero; service time ({@code ended - started}) is always recorded.
+     */
+    private void record(TimingInterval t, long intended, long started, long ended, long rowCnt, long partitionCnt, boolean err)
+    {
+        t.rowCount += rowCnt;
+        t.partitionCount += partitionCnt;
+        if (err)
+        {
+            t.errorCount++;
+        }
+
+        final boolean hasIntendedStart = intended != 0;
+        if (hasIntendedStart)
+        {
+            final long responseDelta = ended - intended;
+            final long waitDelta = started - intended;
+            t.responseTime().recordValue(responseDelta);
+            t.waitTime().recordValue(waitDelta);
+        }
+
+        t.serviceTime().recordValue(ended - started);
+    }
+
+    /**
+     * Closes one reporting interval: drains the consumers, samples GC statistics, updates the row-rate
+     * uncertainty and, if any operation completed, prints the interval rows and resets the interval data.
+     *
+     * @param intervalEnd    end of the interval in {@link System#nanoTime()} units
+     * @param parkIntervalNs length of the interval in nanoseconds
+     */
+    private void recordInterval(long intervalEnd, long parkIntervalNs) throws InterruptedException
+    {
+        drainConsumerMeasurements(intervalEnd, parkIntervalNs);
+
+        // GC statistics are best effort: a failing collector yields an empty sample
+        GcStats intervalGc;
+        try
+        {
+            intervalGc = gcStatsCollector.call();
+        }
+        catch (Exception e)
+        {
+            intervalGc = new GcStats(0);
+        }
+        totalGcStats = JmxCollector.GcStats.aggregate(Arrays.asList(totalGcStats, intervalGc));
+
+        rowRateUncertainty.update(totalCurrentInterval.adjustedRowRate());
+
+        // nothing completed in this interval: print nothing and leave the interval data untouched
+        if (totalCurrentInterval.operationCount() == 0)
+        {
+            return;
+        }
+
+        // per-operation rows are only printed when more than one operation type is present
+        final boolean printPerOpRows = opTypeToCurrentTimingInterval.size() > 1;
+        for (Map.Entry<String, TimingInterval> entry : opTypeToCurrentTimingInterval.entrySet())
+        {
+            final String name = entry.getKey();
+            final TimingInterval current = entry.getValue();
+            if (printPerOpRows)
+            {
+                printRow("", name, current, opTypeToSummaryTimingInterval.get(name), intervalGc, rowRateUncertainty, output);
+            }
+            logHistograms(name, current);
+            current.reset();
+        }
+
+        printRow("", "total", totalCurrentInterval, totalSummaryInterval, intervalGc, rowRateUncertainty, output);
+        totalCurrentInterval.reset();
+    }
+
+    /**
+     * Moves finished measurements from the consumers into the per-operation interval statistics, then
+     * timestamps those statistics and adds them to the summaries.
+     *
+     * Measurements whose {@code ended} value is later than {@code intervalEnd} are parked in {@code leftovers}
+     * and reconsidered on the next call. Recorded measurement objects are handed back through the consumers'
+     * {@code measurementsRecycling} queues.
+     *
+     * @param intervalEnd    end of the interval being reported, in {@link System#nanoTime()} units
+     * @param parkIntervalNs length of the interval in nanoseconds; the interval start is taken to be
+     *                       {@code intervalEnd - parkIntervalNs}
+     */
+    private void drainConsumerMeasurements(long intervalEnd, long parkIntervalNs)
+    {
+        // record leftover measurements if any
+        // the size is captured up front so entries re-queued below are not visited again in this pass
+        int leftoversSize = leftovers.size();
+        for (int i=0;i<leftoversSize;i++)
+        {
+            OpMeasurement last = leftovers.poll();
+            if (last.ended <= intervalEnd)
+            {
+                record(last.opType, last.intended, last.started, last.ended, last.rowCnt, last.partitionCnt, last.err);
+                // round robin-ish redistribution of leftovers
+                consumers.get(i%consumers.size()).measurementsRecycling.offer(last);
+            }
+            else
+            {
+                // no record for you! wait one interval!
+                leftovers.offer(last);
+            }
+        }
+        // record interval collected measurements
+        for (Consumer c: consumers) {
+            Queue<OpMeasurement> in = c.measurementsReporting;
+            Queue<OpMeasurement> out = c.measurementsRecycling;
+            OpMeasurement last;
+            while ((last = in.poll()) != null)
+            {
+                if (last.ended > intervalEnd)
+                {
+                    // measurements for any given consumer are ordered, we stop when we stop.
+                    // the already-polled measurement is parked; the rest stays in the consumer's queue
+                    leftovers.add(last);
+                    break;
+                }
+                record(last.opType, last.intended, last.started, last.ended, last.rowCnt, last.partitionCnt, last.err);
+                out.offer(last);
+            }
+        }
+        // set timestamps and summarize
+        // each per-operation interval feeds both its own summary and the combined current interval
+        for (Entry<String, TimingInterval> currPerOp : opTypeToCurrentTimingInterval.entrySet()) {
+            currPerOp.getValue().endNanos(intervalEnd);
+            currPerOp.getValue().startNanos(intervalEnd-parkIntervalNs);
+            TimingInterval summaryPerOp = opTypeToSummaryTimingInterval.computeIfAbsent(currPerOp.getKey(), k -> new TimingInterval(totalCurrentInterval.isFixed));
+            summaryPerOp.add(currPerOp.getValue());
+            totalCurrentInterval.add(currPerOp.getValue());
+        }
+        totalCurrentInterval.endNanos(intervalEnd);
+        totalCurrentInterval.startNanos(intervalEnd-parkIntervalNs);
+
+        totalSummaryInterval.add(totalCurrentInterval);
+    }
+
+
+    /**
+     * Writes the service ("-st"), response ("-rt") and wait ("-wt") time histograms of one operation type to
+     * the HDR histogram log. Does nothing when no histogram log is configured.
+     *
+     * @param opName     operation type name, used as the tag prefix
+     * @param opInterval interval whose histograms and start/end times are logged
+     */
+    private void logHistograms(String opName, TimingInterval opInterval)
+    {
+        if (histogramWriter != null)
+        {
+            final long fromNs = opInterval.startNanos();
+            final long toNs = opInterval.endNanos();
+
+            logHistogram(opName + "-st", fromNs, toNs, opInterval.serviceTime());
+            logHistogram(opName + "-rt", fromNs, toNs, opInterval.responseTime());
+            logHistogram(opName + "-wt", fromNs, toNs, opInterval.waitTime());
+        }
+    }
+
+    /**
+     * Tags a non-empty histogram, stamps it with wall-clock start/end times derived from the given
+     * {@link System#nanoTime()} values, and appends it to the HDR histogram log. Empty histograms are skipped.
+     *
+     * @param opName    tag written with the histogram
+     * @param startNs   interval start in {@link System#nanoTime()} units
+     * @param endNs     interval end in {@link System#nanoTime()} units
+     * @param histogram histogram to log; its tag and timestamps are modified
+     */
+    private void logHistogram(String opName, final long startNs, final long endNs, final Histogram histogram)
+    {
+        if (histogram.getTotalCount() == 0)
+        {
+            return;
+        }
+
+        histogram.setTag(opName);
+
+        // nanoTime values are converted to epoch millis via the (epochNs, epochMs) pair taken at construction
+        final long startOffsetMs = NANOSECONDS.toMillis(startNs - epochNs);
+        final long startStampMs = (long) (1000 * ((epochMs + startOffsetMs) / 1000.0));
+        histogram.setStartTimeStamp(startStampMs);
+
+        final long endOffsetMs = NANOSECONDS.toMillis(endNs - epochNs);
+        final long endStampMs = (long) (1000 * ((epochMs + endOffsetMs) / 1000.0));
+        histogram.setEndTimeStamp(endStampMs);
+
+        histogramWriter.outputIntervalHistogram(histogram);
+    }
+
+
+    // PRINT FORMATTING
+
+    // format of the header line: one %s column per entry of HEADMETRICS
+    public static final String HEADFORMAT = "%-10s%10s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%8s,%7s,%9s,%7s,%7s,%8s,%8s,%8s,%8s";
+    // format of a data row: same column widths as HEADFORMAT, with numeric conversions
+    public static final String ROWFORMAT =  "%-10s%10d,%8.0f,%8.0f,%8.0f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%8.1f,%7.1f,%9.5f,%7d,%7.0f,%8.0f,%8.0f,%8.0f,%8.0f";
+    // column titles, in the order printRow() supplies the values
+    public static final String[] HEADMETRICS = new String[]{"type", "total ops","op/s","pk/s","row/s","mean","med",".95",".99",".999","max","time","stderr", "errors", "gc: #", "max ms", "sum ms", "sdv ms", "mb"};
+    // the fully formatted header line
+    public static final String HEAD = String.format(HEADFORMAT, (Object[]) HEADMETRICS);
+
+    /**
+     * Prints the column header line, preceded by {@code prefix}, to {@code output}.
+     */
+    private static void printHeader(String prefix, PrintStream output)
+    {
+        final String headerLine = prefix + HEAD;
+        output.println(headerLine);
+    }
+
+    /**
+     * Prints one data row in {@link #ROWFORMAT} layout.
+     *
+     * @param prefix            text printed before the row
+     * @param type              value of the "type" column (a comma is appended)
+     * @param interval          source of the rates, latencies and error count
+     * @param total             source of the cumulative operation count and elapsed time
+     * @param gcStats           source of the GC columns
+     * @param opRateUncertainty source of the "stderr" column
+     * @param output            stream the row is printed to
+     */
+    private static void printRow(String prefix, String type, TimingInterval interval, TimingInterval total, JmxCollector.GcStats gcStats, Uncertainty opRateUncertainty, PrintStream output)
+    {
+        final String row = String.format(ROWFORMAT,
+                type + ",",
+                total.operationCount(),
+                interval.opRate(),
+                interval.partitionRate(),
+                interval.rowRate(),
+                interval.meanLatencyMs(),
+                interval.medianLatencyMs(),
+                interval.latencyAtPercentileMs(95.0),
+                interval.latencyAtPercentileMs(99.0),
+                interval.latencyAtPercentileMs(99.9),
+                interval.maxLatencyMs(),
+                total.runTimeMs() / 1000f,
+                opRateUncertainty.getUncertainty(),
+                interval.errorCount,
+                gcStats.count,
+                gcStats.maxms,
+                gcStats.summs,
+                gcStats.sdvms,
+                gcStats.bytes / (1 << 20));
+        output.println(prefix + row);
+    }
+
+    /**
+     * Prints the end-of-run "Results:" section to {@code output}: overall figures from the total summary
+     * interval, each followed by the per-operation breakdown, then the aggregated GC figures and total time.
+     */
+    public void summarise()
+    {
+        output.println("\n");
+        output.println("Results:");
+
+        final TimingIntervals perOp = new TimingIntervals(opTypeToSummaryTimingInterval);
+        final TimingInterval overall = this.totalSummaryInterval;
+
+        // throughput
+        output.println(String.format("Op rate                   : %,8.0f op/s  %s", overall.opRate(), perOp.opRates()));
+        output.println(String.format("Partition rate            : %,8.0f pk/s  %s", overall.partitionRate(), perOp.partitionRates()));
+        output.println(String.format("Row rate                  : %,8.0f row/s %s", overall.rowRate(), perOp.rowRates()));
+
+        // latency
+        output.println(String.format("Latency mean              : %6.1f ms %s", overall.meanLatencyMs(), perOp.meanLatencies()));
+        output.println(String.format("Latency median            : %6.1f ms %s", overall.medianLatencyMs(), perOp.medianLatencies()));
+        output.println(String.format("Latency 95th percentile   : %6.1f ms %s", overall.latencyAtPercentileMs(95.0), perOp.latenciesAtPercentile(95.0)));
+        output.println(String.format("Latency 99th percentile   : %6.1f ms %s", overall.latencyAtPercentileMs(99.0), perOp.latenciesAtPercentile(99.0)));
+        output.println(String.format("Latency 99.9th percentile : %6.1f ms %s", overall.latencyAtPercentileMs(99.9), perOp.latenciesAtPercentile(99.9)));
+        output.println(String.format("Latency max               : %6.1f ms %s", overall.maxLatencyMs(), perOp.maxLatencies()));
+
+        // totals
+        output.println(String.format("Total partitions          : %,10d %s",   overall.partitionCount, perOp.partitionCounts()));
+        output.println(String.format("Total errors              : %,10d %s",   overall.errorCount, perOp.errorCounts()));
+
+        // garbage collection
+        output.println(String.format("Total GC count            : %,1.0f", totalGcStats.count));
+        output.println(String.format("Total GC memory           : %s", FBUtilities.prettyPrintMemory((long)totalGcStats.bytes, true)));
+        output.println(String.format("Total GC time             : %,6.1f seconds", totalGcStats.summs / 1000));
+        output.println(String.format("Avg GC time               : %,6.1f ms", totalGcStats.summs / totalGcStats.count));
+        output.println(String.format("StdDev GC time            : %,6.1f ms", totalGcStats.sdvms));
+
+        final String elapsed = DurationFormatUtils.formatDuration(overall.runTimeMs(), "HH:mm:ss", true);
+        output.println("Total operation time      : " + elapsed);
+        // the trailing empty line separates this section from the END marker or the next stress iteration
+        output.println("");
+    }
+
+    /**
+     * Prints a comparison table for several runs: a header, then for each run one row per operation type
+     * followed by a "total" row, every row prefixed with the run's id right-aligned to the widest id.
+     *
+     * @param ids       run identifiers; entry {@code i} labels {@code summarise.get(i)}
+     * @param summarise metrics of the runs, in the same order as {@code ids}
+     * @param out       stream the table is printed to
+     */
+    public static void summarise(List<String> ids, List<StressMetrics> summarise, PrintStream out)
+    {
+        int widestId = 0;
+        for (String id : ids)
+        {
+            if (id.length() > widestId)
+                widestId = id.length();
+        }
+        final String idFormat = "%" + widestId + "s, ";
+
+        printHeader(String.format(idFormat, "id"), out);
+        for (int run = 0 ; run < ids.size() ; run++)
+        {
+            final StressMetrics metrics = summarise.get(run);
+            final String rowPrefix = String.format(idFormat, ids.get(run));
+
+            for (Map.Entry<String, TimingInterval> perOp : metrics.opTypeToSummaryTimingInterval.entrySet())
+            {
+                final TimingInterval opSummary = perOp.getValue();
+                // summary rows use the same interval for both the "interval" and the "total" columns
+                printRow(rowPrefix, perOp.getKey(), opSummary, opSummary, metrics.totalGcStats, metrics.rowRateUncertainty, out);
+            }
+
+            final TimingInterval runTotal = metrics.totalSummaryInterval;
+            printRow(rowPrefix, "total", runTotal, runTotal, metrics.totalGcStats, metrics.rowRateUncertainty, out);
+        }
+    }
+
+    /**
+     * @return {@code true} once {@link #cancel()} has been called
+     */
+    public boolean wasCancelled()
+    {
+        return this.cancelled;
+    }
+
+    /**
+     * Registers a consumer whose measurement queues are drained on every reporting interval.
+     *
+     * @param consumer the consumer to register
+     */
+    public void add(Consumer consumer)
+    {
+        this.consumers.add(consumer);
+    }
+
+    /**
+     * @return the operation rate of the whole-run summary interval
+     */
+    public double opRate()
+    {
+        final double overallRate = totalSummaryInterval.opRate();
+        return overallRate;
+    }
+}