You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/03/18 11:44:58 UTC

[2/4] cassandra git commit: Introduce intra-cluster message coalescing

Introduce intra-cluster message coalescing

patch by ariel; reviewed by benedict for CASSANDRA-8692


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/82849649
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/82849649
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/82849649

Branch: refs/heads/trunk
Commit: 828496492c51d7437b690999205ecc941f41a0a9
Parents: db1a741
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Wed Mar 18 10:37:28 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Mar 18 10:38:04 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/config/Config.java     |  22 +
 .../cassandra/config/DatabaseDescriptor.java    |  10 +
 .../cassandra/net/IncomingTcpConnection.java    |   5 +-
 .../cassandra/net/OutboundTcpConnection.java    | 117 +++-
 .../cassandra/utils/CoalescingStrategies.java   | 544 +++++++++++++++++++
 .../utils/NanoTimeToCurrentTimeMillis.java      |  88 +++
 .../utils/CoalescingStrategiesTest.java         | 445 +++++++++++++++
 .../utils/NanoTimeToCurrentTimeMillisTest.java  |  52 ++
 9 files changed, 1254 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 56a7164..68df77e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.4
+ * Introduce intra-cluster message coalescing (CASSANDRA-8692)
  * DatabaseDescriptor throws NPE when rpc_interface is used (CASSANDRA-8839)
  * Don't check if an sstable is live for offline compactions (CASSANDRA-8841)
  * Don't set clientMode in SSTableLoader (CASSANDRA-8238)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index fbbd1dd..378a1ad 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -39,6 +39,12 @@ import org.apache.cassandra.utils.FBUtilities;
  */
 public class Config
 {
+    /*
+     * Prefix for Java properties for internal Cassandra configuration options
+     */
+    public static final String PROPERTY_PREFIX = "cassandra.";
+
+
     public String cluster_name = "Test Cluster";
     public String authenticator;
     public String authorizer;
@@ -223,6 +229,22 @@ public class Config
     private static final CsvPreference STANDARD_SURROUNDING_SPACES_NEED_QUOTES = new CsvPreference.Builder(CsvPreference.STANDARD_PREFERENCE)
                                                                                                   .surroundingSpacesNeedQuotes(true).build();
 
+    /*
+     * Strategy to use for coalescing messages in OutboundTcpConnection.
+     * Can be fixed, movingaverage, timehorizon, disabled. Setting is case and leading/trailing
+     * whitespace insensitive. You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+     */
+    public String otc_coalescing_strategy = "DISABLED";
+
+    /*
+     * How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+     * messgae is received before it will be sent with any accompanying messages. For moving average this is the
+     * maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+     * for coalescing to be enabled.
+     */
+    public static final int otc_coalescing_window_us_default = 200;
+    public int otc_coalescing_window_us = otc_coalescing_window_us_default;
+
     public static boolean getOutboundBindAny()
     {
         return outboundBindAny;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 65cec9c..d0db9f4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1682,4 +1682,14 @@ public class DatabaseDescriptor
         String arch = System.getProperty("os.arch");
         return arch.contains("64") || arch.contains("sparcv9");
     }
+
+    public static String getOtcCoalescingStrategy()
+    {
+        return conf.otc_coalescing_strategy;
+    }
+
+    public static int getOtcCoalescingWindow()
+    {
+        return conf.otc_coalescing_window_us;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index ee44493..e7d434b 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -32,6 +32,7 @@ import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.xxhash.XXHashFactory;
 import org.xerial.snappy.SnappyInputStream;
 
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.UnknownColumnFamilyException;
 import org.apache.cassandra.gms.Gossiper;
@@ -40,6 +41,8 @@ public class IncomingTcpConnection extends Thread
 {
     private static final Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class);
 
+    private static final int BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + ".itc_buffer_size", 1024 * 4);
+
     private final int version;
     private final boolean compressed;
     private final Socket socket;
@@ -132,7 +135,7 @@ public class IncomingTcpConnection extends Thread
         }
         else
         {
-            in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));
+            in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE));
         }
 
         if (version > MessagingService.current_version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 4586667..2d6d4fe 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -25,10 +25,7 @@ import java.net.InetAddress;
 import java.net.Socket;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -44,14 +41,18 @@ import net.jpountz.lz4.LZ4BlockOutputStream;
 import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.xxhash.XXHashFactory;
+
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.CoalescingStrategies;
+import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
+import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
 import org.apache.cassandra.utils.UUIDGen;
 import org.xerial.snappy.SnappyOutputStream;
-
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 
@@ -61,6 +62,54 @@ public class OutboundTcpConnection extends Thread
 {
     private static final Logger logger = LoggerFactory.getLogger(OutboundTcpConnection.class);
 
+    private static final String PREFIX = Config.PROPERTY_PREFIX;
+
+    /*
+     * Enabled/disable TCP_NODELAY for intradc connections. Defaults to enabled.
+     */
+    private static final String INTRADC_TCP_NODELAY_PROPERTY = PREFIX + "otc_intradc_tcp_nodelay";
+    private static final boolean INTRADC_TCP_NODELAY = Boolean.valueOf(System.getProperty(INTRADC_TCP_NODELAY_PROPERTY, "true"));
+
+    /*
+     * Size of buffer in output stream
+     */
+    private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size";
+    private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64);
+
+    private static CoalescingStrategy newCoalescingStrategy(String displayName)
+    {
+        return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(),
+                                                          DatabaseDescriptor.getOtcCoalescingWindow(),
+                                                          logger,
+                                                          displayName);
+    }
+
+    static
+    {
+        String strategy = DatabaseDescriptor.getOtcCoalescingStrategy();
+        switch (strategy)
+        {
+        case "TIMEHORIZON":
+            break;
+        case "MOVINGAVERAGE":
+        case "FIXED":
+        case "DISABLED":
+            logger.info("OutboundTcpConnection using coalescing strategy " + strategy);
+            break;
+            default:
+                //Check that it can be loaded
+                newCoalescingStrategy("dummy");
+        }
+
+        int coalescingWindow = DatabaseDescriptor.getOtcCoalescingWindow();
+        if (coalescingWindow != Config.otc_coalescing_window_us_default)
+            logger.info("OutboundTcpConnection coalescing window set to " + coalescingWindow + "μs");
+
+        if (coalescingWindow < 0)
+            throw new ExceptionInInitializerError(
+                    "Value provided for coalescing window must be greather than 0: " + coalescingWindow);
+    }
+
     private static final MessageOut CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);
     private volatile boolean isStopped = false;
 
@@ -74,6 +123,7 @@ public class OutboundTcpConnection extends Thread
 
     private final OutboundTcpConnectionPool poolReference;
 
+    private final CoalescingStrategy cs;
     private DataOutputStreamPlus out;
     private Socket socket;
     private volatile long completed;
@@ -85,6 +135,7 @@ public class OutboundTcpConnection extends Thread
     {
         super("WRITE-" + pool.endPoint());
         this.poolReference = pool;
+        cs = newCoalescingStrategy(pool.endPoint().getHostAddress());
     }
 
     private static boolean isLocalDC(InetAddress targetHost)
@@ -127,26 +178,27 @@ public class OutboundTcpConnection extends Thread
 
     public void run()
     {
+        final int drainedMessageSize = 128;
         // keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize)
-        final List<QueuedMessage> drainedMessages = new ArrayList<>(128);
+        final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
+
         outer:
         while (true)
         {
-            if (backlog.drainTo(drainedMessages, drainedMessages.size()) == 0)
+            try
             {
-                try
-                {
-                    drainedMessages.add(backlog.take());
-                }
-                catch (InterruptedException e)
-                {
-                    throw new AssertionError(e);
-                }
-
+                cs.coalesce(backlog, drainedMessages, drainedMessageSize);
             }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+
             currentMsgBufferCount = drainedMessages.size();
 
             int count = drainedMessages.size();
+            //The timestamp of the first message has already been provided to the coalescing strategy
+            //so skip logging it.
             for (QueuedMessage qm : drainedMessages)
             {
                 try
@@ -159,10 +211,11 @@ public class OutboundTcpConnection extends Thread
                             break outer;
                         continue;
                     }
-                    if (qm.isTimedOut(m.getTimeout()))
+
+                    if (qm.isTimedOut(TimeUnit.MILLISECONDS.toNanos(m.getTimeout()), System.nanoTime()))
                         dropped.incrementAndGet();
                     else if (socket != null || connect())
-                        writeConnected(qm, count == 1 && backlog.size() == 0);
+                        writeConnected(qm, count == 1 && backlog.isEmpty());
                     else
                         // clear out the queue, else gossip messages back up.
                         backlog.clear();
@@ -225,7 +278,8 @@ public class OutboundTcpConnection extends Thread
                 }
             }
 
-            writeInternal(qm.message, qm.id, qm.timestamp);
+            long timestampMillis = NanoTimeToCurrentTimeMillis.convert(qm.timestampNanos);
+            writeInternal(qm.message, qm.id, timestampMillis);
 
             completed++;
             if (flush)
@@ -325,7 +379,7 @@ public class OutboundTcpConnection extends Thread
                 socket.setKeepAlive(true);
                 if (isLocalDC(poolReference.endPoint()))
                 {
-                    socket.setTcpNoDelay(true);
+                    socket.setTcpNoDelay(INTRADC_TCP_NODELAY);
                 }
                 else
                 {
@@ -342,7 +396,7 @@ public class OutboundTcpConnection extends Thread
                         logger.warn("Failed to set send buffer size on internode socket.", se);
                     }
                 }
-                out = new DataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream(), 4096));
+                out = new DataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE));
 
                 out.writeInt(MessagingService.PROTOCOL_MAGIC);
                 writeHeader(out, targetVersion, shouldCompressConnection());
@@ -416,7 +470,7 @@ public class OutboundTcpConnection extends Thread
         }
         return false;
     }
-    
+
     private int handshakeVersion(final DataInputStream inputStream)
     {
         final AtomicInteger version = new AtomicInteger(NO_VERSION);
@@ -431,7 +485,7 @@ public class OutboundTcpConnection extends Thread
                     logger.info("Handshaking version with {}", poolReference.endPoint());
                     version.set(inputStream.readInt());
                 }
-                catch (IOException ex) 
+                catch (IOException ex)
                 {
                     final String msg = "Cannot handshake version with " + poolReference.endPoint();
                     if (logger.isTraceEnabled())
@@ -464,7 +518,7 @@ public class OutboundTcpConnection extends Thread
         while (iter.hasNext())
         {
             QueuedMessage qm = iter.next();
-            if (qm.timestamp >= System.currentTimeMillis() - qm.message.getTimeout())
+            if (qm.timestampNanos >= System.nanoTime() - qm.message.getTimeout())
                 return;
             iter.remove();
             dropped.incrementAndGet();
@@ -472,31 +526,36 @@ public class OutboundTcpConnection extends Thread
     }
 
     /** messages that have not been retried yet */
-    private static class QueuedMessage
+    private static class QueuedMessage implements Coalescable
     {
         final MessageOut<?> message;
         final int id;
-        final long timestamp;
+        final long timestampNanos;
         final boolean droppable;
 
         QueuedMessage(MessageOut<?> message, int id)
         {
             this.message = message;
             this.id = id;
-            this.timestamp = System.currentTimeMillis();
+            this.timestampNanos = System.nanoTime();
             this.droppable = MessagingService.DROPPABLE_VERBS.contains(message.verb);
         }
 
         /** don't drop a non-droppable message just because it's timestamp is expired */
-        boolean isTimedOut(long maxTime)
+        boolean isTimedOut(long maxTimeNanos, long nowNanos)
         {
-            return droppable && timestamp < System.currentTimeMillis() - maxTime;
+            return droppable && timestampNanos < nowNanos - maxTimeNanos;
         }
 
         boolean shouldRetry()
         {
             return !droppable;
         }
+
+        public long timestampNanos()
+        {
+            return timestampNanos;
+        }
     }
 
     private static class RetriedQueuedMessage extends QueuedMessage

http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
new file mode 100644
index 0000000..ca1399b
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.io.util.FileUtils;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+public class CoalescingStrategies
+{
+
+    /*
+     * Log debug information at info level about what the average is and when coalescing is enabled/disabled
+     */
+    private static final String DEBUG_COALESCING_PROPERTY = Config.PROPERTY_PREFIX + "coalescing_debug";
+    private static final boolean DEBUG_COALESCING = Boolean.getBoolean(DEBUG_COALESCING_PROPERTY);
+
+    private static final String DEBUG_COALESCING_PATH_PROPERTY = Config.PROPERTY_PREFIX + "coalescing_debug_path";
+    private static final String DEBUG_COALESCING_PATH = System.getProperty(DEBUG_COALESCING_PATH_PROPERTY, "/tmp/coleascing_debug");
+
+    static {
+        if (DEBUG_COALESCING)
+        {
+            File directory = new File(DEBUG_COALESCING_PATH);
+
+            if (directory.exists())
+                FileUtils.deleteRecursive(directory);
+
+            if (!directory.mkdirs())
+                throw new ExceptionInInitializerError("Couldn't create log dir");
+        }
+    }
+
+    @VisibleForTesting
+    interface Clock
+    {
+        long nanoTime();
+    }
+
+    @VisibleForTesting
+    static Clock CLOCK = new Clock()
+    {
+        public long nanoTime()
+        {
+            return System.nanoTime();
+        }
+    };
+
+    public static interface Coalescable {
+        long timestampNanos();
+    }
+
+    @VisibleForTesting
+    static void parkLoop(long nanos)
+    {
+        long now = System.nanoTime();
+        final long timer = now + nanos;
+        do
+        {
+            LockSupport.parkNanos(timer - now);
+        }
+        while (timer - (now = System.nanoTime()) > nanos / 16);
+    }
+
+    private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker)
+    {
+        // only sleep if we can expect to double the number of messages we're sending in the time interval
+        long sleep = messages * averageGap;
+        if (sleep > maxCoalesceWindow)
+            return false;
+
+        // assume we receive as many messages as we expect; apply the same logic to the future batch:
+        // expect twice as many messages to consider sleeping for "another" interval; this basically translates
+        // to doubling our sleep period until we exceed our max sleep window
+        while (sleep * 2 < maxCoalesceWindow)
+            sleep *= 2;
+        parker.park(sleep);
+        return true;
+    }
+
+    public static abstract class CoalescingStrategy
+    {
+        protected final Parker parker;
+        protected final Logger logger;
+        protected volatile boolean shouldLogAverage = false;
+        protected final ByteBuffer logBuffer;
+        private RandomAccessFile ras;
+        private final String displayName;
+
+        protected CoalescingStrategy(Parker parker, Logger logger, String displayName)
+        {
+            this.parker = parker;
+            this.logger = logger;
+            this.displayName = displayName;
+            if (DEBUG_COALESCING)
+            {
+                new Thread(displayName + " debug thread") {
+                    @Override
+                    public void run() {
+                        while (true) {
+                            try
+                            {
+                                Thread.sleep(5000);
+                            }
+                            catch (InterruptedException e)
+                            {
+                                throw new AssertionError();
+                            }
+                            shouldLogAverage = true;
+                        }
+                    }
+                }.start();
+            }
+            RandomAccessFile rasTemp = null;
+            ByteBuffer logBufferTemp = null;
+            if (DEBUG_COALESCING)
+            {
+                try
+                {
+                    File outFile = File.createTempFile("coalescing_" + this.displayName + "_", ".log", new File(DEBUG_COALESCING_PATH));
+                    rasTemp = new RandomAccessFile(outFile, "rw");
+                    logBufferTemp = ras.getChannel().map(MapMode.READ_WRITE, 0, Integer.MAX_VALUE);
+                    logBufferTemp.putLong(0);
+                }
+                catch (Exception e)
+                {
+                    logger.error("Unable to create output file for debugging coalescing", e);
+                }
+            }
+            ras = rasTemp;
+            logBuffer = logBufferTemp;
+        }
+
+        /*
+         * If debugging is enabled log to the logger the current average gap calculation result.
+         */
+        final protected void debugGap(long averageGap)
+        {
+            if (DEBUG_COALESCING && shouldLogAverage)
+            {
+                shouldLogAverage = false;
+                logger.info(toString() + " gap " + TimeUnit.NANOSECONDS.toMicros(averageGap) + "μs");
+            }
+        }
+
+        /*
+         * If debugging is enabled log the provided nanotime timestamp to a file.
+         */
+        final protected void debugTimestamp(long timestamp)
+        {
+            if(DEBUG_COALESCING && logBuffer != null)
+            {
+                logBuffer.putLong(0, logBuffer.getLong(0) + 1);
+                logBuffer.putLong(timestamp);
+            }
+        }
+
+        /*
+         * If debugging is enabled log the timestamps of all the items in the provided collection
+         * to a file.
+         */
+        final protected <C extends Coalescable> void debugTimestamps(Collection<C> coalescables) {
+            if (DEBUG_COALESCING) {
+                for (C coalescable : coalescables) {
+                    debugTimestamp(coalescable.timestampNanos());
+                }
+            }
+        }
+
+        /**
+         * Drain from the input blocking queue to the output list up to maxItems elements.
+         *
+         * The coalescing strategy may choose to park the current thread if it thinks it will
+         * be able to produce an output list with more elements.
+         *
+         * @param input Blocking queue to retrieve elements from
+         * @param out Output list to place retrieved elements in. Must be empty.
+         * @param maxItems Maximum number of elements to place in the output list
+         */
+        public <C extends Coalescable> void coalesce(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
+        {
+            Preconditions.checkArgument(out.isEmpty(), "out list should be empty");
+            coalesceInternal(input, out, maxItems);
+        }
+
+        protected abstract <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException;
+
+    }
+
+    @VisibleForTesting
+    interface Parker
+    {
+        void park(long nanos);
+    }
+
+    private static final Parker PARKER = new Parker()
+    {
+        @Override
+        public void park(long nanos)
+        {
+            parkLoop(nanos);
+        }
+    };
+
+    @VisibleForTesting
+    static class TimeHorizonMovingAverageCoalescingStrategy extends CoalescingStrategy
+    {
+        // for now we'll just use 64ms per bucket; this can be made configurable, but results in ~1s for 16 samples
+        private static final int INDEX_SHIFT = 26;
+        private static final long BUCKET_INTERVAL = 1L << 26;
+        private static final int BUCKET_COUNT = 16;
+        private static final long INTERVAL = BUCKET_INTERVAL * BUCKET_COUNT;
+        private static final long MEASURED_INTERVAL = BUCKET_INTERVAL * (BUCKET_COUNT - 1);
+
+        // the minimum timestamp we will now accept updates for; only moves forwards, never backwards
+        private long epoch = CLOCK.nanoTime();
+        // the buckets, each following on from epoch; the measurements run from ix(epoch) to ix(epoch - 1)
+        // ix(epoch-1) is a partial result, that is never actually part of the calculation, and most updates
+        // are expected to hit this bucket
+        private final int samples[] = new int[BUCKET_COUNT];
+        private long sum = 0;
+        private final long maxCoalesceWindow;
+
+        public TimeHorizonMovingAverageCoalescingStrategy(int maxCoalesceWindow, Parker parker, Logger logger, String displayName)
+        {
+            super(parker, logger, displayName);
+            this.maxCoalesceWindow = TimeUnit.MICROSECONDS.toNanos(maxCoalesceWindow);
+            sum = 0;
+        }
+
+        private void logSample(long nanos)
+        {
+            debugTimestamp(nanos);
+            long epoch = this.epoch;
+            long delta = nanos - epoch;
+            if (delta < 0)
+                // have to simply ignore, but would be a bit crazy to get such reordering
+                return;
+
+            if (delta > INTERVAL)
+                epoch = rollepoch(delta, epoch, nanos);
+
+            int ix = ix(nanos);
+            samples[ix]++;
+
+            // if we've updated an old bucket, we need to update the sum to match
+            if (ix != ix(epoch - 1))
+                sum++;
+        }
+
+        private long averageGap()
+        {
+            if (sum == 0)
+                return Integer.MAX_VALUE;
+            return MEASURED_INTERVAL / sum;
+        }
+
+        // this sample extends past the end of the range we cover, so rollover
+        private long rollepoch(long delta, long epoch, long nanos)
+        {
+            if (delta > 2 * INTERVAL)
+            {
+                // this sample is more than twice our interval ahead, so just clear our counters completely
+                epoch = epoch(nanos);
+                sum = 0;
+                Arrays.fill(samples, 0);
+            }
+            else
+            {
+                // ix(epoch - 1) => last index; this is our partial result bucket, so we add this to the sum
+                sum += samples[ix(epoch - 1)];
+                // then we roll forwards, clearing buckets, until our interval covers the new sample time
+                while (epoch + INTERVAL < nanos)
+                {
+                    int index = ix(epoch);
+                    sum -= samples[index];
+                    samples[index] = 0;
+                    epoch += BUCKET_INTERVAL;
+                }
+            }
+            // store the new epoch
+            this.epoch = epoch;
+            return epoch;
+        }
+
+        private long epoch(long latestNanos)
+        {
+            return (latestNanos - MEASURED_INTERVAL) & ~(BUCKET_INTERVAL - 1);
+        }
+
+        private int ix(long nanos)
+        {
+            return (int) ((nanos >>> INDEX_SHIFT) & 15);
+        }
+
+        @Override
+        protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws InterruptedException
+        {
+            if (input.drainTo(out, maxItems) == 0)
+            {
+                out.add(input.take());
+                input.drainTo(out, maxItems - 1);
+            }
+
+            for (Coalescable qm : out)
+                logSample(qm.timestampNanos());
+
+            long averageGap = averageGap();
+            debugGap(averageGap);
+
+            int count = out.size();
+            if (maybeSleep(count, averageGap, maxCoalesceWindow, parker))
+            {
+                input.drainTo(out, maxItems - out.size());
+                int prevCount = count;
+                count = out.size();
+                for (int  i = prevCount; i < count; i++)
+                    logSample(out.get(i).timestampNanos());
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "Time horizon moving average";
+        }
+    }
+
+    /*
+     * Start coalescing by sleeping if the moving average is < the requested window.
+     * The actual time spent waiting to coalesce will be the min( window, moving average * 2)
+     * The actual amount of time spent waiting can be greater then the window. For instance
+     * observed time spent coalescing was 400 microseconds with the window set to 200 in one benchmark.
+     */
+    @VisibleForTesting
+    static class MovingAverageCoalescingStrategy extends CoalescingStrategy
+    {
+        private final int samples[] = new int[16];
+        private long lastSample = 0;
+        private int index = 0;
+        private long sum = 0;
+
+        private final long maxCoalesceWindow;
+
+        public MovingAverageCoalescingStrategy(int maxCoalesceWindow, Parker parker, Logger logger, String displayName)
+        {
+            super(parker, logger, displayName);
+            this.maxCoalesceWindow = TimeUnit.MICROSECONDS.toNanos(maxCoalesceWindow);
+            for (int ii = 0; ii < samples.length; ii++)
+                samples[ii] = Integer.MAX_VALUE;
+            sum = Integer.MAX_VALUE * (long)samples.length;
+        }
+
+        private long logSample(int value)
+        {
+            sum -= samples[index];
+            sum += value;
+            samples[index] = value;
+            index++;
+            index = index & ((1 << 4) - 1);
+            return sum / 16;
+        }
+
+        private long notifyOfSample(long sample)
+        {
+            debugTimestamp(sample);
+            if (sample > lastSample)
+            {
+                final int delta = (int)(Math.min(Integer.MAX_VALUE, sample - lastSample));
+                lastSample = sample;
+                return logSample(delta);
+            }
+            else
+            {
+                return logSample(1);
+            }
+        }
+
+        @Override
+        protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws InterruptedException
+        {
+            if (input.drainTo(out, maxItems) == 0)
+            {
+                out.add(input.take());
+            }
+
+            long average = notifyOfSample(out.get(0).timestampNanos());
+
+            debugGap(average);
+
+            maybeSleep(out.size(), average, maxCoalesceWindow, parker);
+
+            input.drainTo(out, maxItems - out.size());
+            for (int ii = 1; ii < out.size(); ii++)
+                notifyOfSample(out.get(ii).timestampNanos());
+        }
+
+        @Override
+        public String toString() {
+            return "Moving average";
+        }
+    }
+
+    /*
+     * A fixed strategy as a backup in case MovingAverage or TimeHorizongMovingAverage fails in some scenario
+     */
+    @VisibleForTesting
+    static class FixedCoalescingStrategy extends CoalescingStrategy
+    {
+        private final long coalesceWindow;
+
+        public FixedCoalescingStrategy(int coalesceWindowMicros, Parker parker, Logger logger, String displayName)
+        {
+            super(parker, logger, displayName);
+            coalesceWindow = TimeUnit.MICROSECONDS.toNanos(coalesceWindowMicros);
+        }
+
+        @Override
+        protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws InterruptedException
+        {
+            if (input.drainTo(out, maxItems) == 0)
+            {
+                out.add(input.take());
+                parker.park(coalesceWindow);
+                input.drainTo(out, maxItems - 1);
+            }
+            debugTimestamps(out);
+        }
+
+        @Override
+        public String toString() {
+            return "Fixed";
+        }
+    }
+
+    /*
+     * A coalesscing strategy that just returns all currently available elements
+     */
+    @VisibleForTesting
+    static class DisabledCoalescingStrategy extends CoalescingStrategy
+    {
+
+        public DisabledCoalescingStrategy(int coalesceWindowMicros, Parker parker, Logger logger, String displayName)
+        {
+            super(parker, logger, displayName);
+        }
+
+        @Override
+        protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out,  int maxItems) throws InterruptedException
+        {
+            if (input.drainTo(out, maxItems) == 0)
+            {
+                out.add(input.take());
+                input.drainTo(out, maxItems - 1);
+            }
+            debugTimestamps(out);
+        }
+
+        @Override
+        public String toString() {
+            return "Disabled";
+        }
+    }
+
+    @VisibleForTesting
+    static CoalescingStrategy newCoalescingStrategy(String strategy,
+                                                    int coalesceWindow,
+                                                    Parker parker,
+                                                    Logger logger,
+                                                    String displayName)
+    {
+        String classname = null;
+        String strategyCleaned = strategy.trim().toUpperCase();
+        switch(strategyCleaned)
+        {
+        case "MOVINGAVERAGE":
+            classname = MovingAverageCoalescingStrategy.class.getName();
+            break;
+        case "FIXED":
+            classname = FixedCoalescingStrategy.class.getName();
+            break;
+        case "TIMEHORIZON":
+            classname = TimeHorizonMovingAverageCoalescingStrategy.class.getName();
+            break;
+        case "DISABLED":
+            classname = DisabledCoalescingStrategy.class.getName();
+            break;
+        default:
+            classname = strategy;
+        }
+
+        try
+        {
+            Class<?> clazz = Class.forName(classname);
+
+            if (!CoalescingStrategy.class.isAssignableFrom(clazz))
+            {
+                throw new RuntimeException(classname + " is not an instance of CoalescingStrategy");
+            }
+
+            Constructor<?> constructor = clazz.getConstructor(int.class, Parker.class, Logger.class, String.class);
+
+            return (CoalescingStrategy)constructor.newInstance(coalesceWindow, parker, logger, displayName);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static CoalescingStrategy newCoalescingStrategy(String strategy, int coalesceWindow, Logger logger, String displayName)
+    {
+        return newCoalescingStrategy(strategy, coalesceWindow, PARKER, logger, displayName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java b/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java
new file mode 100644
index 0000000..a6c5d28
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.config.Config;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/*
+ * Convert from nanotime to non-monotonic current time millis. Beware of weaker ordering guarantees.
+ */
+public class NanoTimeToCurrentTimeMillis
+{
+    /*
+     * How often to pull a new timestamp from the system.
+     */
+    private static final String TIMESTAMP_UPDATE_INTERVAL_PROPERTY = Config.PROPERTY_PREFIX + "NANOTIMETOMILLIS_TIMESTAMP_UPDATE_INTERVAL";
+    private static final long TIMESTAMP_UPDATE_INTERVAL = Long.getLong(TIMESTAMP_UPDATE_INTERVAL_PROPERTY, 10000);
+
+    private static volatile long TIMESTAMP_BASE[] = new long[] { System.currentTimeMillis(), System.nanoTime() };
+
+    @VisibleForTesting
+    public static final Object TIMESTAMP_UPDATE = new Object();
+
+    /*
+     * System.currentTimeMillis() is 25 nanoseconds. This is 2 nanoseconds (maybe) according to JMH.
+     * Faster than calling both currentTimeMillis() and nanoTime().
+     *
+     * There is also the issue of how scalable nanoTime() and currentTimeMillis() are which is a moving target.
+     *
+     * These timestamps don't order with System.currentTimeMillis() because currentTimeMillis() can tick over
+     * before this one does. I have seen it behind by as much as 2 milliseconds.
+     */
+    public static final long convert(long nanoTime)
+    {
+        final long timestampBase[] = TIMESTAMP_BASE;
+        return timestampBase[0] + TimeUnit.NANOSECONDS.toMillis(nanoTime - timestampBase[1]);
+    }
+
+    static
+    {
+        //Pick up updates from NTP periodically
+        Thread t = new Thread("NanoTimeToCurrentTimeMillis updater")
+        {
+            @Override
+            public void run()
+            {
+                while (true)
+                {
+                    try
+                    {
+                        synchronized (TIMESTAMP_UPDATE)
+                        {
+                            TIMESTAMP_UPDATE.wait(TIMESTAMP_UPDATE_INTERVAL);
+                        }
+                    }
+                    catch (InterruptedException e)
+                    {
+                        return;
+                    }
+
+                    TIMESTAMP_BASE = new long[] {
+                            Math.max(TIMESTAMP_BASE[0], System.currentTimeMillis()),
+                            Math.max(TIMESTAMP_BASE[1], System.nanoTime()) };
+                }
+            }
+        };
+        t.setDaemon(true);
+        t.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
new file mode 100644
index 0000000..97d15fe
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import org.apache.cassandra.utils.CoalescingStrategies.Clock;
+import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
+import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
+import org.apache.cassandra.utils.CoalescingStrategies.Parker;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+import static org.junit.Assert.*;
+
+public class CoalescingStrategiesTest
+{
+
+    static final ExecutorService ex = Executors.newSingleThreadExecutor();
+
+    private static final Logger logger = LoggerFactory.getLogger(CoalescingStrategiesTest.class);
+
+    static class MockParker implements Parker
+    {
+        Queue<Long> parks = new ArrayDeque<Long>();
+        Semaphore permits = new Semaphore(0);
+
+        Semaphore parked = new Semaphore(0);
+
+        public void park(long nanos)
+        {
+            parks.offer(nanos);
+            parked.release();
+            try
+            {
+                permits.acquire();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    static class SimpleCoalescable implements Coalescable
+    {
+        final long timestampNanos;
+
+        SimpleCoalescable(long timestampNanos)
+        {
+            this.timestampNanos = timestampNanos;
+        }
+
+        public long timestampNanos()
+        {
+            return timestampNanos;
+        }
+    }
+
+
+    static long toNanos(long micros)
+    {
+        return TimeUnit.MICROSECONDS.toNanos(micros);
+    }
+
+    MockParker parker;
+
+    BlockingQueue<SimpleCoalescable> input;
+    List<SimpleCoalescable> output;
+
+    CoalescingStrategy cs;
+
+    Semaphore queueParked = new Semaphore(0);
+    Semaphore queueRelease = new Semaphore(0);
+
+    @SuppressWarnings({ "serial" })
+    @Before
+    public void setUp() throws Exception
+    {
+        cs = null;
+        CoalescingStrategies.CLOCK = new Clock()
+        {
+            @Override
+            public long nanoTime()
+            {
+                return 0;
+            }
+        };
+
+        parker = new MockParker();
+        input = new LinkedBlockingQueue<SimpleCoalescable>()
+                {
+            @Override
+            public SimpleCoalescable take() throws InterruptedException
+            {
+                queueParked.release();
+                queueRelease.acquire();
+                return super.take();
+            }
+        };
+        output = new ArrayList<>(128);
+
+        clear();
+    }
+
+    CoalescingStrategy newStrategy(String name, int window)
+    {
+        return CoalescingStrategies.newCoalescingStrategy(name, window, parker, logger, "Stupendopotamus");
+    }
+
+    void add(long whenMicros)
+    {
+        input.offer(new SimpleCoalescable(toNanos(whenMicros)));
+    }
+
+    void clear()
+    {
+        output.clear();
+        input.clear();
+        parker.parks.clear();
+        parker.parked.drainPermits();
+        parker.permits.drainPermits();
+        queueParked.drainPermits();
+        queueRelease.drainPermits();
+    }
+
+    void release() throws Exception
+    {
+        queueRelease.release();
+        parker.permits.release();
+        fut.get();
+    }
+
+    Future<?> fut;
+    void runBlocker(Semaphore waitFor) throws Exception
+    {
+        fut = ex.submit(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    cs.coalesce(input, output, 128);
+                }
+                catch (Exception ex)
+                {
+                    ex.printStackTrace();
+                    throw new RuntimeException(ex);
+                }
+            }
+        });
+        waitFor.acquire();
+    }
+
+    @Test
+    public void testFixedCoalescingStrategy() throws Exception
+    {
+        cs = newStrategy("FIXED", 200);
+
+        //Test that when a stream of messages continues arriving it keeps sending until all are drained
+        //It does this because it is already awake and sending messages
+        add(42);
+        add(42);
+        cs.coalesce(input, output, 128);
+        assertEquals( 2, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        runBlocker(queueParked);
+        add(42);
+        add(42);
+        add(42);
+        release();
+        assertEquals( 3, output.size());
+        assertEquals(toNanos(200), parker.parks.poll().longValue());
+
+    }
+
+    @Test
+    public void testDisabledCoalescingStrateg() throws Exception
+    {
+        cs = newStrategy("DISABLED", 200);
+
+        add(42);
+        add(42);
+        cs.coalesce(input, output, 128);
+        assertEquals( 2, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        runBlocker(queueParked);
+        add(42);
+        add(42);
+        release();
+        assertEquals( 2, output.size());
+        assertNull(parker.parks.poll());
+    }
+
+    @Test
+    public void parkLoop() throws Exception
+   {
+        final Thread current = Thread.currentThread();
+        final Semaphore helperReady = new Semaphore(0);
+        final Semaphore helperGo = new Semaphore(0);
+
+        new Thread()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    helperReady.release();
+                    helperGo.acquire();
+                    Thread.sleep(50);
+                    LockSupport.unpark(current);
+                }
+                catch (Exception e)
+                {
+                    e.printStackTrace();
+                    logger.error("Error", e);
+                    System.exit(-1);
+                }
+            }
+        }.start();
+
+        long start = System.nanoTime();
+        helperGo.release();
+
+        long parkNanos = TimeUnit.MILLISECONDS.toNanos(500);
+
+        CoalescingStrategies.parkLoop(parkNanos);
+        long delta = System.nanoTime() - start;
+
+        assertTrue (delta >= (parkNanos - (parkNanos / 16)));
+    }
+
+    @Test
+    public void testMovingAverageCoalescingStrategy() throws Exception
+    {
+        cs = newStrategy("org.apache.cassandra.utils.CoalescingStrategies$MovingAverageCoalescingStrategy", 200);
+
+
+        //Test that things can be pulled out of the queue if it is non-empty
+        add(201);
+        add(401);
+        cs.coalesce(input, output, 128);
+        assertEquals( 2, output.size());
+        assertNull(parker.parks.poll());
+
+        //Test that blocking on the queue results in everything drained
+        clear();
+
+        runBlocker(queueParked);
+        add(601);
+        add(801);
+        release();
+        assertEquals( 2, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        //Test that out of order samples still flow
+        runBlocker(queueParked);
+        add(0);
+        release();
+        assertEquals( 1, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        add(0);
+        cs.coalesce(input, output, 128);
+        assertEquals( 1, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        //Test that too high an average doesn't coalesce
+        for (long ii = 0; ii < 128; ii++)
+            add(ii * 1000);
+        cs.coalesce(input, output, 128);
+        assertEquals(output.size(), 128);
+        assertTrue(parker.parks.isEmpty());
+
+        clear();
+
+        runBlocker(queueParked);
+        add(129 * 1000);
+        release();
+        assertTrue(parker.parks.isEmpty());
+
+        clear();
+
+        //Test that a low enough average coalesces
+        cs = newStrategy("MOVINGAVERAGE", 200);
+        for (long ii = 0; ii < 128; ii++)
+            add(ii * 99);
+        cs.coalesce(input, output, 128);
+        assertEquals(output.size(), 128);
+        assertTrue(parker.parks.isEmpty());
+
+        clear();
+
+        runBlocker(queueParked);
+        add(128 * 99);
+        add(129 * 99);
+        release();
+        assertEquals(2, output.size());
+        assertEquals(toNanos(198), parker.parks.poll().longValue());
+    }
+
+    @Test
+    public void testTimeHorizonStrategy() throws Exception
+    {
+        cs = newStrategy("TIMEHORIZON", 200);
+
+        //Test that things can be pulled out of the queue if it is non-empty
+        add(201);
+        add(401);
+        cs.coalesce(input, output, 128);
+        assertEquals( 2, output.size());
+        assertNull(parker.parks.poll());
+
+        //Test that blocking on the queue results in everything drained
+        clear();
+
+        runBlocker(queueParked);
+        add(601);
+        add(801);
+        release();
+        assertEquals( 2, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        //Test that out of order samples still flow
+        runBlocker(queueParked);
+        add(0);
+        release();
+        assertEquals( 1, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        add(0);
+        cs.coalesce(input, output, 128);
+        assertEquals( 1, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        //Test that too high an average doesn't coalesce
+        for (long ii = 0; ii < 128; ii++)
+            add(ii * 1000);
+        cs.coalesce(input, output, 128);
+        assertEquals(output.size(), 128);
+        assertTrue(parker.parks.isEmpty());
+
+        clear();
+
+        runBlocker(queueParked);
+        add(129 * 1000);
+        release();
+        assertTrue(parker.parks.isEmpty());
+
+        clear();
+
+        //Test that a low enough average coalesces
+        cs = newStrategy("TIMEHORIZON", 200);
+        primeTimeHorizonAverage(99);
+
+        clear();
+
+        runBlocker(queueParked);
+        add(100000 * 99);
+        queueRelease.release();
+        parker.parked.acquire();
+        add(100001 * 99);
+        parker.permits.release();
+        fut.get();
+        assertEquals(2, output.size());
+        assertEquals(toNanos(198), parker.parks.poll().longValue());
+
+        clear();
+
+        //Test far future
+        add(Integer.MAX_VALUE);
+        cs.coalesce(input, output, 128);
+        assertEquals(1, output.size());
+        assertTrue(parker.parks.isEmpty());
+
+        clear();
+
+        //Distant past
+        add(0);
+        cs.coalesce(input, output, 128);
+        assertEquals(1, output.size());
+        assertTrue(parker.parks.isEmpty());
+    }
+
+    void primeTimeHorizonAverage(long micros) throws Exception
+    {
+        for (long ii = 0; ii < 100000; ii++)
+        {
+            add(ii * micros);
+            if (ii % 128 == 0)
+            {
+                cs.coalesce(input, output, 128);
+                output.clear();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/test/unit/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillisTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillisTest.java b/test/unit/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillisTest.java
new file mode 100644
index 0000000..5c025cf
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillisTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class NanoTimeToCurrentTimeMillisTest
+{
+    @Test
+    public void testTimestampOrdering() throws Exception
+    {
+        long nowNanos = System.nanoTime();
+        long now = System.currentTimeMillis();
+        long lastConverted = 0;
+        for (long ii = 0; ii < 10000000; ii++)
+        {
+            now = Math.max(now, System.currentTimeMillis());
+            if (ii % 10000 == 0)
+            {
+                synchronized (NanoTimeToCurrentTimeMillis.TIMESTAMP_UPDATE)
+                {
+                    NanoTimeToCurrentTimeMillis.TIMESTAMP_UPDATE.notify();
+                }
+                Thread.sleep(1);
+            }
+            nowNanos = Math.max(now, System.nanoTime());
+            long convertedNow = NanoTimeToCurrentTimeMillis.convert(nowNanos);
+            assertTrue("convertedNow = " + convertedNow + " lastConverted = " + lastConverted + " in iteration " + ii, convertedNow >= (lastConverted - 1));
+            lastConverted = convertedNow;
+            //Seems to be off by as much as two milliseconds sadly
+            assertTrue("now = " + now + " convertedNow = " + convertedNow + " in iteration " + ii, (now - 2) <= convertedNow);
+
+        }
+    }
+}