You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/07/18 20:12:02 UTC
[2/4] activemq-artemis git commit: ARTEMIS-1294 Reverted TimedBuffer
timeout policy
ARTEMIS-1294 Reverted TimedBuffer timeout policy
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3dc9566f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3dc9566f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3dc9566f
Branch: refs/heads/master
Commit: 3dc9566fb66b31729c3e04a8d06ff391a8c850b4
Parents: 8f50098
Author: Francesco Nigro <ni...@gmail.com>
Authored: Mon Jul 17 17:35:38 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jul 18 10:23:02 2017 -0400
----------------------------------------------------------------------
.../artemis/core/io/buffer/TimedBuffer.java | 190 ++++++++++---------
1 file changed, 96 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3dc9566f/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
index 238568f..2713255 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.io.buffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
@@ -37,6 +36,9 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public final class TimedBuffer {
// Constants -----------------------------------------------------
+ // The number of tries on sleep before switching to spin
+ private static final int MAX_CHECKS_ON_SLEEP = 20;
+
// Attributes ----------------------------------------------------
private TimedBufferObserver bufferObserver;
@@ -46,7 +48,7 @@ public final class TimedBuffer {
// prevent that
private final Semaphore spinLimiter = new Semaphore(1);
- private CheckTimer timerRunnable = new CheckTimer();
+ private CheckTimer timerRunnable = null;
private final int bufferSize;
@@ -58,7 +60,8 @@ public final class TimedBuffer {
private final int timeout;
- private final AtomicLong pendingSyncs = new AtomicLong();
+ // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
+ private volatile boolean pendingSync = false;
private Thread timerThread;
@@ -73,7 +76,7 @@ public final class TimedBuffer {
private final boolean logRates;
- private long bytesFlushed = 0;
+ private final AtomicLong bytesFlushed = new AtomicLong(0);
private final AtomicLong flushesDone = new AtomicLong(0);
@@ -81,6 +84,9 @@ public final class TimedBuffer {
private TimerTask logRatesTimerTask;
+ //used only in the timerThread do not synchronization
+ private boolean useSleep = true;
+
// no need to be volatile as every access is synchronized
private boolean spinning = false;
@@ -99,16 +105,18 @@ public final class TimedBuffer {
logRatesTimer = new Timer(true);
}
// Setting the interval for nano-sleeps
+
//prefer off heap buffer to allow further humongous allocations and reduce GC overhead
//NOTE: it is used ByteBuffer::allocateDirect instead of Unpooled::directBuffer, because the latter could allocate
//direct ByteBuffers with no Cleaner!
buffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(ByteBuffer.allocateDirect(size)));
+
buffer.clear();
bufferLimit = 0;
- callbacks = null;
+ callbacks = new ArrayList<>();
this.timeout = timeout;
}
@@ -232,14 +240,11 @@ public final class TimedBuffer {
buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes);
buffer.writerIndex(writerIndex + readableBytes);
- if (callbacks == null) {
- callbacks = new ArrayList<>();
- }
callbacks.add(callback);
if (sync) {
- final long currentPendingSyncs = pendingSyncs.get();
- pendingSyncs.lazySet(currentPendingSyncs + 1);
+ pendingSync = true;
+
startSpin();
}
}
@@ -253,14 +258,11 @@ public final class TimedBuffer {
bytes.encode(buffer);
- if (callbacks == null) {
- callbacks = new ArrayList<>();
- }
callbacks.add(callback);
if (sync) {
- final long currentPendingSyncs = pendingSyncs.get();
- pendingSyncs.lazySet(currentPendingSyncs + 1);
+ pendingSync = true;
+
startSpin();
}
@@ -274,14 +276,18 @@ public final class TimedBuffer {
* force means the Journal is moving to a new file. Any pending write need to be done immediately
* or data could be lost
*/
- private void flush(final boolean force) {
+ public void flush(final boolean force) {
synchronized (this) {
if (!started) {
throw new IllegalStateException("TimedBuffer is not started");
}
if ((force || !delayFlush) && buffer.writerIndex() > 0) {
- final int pos = buffer.writerIndex();
+ int pos = buffer.writerIndex();
+
+ if (logRates) {
+ bytesFlushed.addAndGet(pos);
+ }
final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
//bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!!
@@ -289,34 +295,25 @@ public final class TimedBuffer {
//perform memcpy under the hood due to the off heap buffer
buffer.getBytes(0, bufferToFlush);
- final List<IOCallback> ioCallbacks = callbacks == null ? Collections.emptyList() : callbacks;
- bufferObserver.flushBuffer(bufferToFlush, pendingSyncs.get() > 0, ioCallbacks);
+
+ bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
stopSpin();
- pendingSyncs.lazySet(0);
+ pendingSync = false;
- callbacks = null;
+ // swap the instance as the previous callback list is being used asynchronously
+ callbacks = new ArrayList<>();
buffer.clear();
bufferLimit = 0;
- if (logRates) {
- logFlushed(pos);
- }
+ flushesDone.incrementAndGet();
}
}
}
- private void logFlushed(int bytes) {
- this.bytesFlushed += bytes;
- //more lightweight than XADD if single writer
- final long currentFlushesDone = flushesDone.get();
- //flushesDone::lazySet write-Release bytesFlushed
- flushesDone.lazySet(currentFlushesDone + 1L);
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -340,21 +337,21 @@ public final class TimedBuffer {
if (!closed) {
long now = System.currentTimeMillis();
- final long flushesDone = TimedBuffer.this.flushesDone.get();
- //flushesDone::get read-Acquire bytesFlushed
- final long bytesFlushed = TimedBuffer.this.bytesFlushed;
+ long bytesF = bytesFlushed.get();
+ long flushesD = flushesDone.get();
+
if (lastExecution != 0) {
- final double rate = 1000 * (double) (bytesFlushed - lastBytesFlushed) / (now - lastExecution);
+ double rate = 1000 * (double) (bytesF - lastBytesFlushed) / (now - lastExecution);
ActiveMQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024)));
- final double flushRate = 1000 * (double) (flushesDone - lastFlushesDone) / (now - lastExecution);
+ double flushRate = 1000 * (double) (flushesD - lastFlushesDone) / (now - lastExecution);
ActiveMQJournalLogger.LOGGER.flushRate(flushRate);
}
lastExecution = now;
- lastBytesFlushed = bytesFlushed;
+ lastBytesFlushed = bytesF;
- lastFlushesDone = flushesDone;
+ lastFlushesDone = flushesD;
}
}
@@ -370,40 +367,74 @@ public final class TimedBuffer {
private volatile boolean closed = false;
+ int checks = 0;
+ int failedChecks = 0;
+ long timeBefore = 0;
+
@Override
public void run() {
- int waitTimes = 0;
long lastFlushTime = 0;
- long estimatedOptimalBatch = Runtime.getRuntime().availableProcessors();
- final Semaphore spinLimiter = TimedBuffer.this.spinLimiter;
- final long timeout = TimedBuffer.this.timeout;
while (!closed) {
- boolean flushed = false;
- final long currentPendingSyncs = pendingSyncs.get();
-
- if (currentPendingSyncs > 0) {
- if (bufferObserver != null) {
- final boolean checkpoint = System.nanoTime() > lastFlushTime + timeout;
- if (checkpoint || currentPendingSyncs >= estimatedOptimalBatch) {
- flush();
- if (checkpoint) {
- estimatedOptimalBatch = currentPendingSyncs;
- } else {
- estimatedOptimalBatch = Math.max(estimatedOptimalBatch, currentPendingSyncs);
- }
- lastFlushTime = System.nanoTime();
- //a flush has been requested
- flushed = true;
- }
+ // We flush on the timer if there are pending syncs there and we've waited at least one
+ // timeout since the time of the last flush.
+ // Effectively flushing "resets" the timer
+ // On the timeout verification, notice that we ignore the timeout check if we are using sleep
+
+ if (pendingSync) {
+ if (useSleep) {
+ // if using sleep, we will always flush
+ flush();
+ lastFlushTime = System.nanoTime();
+ } else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout) {
+ // if not using flush we will spin and do the time checks manually
+ flush();
+ lastFlushTime = System.nanoTime();
}
+
+ }
+
+ sleepIfPossible();
+
+ try {
+ spinLimiter.acquire();
+
+ Thread.yield();
+
+ spinLimiter.release();
+ } catch (InterruptedException e) {
+ throw new ActiveMQInterruptedException(e);
}
+ }
+ }
+
+ /**
+ * We will attempt to use sleep only if the system supports nano-sleep
+ * we will on that case verify up to MAX_CHECKS if nano sleep is behaving well.
+ * if more than 50% of the checks have failed we will cancel the sleep and just use regular spin
+ */
+ private void sleepIfPossible() {
+ if (useSleep) {
+ if (checks < MAX_CHECKS_ON_SLEEP) {
+ timeBefore = System.nanoTime();
+ }
+
+ LockSupport.parkNanos(timeout);
+
+ if (checks < MAX_CHECKS_ON_SLEEP) {
+ long realTimeSleep = System.nanoTime() - timeBefore;
+
+ // I'm letting the real time to be up to 50% than the requested sleep.
+ if (realTimeSleep > timeout * 1.5) {
+ failedChecks++;
+ }
- if (flushed) {
- waitTimes = 0;
- } else {
- //instead of interruptible sleeping, perform progressive parks depending on the load
- waitTimes = TimedBuffer.wait(waitTimes, spinLimiter);
+ if (++checks >= MAX_CHECKS_ON_SLEEP) {
+ if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
+ ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
+ useSleep = false;
+ }
+ }
}
}
}
@@ -413,35 +444,6 @@ public final class TimedBuffer {
}
}
- private static int wait(int waitTimes, Semaphore spinLimiter) {
- if (waitTimes < 10) {
- //doesn't make sense to spin loop here, because of the lock around flush/addBytes operations!
- Thread.yield();
- waitTimes++;
- } else if (waitTimes < 20) {
- LockSupport.parkNanos(1L);
- waitTimes++;
- } else if (waitTimes < 50) {
- LockSupport.parkNanos(10L);
- waitTimes++;
- } else if (waitTimes < 100) {
- LockSupport.parkNanos(100L);
- waitTimes++;
- } else if (waitTimes < 1000) {
- LockSupport.parkNanos(1000L);
- waitTimes++;
- } else {
- LockSupport.parkNanos(100_000L);
- try {
- spinLimiter.acquire();
- spinLimiter.release();
- } catch (InterruptedException e) {
- throw new ActiveMQInterruptedException(e);
- }
- }
- return waitTimes;
- }
-
/**
* Sub classes (tests basically) can use this to override disabling spinning
*/