You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/03/18 11:44:58 UTC
[2/4] cassandra git commit: Introduce intra-cluster message coalescing
Introduce intra-cluster message coalescing
patch by ariel; reviewed by benedict for CASSANDRA-8692
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/82849649
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/82849649
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/82849649
Branch: refs/heads/trunk
Commit: 828496492c51d7437b690999205ecc941f41a0a9
Parents: db1a741
Author: Ariel Weisberg <ar...@datastax.com>
Authored: Wed Mar 18 10:37:28 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Mar 18 10:38:04 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/config/Config.java | 22 +
.../cassandra/config/DatabaseDescriptor.java | 10 +
.../cassandra/net/IncomingTcpConnection.java | 5 +-
.../cassandra/net/OutboundTcpConnection.java | 117 +++-
.../cassandra/utils/CoalescingStrategies.java | 544 +++++++++++++++++++
.../utils/NanoTimeToCurrentTimeMillis.java | 88 +++
.../utils/CoalescingStrategiesTest.java | 445 +++++++++++++++
.../utils/NanoTimeToCurrentTimeMillisTest.java | 52 ++
9 files changed, 1254 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 56a7164..68df77e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.4
+ * Introduce intra-cluster message coalescing (CASSANDRA-8692)
* DatabaseDescriptor throws NPE when rpc_interface is used (CASSANDRA-8839)
* Don't check if an sstable is live for offline compactions (CASSANDRA-8841)
* Don't set clientMode in SSTableLoader (CASSANDRA-8238)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index fbbd1dd..378a1ad 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -39,6 +39,12 @@ import org.apache.cassandra.utils.FBUtilities;
*/
public class Config
{
+ /*
+ * Prefix for Java properties for internal Cassandra configuration options
+ */
+ public static final String PROPERTY_PREFIX = "cassandra.";
+
+
public String cluster_name = "Test Cluster";
public String authenticator;
public String authorizer;
@@ -223,6 +229,22 @@ public class Config
private static final CsvPreference STANDARD_SURROUNDING_SPACES_NEED_QUOTES = new CsvPreference.Builder(CsvPreference.STANDARD_PREFERENCE)
.surroundingSpacesNeedQuotes(true).build();
+ /*
+ * Strategy to use for coalescing messages in OutboundTcpConnection.
+ * Can be fixed, movingaverage, timehorizon, disabled. Setting is case and leading/trailing
+ * whitespace insensitive. You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+ */
+ public String otc_coalescing_strategy = "DISABLED";
+
+ /*
+ * How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+ * message is received before it will be sent with any accompanying messages. For moving average this is the
+ * maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+ * for coalescing to be enabled.
+ */
+ public static final int otc_coalescing_window_us_default = 200;
+ public int otc_coalescing_window_us = otc_coalescing_window_us_default;
+
public static boolean getOutboundBindAny()
{
return outboundBindAny;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 65cec9c..d0db9f4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1682,4 +1682,14 @@ public class DatabaseDescriptor
String arch = System.getProperty("os.arch");
return arch.contains("64") || arch.contains("sparcv9");
}
+
+ public static String getOtcCoalescingStrategy()
+ {
+ return conf.otc_coalescing_strategy;
+ }
+
+ public static int getOtcCoalescingWindow()
+ {
+ return conf.otc_coalescing_window_us;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index ee44493..e7d434b 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -32,6 +32,7 @@ import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;
import org.xerial.snappy.SnappyInputStream;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.gms.Gossiper;
@@ -40,6 +41,8 @@ public class IncomingTcpConnection extends Thread
{
private static final Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class);
+ private static final int BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + "itc_buffer_size", 1024 * 4); // PROPERTY_PREFIX already ends with '.'
+
private final int version;
private final boolean compressed;
private final Socket socket;
@@ -132,7 +135,7 @@ public class IncomingTcpConnection extends Thread
}
else
{
- in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096));
+ in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), BUFFER_SIZE));
}
if (version > MessagingService.current_version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 4586667..2d6d4fe 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -25,10 +25,7 @@ import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -44,14 +41,18 @@ import net.jpountz.lz4.LZ4BlockOutputStream;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;
+
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.CoalescingStrategies;
+import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
+import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
import org.apache.cassandra.utils.UUIDGen;
import org.xerial.snappy.SnappyOutputStream;
-
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -61,6 +62,54 @@ public class OutboundTcpConnection extends Thread
{
private static final Logger logger = LoggerFactory.getLogger(OutboundTcpConnection.class);
+ private static final String PREFIX = Config.PROPERTY_PREFIX;
+
+ /*
+ * Enable/disable TCP_NODELAY for intradc connections. Defaults to enabled.
+ */
+ private static final String INTRADC_TCP_NODELAY_PROPERTY = PREFIX + "otc_intradc_tcp_nodelay";
+ private static final boolean INTRADC_TCP_NODELAY = Boolean.valueOf(System.getProperty(INTRADC_TCP_NODELAY_PROPERTY, "true"));
+
+ /*
+ * Size of buffer in output stream
+ */
+ private static final String BUFFER_SIZE_PROPERTY = PREFIX + "otc_buffer_size";
+ private static final int BUFFER_SIZE = Integer.getInteger(BUFFER_SIZE_PROPERTY, 1024 * 64);
+
+ private static CoalescingStrategy newCoalescingStrategy(String displayName)
+ {
+ return CoalescingStrategies.newCoalescingStrategy(DatabaseDescriptor.getOtcCoalescingStrategy(),
+ DatabaseDescriptor.getOtcCoalescingWindow(),
+ logger,
+ displayName);
+ }
+
+ static
+ {
+ String strategy = DatabaseDescriptor.getOtcCoalescingStrategy();
+ switch (strategy)
+ {
+ case "TIMEHORIZON":
+ break;
+ case "MOVINGAVERAGE":
+ case "FIXED":
+ case "DISABLED":
+ logger.info("OutboundTcpConnection using coalescing strategy " + strategy);
+ break;
+ default:
+ //Not one of the exact names above: construct one instance so an unloadable strategy fails during class init
+ newCoalescingStrategy("dummy");
+ }
+
+ int coalescingWindow = DatabaseDescriptor.getOtcCoalescingWindow();
+ if (coalescingWindow != Config.otc_coalescing_window_us_default)
+ logger.info("OutboundTcpConnection coalescing window set to " + coalescingWindow + "μs");
+
+ if (coalescingWindow < 0) //zero is accepted, only negative windows are rejected
+ throw new ExceptionInInitializerError(
+ "Value provided for coalescing window must be greater than or equal to 0: " + coalescingWindow);
+ }
+
private static final MessageOut CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);
private volatile boolean isStopped = false;
@@ -74,6 +123,7 @@ public class OutboundTcpConnection extends Thread
private final OutboundTcpConnectionPool poolReference;
+ private final CoalescingStrategy cs;
private DataOutputStreamPlus out;
private Socket socket;
private volatile long completed;
@@ -85,6 +135,7 @@ public class OutboundTcpConnection extends Thread
{
super("WRITE-" + pool.endPoint());
this.poolReference = pool;
+ cs = newCoalescingStrategy(pool.endPoint().getHostAddress());
}
private static boolean isLocalDC(InetAddress targetHost)
@@ -127,26 +178,27 @@ public class OutboundTcpConnection extends Thread
public void run()
{
+ final int drainedMessageSize = 128;
// keeping list (batch) size small for now; that way we don't have an unbounded array (that we never resize)
- final List<QueuedMessage> drainedMessages = new ArrayList<>(128);
+ final List<QueuedMessage> drainedMessages = new ArrayList<>(drainedMessageSize);
+
outer:
while (true)
{
- if (backlog.drainTo(drainedMessages, drainedMessages.size()) == 0)
+ try
{
- try
- {
- drainedMessages.add(backlog.take());
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
-
+ cs.coalesce(backlog, drainedMessages, drainedMessageSize);
}
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+
currentMsgBufferCount = drainedMessages.size();
int count = drainedMessages.size();
+ //The timestamp of the first message has already been provided to the coalescing strategy
+ //so skip logging it.
for (QueuedMessage qm : drainedMessages)
{
try
@@ -159,10 +211,11 @@ public class OutboundTcpConnection extends Thread
break outer;
continue;
}
- if (qm.isTimedOut(m.getTimeout()))
+
+ if (qm.isTimedOut(TimeUnit.MILLISECONDS.toNanos(m.getTimeout()), System.nanoTime()))
dropped.incrementAndGet();
else if (socket != null || connect())
- writeConnected(qm, count == 1 && backlog.size() == 0);
+ writeConnected(qm, count == 1 && backlog.isEmpty());
else
// clear out the queue, else gossip messages back up.
backlog.clear();
@@ -225,7 +278,8 @@ public class OutboundTcpConnection extends Thread
}
}
- writeInternal(qm.message, qm.id, qm.timestamp);
+ long timestampMillis = NanoTimeToCurrentTimeMillis.convert(qm.timestampNanos);
+ writeInternal(qm.message, qm.id, timestampMillis);
completed++;
if (flush)
@@ -325,7 +379,7 @@ public class OutboundTcpConnection extends Thread
socket.setKeepAlive(true);
if (isLocalDC(poolReference.endPoint()))
{
- socket.setTcpNoDelay(true);
+ socket.setTcpNoDelay(INTRADC_TCP_NODELAY);
}
else
{
@@ -342,7 +396,7 @@ public class OutboundTcpConnection extends Thread
logger.warn("Failed to set send buffer size on internode socket.", se);
}
}
- out = new DataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream(), 4096));
+ out = new DataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream(), BUFFER_SIZE));
out.writeInt(MessagingService.PROTOCOL_MAGIC);
writeHeader(out, targetVersion, shouldCompressConnection());
@@ -416,7 +470,7 @@ public class OutboundTcpConnection extends Thread
}
return false;
}
-
+
private int handshakeVersion(final DataInputStream inputStream)
{
final AtomicInteger version = new AtomicInteger(NO_VERSION);
@@ -431,7 +485,7 @@ public class OutboundTcpConnection extends Thread
logger.info("Handshaking version with {}", poolReference.endPoint());
version.set(inputStream.readInt());
}
- catch (IOException ex)
+ catch (IOException ex)
{
final String msg = "Cannot handshake version with " + poolReference.endPoint();
if (logger.isTraceEnabled())
@@ -464,7 +518,7 @@ public class OutboundTcpConnection extends Thread
while (iter.hasNext())
{
QueuedMessage qm = iter.next();
- if (qm.timestamp >= System.currentTimeMillis() - qm.message.getTimeout())
+ if (qm.timestampNanos >= System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(qm.message.getTimeout())) // getTimeout() is in milliseconds, timestampNanos in nanoseconds
return;
iter.remove();
dropped.incrementAndGet();
@@ -472,31 +526,36 @@ public class OutboundTcpConnection extends Thread
}
/** messages that have not been retried yet */
- private static class QueuedMessage
+ private static class QueuedMessage implements Coalescable
{
final MessageOut<?> message;
final int id;
- final long timestamp;
+ final long timestampNanos;
final boolean droppable;
QueuedMessage(MessageOut<?> message, int id)
{
this.message = message;
this.id = id;
- this.timestamp = System.currentTimeMillis();
+ this.timestampNanos = System.nanoTime();
this.droppable = MessagingService.DROPPABLE_VERBS.contains(message.verb);
}
/** don't drop a non-droppable message just because it's timestamp is expired */
- boolean isTimedOut(long maxTime)
+ boolean isTimedOut(long maxTimeNanos, long nowNanos)
{
- return droppable && timestamp < System.currentTimeMillis() - maxTime;
+ return droppable && timestampNanos < nowNanos - maxTimeNanos;
}
boolean shouldRetry()
{
return !droppable;
}
+
+ public long timestampNanos()
+ {
+ return timestampNanos;
+ }
}
private static class RetriedQueuedMessage extends QueuedMessage
http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
new file mode 100644
index 0000000..ca1399b
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.io.util.FileUtils;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+public class CoalescingStrategies
+{
+
+ /*
+ * Log debug information at info level about what the average is and when coalescing is enabled/disabled
+ */
+ private static final String DEBUG_COALESCING_PROPERTY = Config.PROPERTY_PREFIX + "coalescing_debug";
+ private static final boolean DEBUG_COALESCING = Boolean.getBoolean(DEBUG_COALESCING_PROPERTY);
+
+ private static final String DEBUG_COALESCING_PATH_PROPERTY = Config.PROPERTY_PREFIX + "coalescing_debug_path";
+ private static final String DEBUG_COALESCING_PATH = System.getProperty(DEBUG_COALESCING_PATH_PROPERTY, "/tmp/coleascing_debug");
+
+ static {
+ if (DEBUG_COALESCING)
+ {
+ File directory = new File(DEBUG_COALESCING_PATH);
+
+ if (directory.exists())
+ FileUtils.deleteRecursive(directory);
+
+ if (!directory.mkdirs())
+ throw new ExceptionInInitializerError("Couldn't create log dir");
+ }
+ }
+
+ @VisibleForTesting
+ interface Clock
+ {
+ long nanoTime();
+ }
+
+ @VisibleForTesting
+ static Clock CLOCK = new Clock()
+ {
+ public long nanoTime()
+ {
+ return System.nanoTime();
+ }
+ };
+
+ public static interface Coalescable {
+ long timestampNanos();
+ }
+
+ @VisibleForTesting
+ static void parkLoop(long nanos)
+ {
+ long now = System.nanoTime();
+ final long timer = now + nanos;
+ do
+ {
+ LockSupport.parkNanos(timer - now);
+ }
+ while (timer - (now = System.nanoTime()) > nanos / 16);
+ }
+
+ private static boolean maybeSleep(int messages, long averageGap, long maxCoalesceWindow, Parker parker)
+ {
+ // only sleep if we can expect to double the number of messages we're sending in the time interval
+ long sleep = messages * averageGap;
+ if (sleep <= 0 || sleep > maxCoalesceWindow) // a non-positive sleep would never grow in the doubling loop below
+ return false;
+
+ // assume we receive as many messages as we expect; apply the same logic to the future batch:
+ // expect twice as many messages to consider sleeping for "another" interval; this basically translates
+ // to doubling our sleep period until we exceed our max sleep window
+ while (sleep * 2 < maxCoalesceWindow)
+ sleep *= 2;
+ parker.park(sleep); // returns true only when the thread was actually parked
+ return true;
+ }
+
+ public static abstract class CoalescingStrategy
+ {
+ protected final Parker parker;
+ protected final Logger logger;
+ protected volatile boolean shouldLogAverage = false;
+ protected final ByteBuffer logBuffer;
+ private RandomAccessFile ras;
+ private final String displayName;
+
+ protected CoalescingStrategy(Parker parker, Logger logger, String displayName)
+ {
+ this.parker = parker;
+ this.logger = logger;
+ this.displayName = displayName;
+ if (DEBUG_COALESCING)
+ {
+ new Thread(displayName + " debug thread") {
+ @Override
+ public void run() {
+ while (true) {
+ try
+ {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e); // keep the interrupt as the cause
+ }
+ shouldLogAverage = true;
+ }
+ }
+ }.start();
+ }
+ RandomAccessFile rasTemp = null;
+ ByteBuffer logBufferTemp = null;
+ if (DEBUG_COALESCING)
+ {
+ try
+ {
+ File outFile = File.createTempFile("coalescing_" + this.displayName + "_", ".log", new File(DEBUG_COALESCING_PATH));
+ rasTemp = new RandomAccessFile(outFile, "rw");
+ logBufferTemp = rasTemp.getChannel().map(MapMode.READ_WRITE, 0, Integer.MAX_VALUE); // must use rasTemp: the ras field is still null until assigned below
+ logBufferTemp.putLong(0);
+ }
+ catch (Exception e)
+ {
+ logger.error("Unable to create output file for debugging coalescing", e);
+ }
+ }
+ ras = rasTemp;
+ logBuffer = logBufferTemp;
+ }
+
+ /*
+ * If debugging is enabled log to the logger the current average gap calculation result.
+ */
+ final protected void debugGap(long averageGap)
+ {
+ if (DEBUG_COALESCING && shouldLogAverage)
+ {
+ shouldLogAverage = false;
+ logger.info(toString() + " gap " + TimeUnit.NANOSECONDS.toMicros(averageGap) + "μs");
+ }
+ }
+
+ /*
+ * If debugging is enabled log the provided nanotime timestamp to a file.
+ */
+ final protected void debugTimestamp(long timestamp)
+ {
+ if(DEBUG_COALESCING && logBuffer != null)
+ {
+ logBuffer.putLong(0, logBuffer.getLong(0) + 1);
+ logBuffer.putLong(timestamp);
+ }
+ }
+
+ /*
+ * If debugging is enabled log the timestamps of all the items in the provided collection
+ * to a file.
+ */
+ final protected <C extends Coalescable> void debugTimestamps(Collection<C> coalescables) {
+ if (DEBUG_COALESCING) {
+ for (C coalescable : coalescables) {
+ debugTimestamp(coalescable.timestampNanos());
+ }
+ }
+ }
+
+ /**
+ * Drain from the input blocking queue to the output list up to maxItems elements.
+ *
+ * The coalescing strategy may choose to park the current thread if it thinks it will
+ * be able to produce an output list with more elements.
+ *
+ * @param input Blocking queue to retrieve elements from
+ * @param out Output list to place retrieved elements in. Must be empty.
+ * @param maxItems Maximum number of elements to place in the output list
+ */
+ public <C extends Coalescable> void coalesce(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
+ {
+ Preconditions.checkArgument(out.isEmpty(), "out list should be empty");
+ coalesceInternal(input, out, maxItems);
+ }
+
+ protected abstract <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException;
+
+ }
+
+ @VisibleForTesting
+ interface Parker
+ {
+ void park(long nanos);
+ }
+
+ private static final Parker PARKER = new Parker()
+ {
+ @Override
+ public void park(long nanos)
+ {
+ parkLoop(nanos);
+ }
+ };
+
+ @VisibleForTesting
+ static class TimeHorizonMovingAverageCoalescingStrategy extends CoalescingStrategy
+ {
+ // for now we'll just use 64ms per bucket; this can be made configurable, but results in ~1s for 16 samples
+ private static final int INDEX_SHIFT = 26;
+ private static final long BUCKET_INTERVAL = 1L << 26;
+ private static final int BUCKET_COUNT = 16;
+ private static final long INTERVAL = BUCKET_INTERVAL * BUCKET_COUNT;
+ private static final long MEASURED_INTERVAL = BUCKET_INTERVAL * (BUCKET_COUNT - 1);
+
+ // the minimum timestamp we will now accept updates for; only moves forwards, never backwards
+ private long epoch = CLOCK.nanoTime();
+ // the buckets, each following on from epoch; the measurements run from ix(epoch) to ix(epoch - 1)
+ // ix(epoch-1) is a partial result, that is never actually part of the calculation, and most updates
+ // are expected to hit this bucket
+ private final int samples[] = new int[BUCKET_COUNT];
+ private long sum = 0;
+ private final long maxCoalesceWindow;
+
+ public TimeHorizonMovingAverageCoalescingStrategy(int maxCoalesceWindow, Parker parker, Logger logger, String displayName)
+ {
+ super(parker, logger, displayName);
+ this.maxCoalesceWindow = TimeUnit.MICROSECONDS.toNanos(maxCoalesceWindow);
+ sum = 0;
+ }
+
+ private void logSample(long nanos)
+ {
+ debugTimestamp(nanos);
+ long epoch = this.epoch;
+ long delta = nanos - epoch;
+ if (delta < 0)
+ // have to simply ignore, but would be a bit crazy to get such reordering
+ return;
+
+ if (delta > INTERVAL)
+ epoch = rollepoch(delta, epoch, nanos);
+
+ int ix = ix(nanos);
+ samples[ix]++;
+
+ // if we've updated an old bucket, we need to update the sum to match
+ if (ix != ix(epoch - 1))
+ sum++;
+ }
+
+ private long averageGap()
+ {
+ if (sum == 0)
+ return Integer.MAX_VALUE;
+ return MEASURED_INTERVAL / sum;
+ }
+
+ // this sample extends past the end of the range we cover, so rollover
+ private long rollepoch(long delta, long epoch, long nanos)
+ {
+ if (delta > 2 * INTERVAL)
+ {
+ // this sample is more than twice our interval ahead, so just clear our counters completely
+ epoch = epoch(nanos);
+ sum = 0;
+ Arrays.fill(samples, 0);
+ }
+ else
+ {
+ // ix(epoch - 1) => last index; this is our partial result bucket, so we add this to the sum
+ sum += samples[ix(epoch - 1)];
+ // then we roll forwards, clearing buckets, until our interval covers the new sample time
+ while (epoch + INTERVAL < nanos)
+ {
+ int index = ix(epoch);
+ sum -= samples[index];
+ samples[index] = 0;
+ epoch += BUCKET_INTERVAL;
+ }
+ }
+ // store the new epoch
+ this.epoch = epoch;
+ return epoch;
+ }
+
+ private long epoch(long latestNanos)
+ {
+ return (latestNanos - MEASURED_INTERVAL) & ~(BUCKET_INTERVAL - 1);
+ }
+
+ private int ix(long nanos)
+ {
+ return (int) ((nanos >>> INDEX_SHIFT) & 15);
+ }
+
+ @Override
+ protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
+ {
+ if (input.drainTo(out, maxItems) == 0)
+ {
+ out.add(input.take());
+ input.drainTo(out, maxItems - 1);
+ }
+
+ for (Coalescable qm : out)
+ logSample(qm.timestampNanos());
+
+ long averageGap = averageGap();
+ debugGap(averageGap);
+
+ int count = out.size();
+ if (maybeSleep(count, averageGap, maxCoalesceWindow, parker))
+ {
+ input.drainTo(out, maxItems - out.size());
+ int prevCount = count;
+ count = out.size();
+ for (int i = prevCount; i < count; i++)
+ logSample(out.get(i).timestampNanos());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Time horizon moving average";
+ }
+ }
+
+ /*
+ * Start coalescing by sleeping if the moving average is < the requested window.
+ * The actual time spent waiting to coalesce will be the min( window, moving average * 2)
+ * The actual amount of time spent waiting can be greater than the window. For instance
+ * observed time spent coalescing was 400 microseconds with the window set to 200 in one benchmark.
+ */
+ @VisibleForTesting
+ static class MovingAverageCoalescingStrategy extends CoalescingStrategy
+ {
+ private final int samples[] = new int[16];
+ private long lastSample = 0;
+ private int index = 0;
+ private long sum = 0;
+
+ private final long maxCoalesceWindow;
+
+ public MovingAverageCoalescingStrategy(int maxCoalesceWindow, Parker parker, Logger logger, String displayName)
+ {
+ super(parker, logger, displayName);
+ this.maxCoalesceWindow = TimeUnit.MICROSECONDS.toNanos(maxCoalesceWindow);
+ for (int ii = 0; ii < samples.length; ii++)
+ samples[ii] = Integer.MAX_VALUE;
+ sum = Integer.MAX_VALUE * (long)samples.length;
+ }
+
+ private long logSample(int value)
+ {
+ sum -= samples[index];
+ sum += value;
+ samples[index] = value;
+ index++;
+ index = index & ((1 << 4) - 1);
+ return sum / 16;
+ }
+
+ private long notifyOfSample(long sample)
+ {
+ debugTimestamp(sample);
+ if (sample > lastSample)
+ {
+ final int delta = (int)(Math.min(Integer.MAX_VALUE, sample - lastSample));
+ lastSample = sample;
+ return logSample(delta);
+ }
+ else
+ {
+ return logSample(1);
+ }
+ }
+
+ @Override
+ protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
+ {
+ if (input.drainTo(out, maxItems) == 0)
+ {
+ out.add(input.take());
+ }
+
+ long average = notifyOfSample(out.get(0).timestampNanos());
+
+ debugGap(average);
+
+ maybeSleep(out.size(), average, maxCoalesceWindow, parker);
+
+ input.drainTo(out, maxItems - out.size());
+ for (int ii = 1; ii < out.size(); ii++)
+ notifyOfSample(out.get(ii).timestampNanos());
+ }
+
+ @Override
+ public String toString() {
+ return "Moving average";
+ }
+ }
+
+ /*
+ * A fixed strategy as a backup in case MovingAverage or TimeHorizonMovingAverage fails in some scenario
+ */
+ @VisibleForTesting
+ static class FixedCoalescingStrategy extends CoalescingStrategy
+ {
+ private final long coalesceWindow;
+
+ public FixedCoalescingStrategy(int coalesceWindowMicros, Parker parker, Logger logger, String displayName)
+ {
+ super(parker, logger, displayName);
+ coalesceWindow = TimeUnit.MICROSECONDS.toNanos(coalesceWindowMicros);
+ }
+
+ @Override
+ protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
+ {
+ if (input.drainTo(out, maxItems) == 0)
+ {
+ out.add(input.take());
+ parker.park(coalesceWindow);
+ input.drainTo(out, maxItems - 1);
+ }
+ debugTimestamps(out);
+ }
+
+ @Override
+ public String toString() {
+ return "Fixed";
+ }
+ }
+
+ /*
+ * A coalescing strategy that just returns all currently available elements
+ */
+ @VisibleForTesting
+ static class DisabledCoalescingStrategy extends CoalescingStrategy
+ {
+
+ public DisabledCoalescingStrategy(int coalesceWindowMicros, Parker parker, Logger logger, String displayName)
+ {
+ super(parker, logger, displayName);
+ }
+
+ @Override
+ protected <C extends Coalescable> void coalesceInternal(BlockingQueue<C> input, List<C> out, int maxItems) throws InterruptedException
+ {
+ if (input.drainTo(out, maxItems) == 0)
+ {
+ out.add(input.take());
+ input.drainTo(out, maxItems - 1);
+ }
+ debugTimestamps(out);
+ }
+
+ @Override
+ public String toString() {
+ return "Disabled";
+ }
+ }
+
+ @VisibleForTesting
+ static CoalescingStrategy newCoalescingStrategy(String strategy,
+ int coalesceWindow,
+ Parker parker,
+ Logger logger,
+ String displayName)
+ {
+ String classname = null;
+ String strategyCleaned = strategy.trim().toUpperCase(java.util.Locale.ROOT); // locale-independent so built-in names match under any default locale
+ switch(strategyCleaned)
+ {
+ case "MOVINGAVERAGE":
+ classname = MovingAverageCoalescingStrategy.class.getName();
+ break;
+ case "FIXED":
+ classname = FixedCoalescingStrategy.class.getName();
+ break;
+ case "TIMEHORIZON":
+ classname = TimeHorizonMovingAverageCoalescingStrategy.class.getName();
+ break;
+ case "DISABLED":
+ classname = DisabledCoalescingStrategy.class.getName();
+ break;
+ default:
+ classname = strategy.trim(); // custom class name: keep its case, drop surrounding whitespace
+ }
+
+ try
+ {
+ Class<?> clazz = Class.forName(classname);
+
+ if (!CoalescingStrategy.class.isAssignableFrom(clazz))
+ {
+ throw new RuntimeException(classname + " is not an instance of CoalescingStrategy");
+ }
+
+ Constructor<?> constructor = clazz.getConstructor(int.class, Parker.class, Logger.class, String.class);
+
+ return (CoalescingStrategy)constructor.newInstance(coalesceWindow, parker, logger, displayName);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static CoalescingStrategy newCoalescingStrategy(String strategy, int coalesceWindow, Logger logger, String displayName)
+ {
+ return newCoalescingStrategy(strategy, coalesceWindow, PARKER, logger, displayName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java b/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java
new file mode 100644
index 0000000..a6c5d28
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillis.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.config.Config;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Convert from nanotime to non-monotonic current time millis. Beware of weaker ordering guarantees.
+ *
+ * A (currentTimeMillis, nanoTime) pair is sampled periodically by a daemon thread and
+ * conversions are computed as an offset from the most recently published pair.
+ */
+public class NanoTimeToCurrentTimeMillis
+{
+    /*
+     * How often to pull a new timestamp from the system.
+     * The value is handed to Object.wait(long) by the updater thread.
+     */
+    private static final String TIMESTAMP_UPDATE_INTERVAL_PROPERTY = Config.PROPERTY_PREFIX + "NANOTIMETOMILLIS_TIMESTAMP_UPDATE_INTERVAL";
+    private static final long TIMESTAMP_UPDATE_INTERVAL = Long.getLong(TIMESTAMP_UPDATE_INTERVAL_PROPERTY, 10000);
+
+    /*
+     * Index 0 holds a System.currentTimeMillis() sample, index 1 a System.nanoTime() sample.
+     * The array is replaced wholesale so readers always see a matching pair.
+     */
+    private static volatile long[] TIMESTAMP_BASE = { System.currentTimeMillis(), System.nanoTime() };
+
+    /*
+     * Monitor the updater thread waits on; notifying it triggers an early refresh.
+     */
+    @VisibleForTesting
+    public static final Object TIMESTAMP_UPDATE = new Object();
+
+    /*
+     * Original author's measurements: System.currentTimeMillis() is 25 nanoseconds, this is
+     * 2 nanoseconds (maybe) according to JMH. Faster than calling both currentTimeMillis()
+     * and nanoTime().
+     *
+     * There is also the issue of how scalable nanoTime() and currentTimeMillis() are which is a moving target.
+     *
+     * These timestamps don't order with System.currentTimeMillis() because currentTimeMillis() can tick over
+     * before this one does. The original author has seen it behind by as much as 2 milliseconds.
+     */
+    public static final long convert(long nanoTime)
+    {
+        // Take one snapshot so both elements come from the same published pair
+        final long[] base = TIMESTAMP_BASE;
+        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(nanoTime - base[1]);
+        return base[0] + elapsedMillis;
+    }
+
+    /*
+     * Waits on TIMESTAMP_UPDATE for up to the configured interval.
+     * Returns false if the waiting thread was interrupted, true otherwise.
+     */
+    private static boolean awaitNextUpdate()
+    {
+        try
+        {
+            synchronized (TIMESTAMP_UPDATE)
+            {
+                TIMESTAMP_UPDATE.wait(TIMESTAMP_UPDATE_INTERVAL);
+            }
+            return true;
+        }
+        catch (InterruptedException e)
+        {
+            return false;
+        }
+    }
+
+    /*
+     * Publishes a new base pair; neither element is ever allowed to go below its previous value.
+     */
+    private static void refreshBase()
+    {
+        TIMESTAMP_BASE = new long[] {
+            Math.max(TIMESTAMP_BASE[0], System.currentTimeMillis()),
+            Math.max(TIMESTAMP_BASE[1], System.nanoTime()) };
+    }
+
+    static
+    {
+        //Pick up updates from NTP periodically
+        Thread updater = new Thread("NanoTimeToCurrentTimeMillis updater")
+        {
+            @Override
+            public void run()
+            {
+                // Interruption ends the loop and therefore the thread
+                while (awaitNextUpdate())
+                {
+                    refreshBase();
+                }
+            }
+        };
+        updater.setDaemon(true);
+        updater.start();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
new file mode 100644
index 0000000..97d15fe
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/CoalescingStrategiesTest.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import org.apache.cassandra.utils.CoalescingStrategies.Clock;
+import org.apache.cassandra.utils.CoalescingStrategies.Coalescable;
+import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy;
+import org.apache.cassandra.utils.CoalescingStrategies.Parker;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the strategies created through CoalescingStrategies.newCoalescingStrategy.
+ *
+ * The tests replace the clock with one that always returns 0, replace the parker with
+ * {@link MockParker}, and use an input queue whose take() is gated by semaphores so a
+ * background coalesce() call can be stepped from the test thread.
+ */
+public class CoalescingStrategiesTest
+{
+
+    // Runs the blocking coalesce() calls submitted by runBlocker
+    static final ExecutorService ex = Executors.newSingleThreadExecutor();
+
+    private static final Logger logger = LoggerFactory.getLogger(CoalescingStrategiesTest.class);
+
+    /**
+     * Parker that records every requested park duration and then blocks until the test
+     * releases a permit.
+     */
+    static class MockParker implements Parker
+    {
+        // Requested park durations in nanoseconds, in call order
+        Queue<Long> parks = new ArrayDeque<Long>();
+        // One permit lets one park() call return
+        Semaphore permits = new Semaphore(0);
+
+        // Released once per park() call so the test can wait until a park has started
+        Semaphore parked = new Semaphore(0);
+
+        public void park(long nanos)
+        {
+            parks.offer(nanos);
+            parked.release();
+            try
+            {
+                permits.acquire();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Coalescable that reports a fixed timestamp.
+     */
+    static class SimpleCoalescable implements Coalescable
+    {
+        final long timestampNanos;
+
+        SimpleCoalescable(long timestampNanos)
+        {
+            this.timestampNanos = timestampNanos;
+        }
+
+        public long timestampNanos()
+        {
+            return timestampNanos;
+        }
+    }
+
+
+    /** Converts microseconds to nanoseconds. */
+    static long toNanos(long micros)
+    {
+        return TimeUnit.MICROSECONDS.toNanos(micros);
+    }
+
+    MockParker parker;
+
+    BlockingQueue<SimpleCoalescable> input;
+    List<SimpleCoalescable> output;
+
+    CoalescingStrategy cs;
+
+    // Released when the coalescing thread enters input.take()
+    Semaphore queueParked = new Semaphore(0);
+    // One permit lets one input.take() call proceed
+    Semaphore queueRelease = new Semaphore(0);
+
+    /**
+     * Installs the fixed clock, a fresh parker and the gated input queue, then resets all state.
+     */
+    @SuppressWarnings({ "serial" })
+    @Before
+    public void setUp() throws Exception
+    {
+        cs = null;
+        CoalescingStrategies.CLOCK = new Clock()
+        {
+            @Override
+            public long nanoTime()
+            {
+                return 0;
+            }
+        };
+
+        parker = new MockParker();
+        input = new LinkedBlockingQueue<SimpleCoalescable>()
+        {
+            @Override
+            public SimpleCoalescable take() throws InterruptedException
+            {
+                // Announce that the caller is about to block, then wait for the test's go-ahead
+                queueParked.release();
+                queueRelease.acquire();
+                return super.take();
+            }
+        };
+        output = new ArrayList<>(128);
+
+        clear();
+    }
+
+    /** Creates a strategy by name using the mock parker. */
+    CoalescingStrategy newStrategy(String name, int window)
+    {
+        return CoalescingStrategies.newCoalescingStrategy(name, window, parker, logger, "Stupendopotamus");
+    }
+
+    /** Enqueues a message whose timestamp is {@code whenMicros} microseconds. */
+    void add(long whenMicros)
+    {
+        input.offer(new SimpleCoalescable(toNanos(whenMicros)));
+    }
+
+    /** Empties the queues, the recorded parks and all semaphore permits. */
+    void clear()
+    {
+        output.clear();
+        input.clear();
+        parker.parks.clear();
+        parker.parked.drainPermits();
+        parker.permits.drainPermits();
+        queueParked.drainPermits();
+        queueRelease.drainPermits();
+    }
+
+    /**
+     * Lets the background coalesce() get past one take() and one park(), then waits for it to finish.
+     */
+    void release() throws Exception
+    {
+        queueRelease.release();
+        parker.permits.release();
+        fut.get();
+    }
+
+    // Result of the most recent runBlocker submission
+    Future<?> fut;
+
+    /**
+     * Starts cs.coalesce(input, output, 128) on the executor and blocks until
+     * {@code waitFor} has a permit available.
+     */
+    void runBlocker(Semaphore waitFor) throws Exception
+    {
+        fut = ex.submit(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    cs.coalesce(input, output, 128);
+                }
+                catch (Exception e)
+                {
+                    // Named 'e' so it does not shadow the executor field 'ex';
+                    // the rethrown exception surfaces through fut.get()
+                    logger.error("Error", e);
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+        waitFor.acquire();
+    }
+
+    @Test
+    public void testFixedCoalescingStrategy() throws Exception
+    {
+        cs = newStrategy("FIXED", 200);
+
+        //Test that when a stream of messages continues arriving it keeps sending until all are drained
+        //It does this because it is already awake and sending messages
+        add(42);
+        add(42);
+        cs.coalesce(input, output, 128);
+        assertEquals(2, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        runBlocker(queueParked);
+        add(42);
+        add(42);
+        add(42);
+        release();
+        assertEquals(3, output.size());
+        assertEquals(toNanos(200), parker.parks.poll().longValue());
+
+    }
+
+    @Test
+    public void testDisabledCoalescingStrateg() throws Exception
+    {
+        cs = newStrategy("DISABLED", 200);
+
+        add(42);
+        add(42);
+        cs.coalesce(input, output, 128);
+        assertEquals(2, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        runBlocker(queueParked);
+        add(42);
+        add(42);
+        release();
+        assertEquals(2, output.size());
+        assertNull(parker.parks.poll());
+    }
+
+    /**
+     * Checks that parkLoop keeps waiting for (nearly) the requested time even though another
+     * thread unparks the caller about 50ms in.
+     */
+    @Test
+    public void parkLoop() throws Exception
+    {
+        final Thread current = Thread.currentThread();
+        final Semaphore helperReady = new Semaphore(0);
+        final Semaphore helperGo = new Semaphore(0);
+
+        new Thread()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    helperReady.release();
+                    helperGo.acquire();
+                    Thread.sleep(50);
+                    LockSupport.unpark(current);
+                }
+                catch (Exception e)
+                {
+                    logger.error("Error", e);
+                    // Presumably aborts the JVM so a broken helper cannot let the test pass vacuously
+                    System.exit(-1);
+                }
+            }
+        }.start();
+
+        // Complete the handshake: do not start timing until the helper thread is running
+        helperReady.acquire();
+
+        long start = System.nanoTime();
+        helperGo.release();
+
+        long parkNanos = TimeUnit.MILLISECONDS.toNanos(500);
+
+        CoalescingStrategies.parkLoop(parkNanos);
+        long delta = System.nanoTime() - start;
+
+        // Allow the loop to come up short by at most 1/16th of the requested time
+        assertTrue(delta >= (parkNanos - (parkNanos / 16)));
+    }
+
+    @Test
+    public void testMovingAverageCoalescingStrategy() throws Exception
+    {
+        cs = newStrategy("org.apache.cassandra.utils.CoalescingStrategies$MovingAverageCoalescingStrategy", 200);
+
+
+        //Test that things can be pulled out of the queue if it is non-empty
+        add(201);
+        add(401);
+        cs.coalesce(input, output, 128);
+        assertEquals(2, output.size());
+        assertNull(parker.parks.poll());
+
+        //Test that blocking on the queue results in everything drained
+        clear();
+
+        runBlocker(queueParked);
+        add(601);
+        add(801);
+        release();
+        assertEquals(2, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        //Test that out of order samples still flow
+        runBlocker(queueParked);
+        add(0);
+        release();
+        assertEquals(1, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        add(0);
+        cs.coalesce(input, output, 128);
+        assertEquals(1, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        //Test that too high an average doesn't coalesce
+        for (long ii = 0; ii < 128; ii++)
+            add(ii * 1000);
+        cs.coalesce(input, output, 128);
+        assertEquals(128, output.size());
+        assertTrue(parker.parks.isEmpty());
+
+        clear();
+
+        runBlocker(queueParked);
+        add(129 * 1000);
+        release();
+        assertTrue(parker.parks.isEmpty());
+
+        clear();
+
+        //Test that a low enough average coalesces
+        cs = newStrategy("MOVINGAVERAGE", 200);
+        for (long ii = 0; ii < 128; ii++)
+            add(ii * 99);
+        cs.coalesce(input, output, 128);
+        assertEquals(128, output.size());
+        assertTrue(parker.parks.isEmpty());
+
+        clear();
+
+        runBlocker(queueParked);
+        add(128 * 99);
+        add(129 * 99);
+        release();
+        assertEquals(2, output.size());
+        assertEquals(toNanos(198), parker.parks.poll().longValue());
+    }
+
+    @Test
+    public void testTimeHorizonStrategy() throws Exception
+    {
+        cs = newStrategy("TIMEHORIZON", 200);
+
+        //Test that things can be pulled out of the queue if it is non-empty
+        add(201);
+        add(401);
+        cs.coalesce(input, output, 128);
+        assertEquals(2, output.size());
+        assertNull(parker.parks.poll());
+
+        //Test that blocking on the queue results in everything drained
+        clear();
+
+        runBlocker(queueParked);
+        add(601);
+        add(801);
+        release();
+        assertEquals(2, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        //Test that out of order samples still flow
+        runBlocker(queueParked);
+        add(0);
+        release();
+        assertEquals(1, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        add(0);
+        cs.coalesce(input, output, 128);
+        assertEquals(1, output.size());
+        assertNull(parker.parks.poll());
+
+        clear();
+
+        //Test that too high an average doesn't coalesce
+        for (long ii = 0; ii < 128; ii++)
+            add(ii * 1000);
+        cs.coalesce(input, output, 128);
+        assertEquals(128, output.size());
+        assertTrue(parker.parks.isEmpty());
+
+        clear();
+
+        runBlocker(queueParked);
+        add(129 * 1000);
+        release();
+        assertTrue(parker.parks.isEmpty());
+
+        clear();
+
+        //Test that a low enough average coalesces
+        cs = newStrategy("TIMEHORIZON", 200);
+        primeTimeHorizonAverage(99);
+
+        clear();
+
+        runBlocker(queueParked);
+        add(100000 * 99);
+        queueRelease.release();
+        // Wait until the strategy is actually parked before offering the second message
+        parker.parked.acquire();
+        add(100001 * 99);
+        parker.permits.release();
+        fut.get();
+        assertEquals(2, output.size());
+        assertEquals(toNanos(198), parker.parks.poll().longValue());
+
+        clear();
+
+        //Test far future
+        add(Integer.MAX_VALUE);
+        cs.coalesce(input, output, 128);
+        assertEquals(1, output.size());
+        assertTrue(parker.parks.isEmpty());
+
+        clear();
+
+        //Distant past
+        add(0);
+        cs.coalesce(input, output, 128);
+        assertEquals(1, output.size());
+        assertTrue(parker.parks.isEmpty());
+    }
+
+    /**
+     * Feeds 100000 messages spaced {@code micros} microseconds apart through the strategy,
+     * coalescing (and discarding the output) every 128th message.
+     */
+    void primeTimeHorizonAverage(long micros) throws Exception
+    {
+        for (long ii = 0; ii < 100000; ii++)
+        {
+            add(ii * micros);
+            if (ii % 128 == 0)
+            {
+                cs.coalesce(input, output, 128);
+                output.clear();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/82849649/test/unit/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillisTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillisTest.java b/test/unit/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillisTest.java
new file mode 100644
index 0000000..5c025cf
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/NanoTimeToCurrentTimeMillisTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+/**
+ * Checks that NanoTimeToCurrentTimeMillis.convert stays close to System.currentTimeMillis()
+ * and does not move backwards by more than a millisecond while its base is being refreshed.
+ */
+public class NanoTimeToCurrentTimeMillisTest
+{
+    @Test
+    public void testTimestampOrdering() throws Exception
+    {
+        long nowNanos = System.nanoTime();
+        long now = System.currentTimeMillis();
+        long lastConverted = 0;
+        for (long ii = 0; ii < 10000000; ii++)
+        {
+            // Running maximum of the wall clock, in milliseconds
+            now = Math.max(now, System.currentTimeMillis());
+            if (ii % 10000 == 0)
+            {
+                // Wake the updater thread so it publishes a new timestamp base
+                synchronized (NanoTimeToCurrentTimeMillis.TIMESTAMP_UPDATE)
+                {
+                    NanoTimeToCurrentTimeMillis.TIMESTAMP_UPDATE.notify();
+                }
+                Thread.sleep(1);
+            }
+            // Running maximum of nanoTime; must not be mixed with the millisecond value in 'now'
+            nowNanos = Math.max(nowNanos, System.nanoTime());
+            long convertedNow = NanoTimeToCurrentTimeMillis.convert(nowNanos);
+            assertTrue("convertedNow = " + convertedNow + " lastConverted = " + lastConverted + " in iteration " + ii, convertedNow >= (lastConverted - 1));
+            lastConverted = convertedNow;
+            //Seems to be off by as much as two milliseconds sadly
+            assertTrue("now = " + now + " convertedNow = " + convertedNow + " in iteration " + ii, (now - 2) <= convertedNow);
+
+        }
+    }
+}