You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/05/08 22:39:35 UTC
activemq-artemis git commit: ARTEMIS-1151 Adapting TimedBuffer and
NIO Buffer Pooling
Repository: activemq-artemis
Updated Branches:
refs/heads/1.x 5ec545a79 -> 25094f272
ARTEMIS-1151 Adapting TimedBuffer and NIO Buffer Pooling
- NIO/ASYNCIO new TimedBuffer with adapting batch window heuristic
- NIO/ASYNCIO improved TimedBuffer write monitoring with
lightweight concurrent performance counters
- NIO/ASYNCIO journal/paging operations benefit from fewer buffer copies
- NIO/ASYNCIO any buffer copy is always performed with raw batch copy
using SIMD intrinsics (System::arraycopy) or memcpy under the hood
- NIO improved clearing of buffers using SIMD intrinsics (Arrays::fill) and/or memset
- NIO journal operations perform by default thread-local buffer pooling (off heap),
retaining only the largest buffer released so far
- NIO improved file copy operations using zero-copy FileChannel::transferTo
- NIO improved zeroing using pooled single OS page buffer to clean the file
+ pwrite (on Linux)
- NIO deterministic release of unpooled direct buffers to avoid OOM errors
due to slow GC
- Exposed OS PAGE SIZE value using Env class
(cherry picked from commit 21c9ed85cf6b9a53debdd32747bd42b2e733da80)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/25094f27
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/25094f27
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/25094f27
Branch: refs/heads/1.x
Commit: 25094f27217f8bd561d8d230d2624fd76f059d66
Parents: 5ec545a
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue May 2 11:47:44 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon May 8 18:31:23 2017 -0400
----------------------------------------------------------------------
.../cli/commands/util/SyncCalculation.java | 1 +
.../org/apache/activemq/artemis/utils/Env.java | 108 ++++++++
.../artemis/core/io/AbstractSequentialFile.java | 30 +--
.../artemis/core/io/buffer/TimedBuffer.java | 252 +++++++++----------
.../artemis/core/io/nio/NIOSequentialFile.java | 105 +++++---
.../core/io/nio/NIOSequentialFileFactory.java | 82 +++++-
.../unit/core/journal/impl/TimedBufferTest.java | 160 ------------
7 files changed, 389 insertions(+), 349 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25094f27/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
index 02db655..f4fbfee 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/util/SyncCalculation.java
@@ -179,6 +179,7 @@ public class SyncCalculation {
case NIO:
factory = new NIOSequentialFileFactory(datafolder, 1).setDatasync(datasync);
+ ((NIOSequentialFileFactory) factory).disableBufferReuse();
factory.start();
return factory;
case ASYNCIO:
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25094f27/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java
new file mode 100644
index 0000000..94f69d3
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/Env.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+
+/**
+ * Utility that detects various properties specific to the current runtime
+ * environment, such as JVM bitness and OS type.
+ */
+public final class Env {
+
+ private static final int OS_PAGE_SIZE;
+
+ static {
+ //most common OS page size value
+ int osPageSize = 4096;
+ sun.misc.Unsafe instance;
+ try {
+ Field field = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
+ field.setAccessible(true);
+ instance = (sun.misc.Unsafe) field.get((Object) null);
+ } catch (Throwable t) {
+ try {
+ Constructor<sun.misc.Unsafe> c = sun.misc.Unsafe.class.getDeclaredConstructor(new Class[0]);
+ c.setAccessible(true);
+ instance = c.newInstance(new Object[0]);
+ } catch (Throwable t1) {
+ instance = null;
+ }
+ }
+ if (instance != null) {
+ osPageSize = instance.pageSize();
+ }
+ OS_PAGE_SIZE = osPageSize;
+ }
+
+ /**
+ * The system will change a few logs and semantics to be suitable to
+ * run a long testsuite.
+ * Like a few log entries that are only valid during a production system.
+ * or a few cases we need to know as warn on the testsuite and as log in production.
+ */
+ private static boolean testEnv = false;
+
+ private static final String OS = System.getProperty("os.name").toLowerCase();
+ private static final boolean IS_LINUX = OS.startsWith("linux");
+ private static final boolean IS_64BIT = checkIs64bit();
+
+ private Env() {
+
+ }
+
+ /**
+ * Return the size in bytes of a OS memory page.
+ * This value will always be a power of two.
+ */
+ public static int osPageSize() {
+ return OS_PAGE_SIZE;
+ }
+
+ public static boolean isTestEnv() {
+ return testEnv;
+ }
+
+ public static void setTestEnv(boolean testEnv) {
+ Env.testEnv = testEnv;
+ }
+
+ public static boolean isLinuxOs() {
+ return IS_LINUX == true;
+ }
+
+ public static boolean is64BitJvm() {
+ return IS_64BIT;
+ }
+
+ private static boolean checkIs64bit() {
+ //check the more used JVMs
+ String systemProp;
+ systemProp = System.getProperty("com.ibm.vm.bitmode");
+ if (systemProp != null) {
+ return "64".equals(systemProp);
+ }
+ systemProp = System.getProperty("sun.arch.data.model");
+ if (systemProp != null) {
+ return "64".equals(systemProp);
+ }
+ systemProp = System.getProperty("java.vm.version");
+ return systemProp != null && systemProp.contains("_64");
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25094f27/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
index cd15246..f6cb9b0 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
@@ -189,9 +189,12 @@ public abstract class AbstractSequentialFile implements SequentialFile {
bytes.setIndex(0, bytes.capacity());
timedBuffer.addBytes(bytes, sync, callback);
} else {
- ByteBuffer buffer = factory.newBuffer(bytes.capacity());
- buffer.put(bytes.toByteBuffer().array());
- buffer.rewind();
+ final int readableBytes = bytes.readableBytes();
+ final ByteBuffer buffer = factory.newBuffer(readableBytes);
+ //factory::newBuffer doesn't necessary return a buffer with limit == readableBytes!!
+ buffer.limit(readableBytes);
+ bytes.getBytes(bytes.readerIndex(), buffer);
+ buffer.flip();
writeDirect(buffer, sync, callback);
}
}
@@ -215,15 +218,12 @@ public abstract class AbstractSequentialFile implements SequentialFile {
if (timedBuffer != null) {
timedBuffer.addBytes(bytes, sync, callback);
} else {
- ByteBuffer buffer = factory.newBuffer(bytes.getEncodeSize());
-
- // If not using the TimedBuffer, a final copy is necessary
- // Because AIO will need a specific Buffer
- // And NIO will also need a whole buffer to perform the write
-
+ final int encodedSize = bytes.getEncodeSize();
+ ByteBuffer buffer = factory.newBuffer(encodedSize);
ActiveMQBuffer outBuffer = ActiveMQBuffers.wrappedBuffer(buffer);
bytes.encode(outBuffer);
- buffer.rewind();
+ buffer.clear();
+ buffer.limit(encodedSize);
writeDirect(buffer, sync, callback);
}
}
@@ -255,9 +255,10 @@ public abstract class AbstractSequentialFile implements SequentialFile {
@Override
public void done() {
- for (IOCallback callback : delegates) {
+ final int size = delegates.size();
+ for (int i = 0; i < size; i++) {
try {
- callback.done();
+ delegates.get(i).done();
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
}
@@ -266,9 +267,10 @@ public abstract class AbstractSequentialFile implements SequentialFile {
@Override
public void onError(final int errorCode, final String errorMessage) {
- for (IOCallback callback : delegates) {
+ final int size = delegates.size();
+ for (int i = 0; i < size; i++) {
try {
- callback.onError(errorCode, errorMessage);
+ delegates.get(i).onError(errorCode, errorMessage);
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25094f27/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
index 91e5e12..32b15fe 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
@@ -18,27 +18,25 @@ package org.apache.activemq.artemis.core.io.buffer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
-public class TimedBuffer {
+public final class TimedBuffer {
// Constants -----------------------------------------------------
- // The number of tries on sleep before switching to spin
- public static final int MAX_CHECKS_ON_SLEEP = 20;
-
// Attributes ----------------------------------------------------
private TimedBufferObserver bufferObserver;
@@ -58,10 +56,9 @@ public class TimedBuffer {
private List<IOCallback> callbacks;
- private volatile int timeout;
+ private final int timeout;
- // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
- private volatile boolean pendingSync = false;
+ private final AtomicLong pendingSyncs = new AtomicLong();
private Thread timerThread;
@@ -76,7 +73,7 @@ public class TimedBuffer {
private final boolean logRates;
- private final AtomicLong bytesFlushed = new AtomicLong(0);
+ private long bytesFlushed = 0;
private final AtomicLong flushesDone = new AtomicLong(0);
@@ -84,8 +81,6 @@ public class TimedBuffer {
private TimerTask logRatesTimerTask;
- private boolean useSleep = true;
-
// no need to be volatile as every access is synchronized
private boolean spinning = false;
@@ -104,27 +99,18 @@ public class TimedBuffer {
logRatesTimer = new Timer(true);
}
// Setting the interval for nano-sleeps
-
- buffer = ActiveMQBuffers.fixedBuffer(bufferSize);
+ //prefer off heap buffer to allow further humongous allocations and reduce GC overhead
+ buffer = new ChannelBufferWrapper(Unpooled.directBuffer(size, size));
buffer.clear();
bufferLimit = 0;
- callbacks = new ArrayList<>();
+ callbacks = null;
this.timeout = timeout;
}
- // for Debug purposes
- public synchronized boolean isUseSleep() {
- return useSleep;
- }
-
- public synchronized void setUseSleep(boolean useSleep) {
- this.useSleep = useSleep;
- }
-
public synchronized void start() {
if (started) {
return;
@@ -232,7 +218,28 @@ public class TimedBuffer {
}
public synchronized void addBytes(final ActiveMQBuffer bytes, final boolean sync, final IOCallback callback) {
- addBytes(new ByteArrayEncoding(bytes.toByteBuffer().array()), sync, callback);
+ if (!started) {
+ throw new IllegalStateException("TimedBuffer is not started");
+ }
+
+ delayFlush = false;
+
+ //it doesn't modify the reader index of bytes as in the original version
+ final int readableBytes = bytes.readableBytes();
+ final int writerIndex = buffer.writerIndex();
+ buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes);
+ buffer.writerIndex(writerIndex + readableBytes);
+
+ if (callbacks == null) {
+ callbacks = new ArrayList<>();
+ }
+ callbacks.add(callback);
+
+ if (sync) {
+ final long currentPendingSyncs = pendingSyncs.get();
+ pendingSyncs.lazySet(currentPendingSyncs + 1);
+ startSpin();
+ }
}
public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOCallback callback) {
@@ -244,11 +251,14 @@ public class TimedBuffer {
bytes.encode(buffer);
+ if (callbacks == null) {
+ callbacks = new ArrayList<>();
+ }
callbacks.add(callback);
if (sync) {
- pendingSync = true;
-
+ final long currentPendingSyncs = pendingSyncs.get();
+ pendingSyncs.lazySet(currentPendingSyncs + 1);
startSpin();
}
@@ -262,45 +272,49 @@ public class TimedBuffer {
* force means the Journal is moving to a new file. Any pending write need to be done immediately
* or data could be lost
*/
- public void flush(final boolean force) {
+ private void flush(final boolean force) {
synchronized (this) {
if (!started) {
throw new IllegalStateException("TimedBuffer is not started");
}
if ((force || !delayFlush) && buffer.writerIndex() > 0) {
- int pos = buffer.writerIndex();
+ final int pos = buffer.writerIndex();
- if (logRates) {
- bytesFlushed.addAndGet(pos);
- }
+ final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
+ //bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!!
+ bufferToFlush.limit(pos);
+ //perform memcpy under the hood due to the off heap buffer
+ buffer.getBytes(0, bufferToFlush);
- ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
-
- // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
- // Using bufferToFlush.put(buffer) would make several append calls for each byte
- // We also transfer the content of this buffer to the native file's buffer
-
- bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
-
- bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
+ final List<IOCallback> ioCallbacks = callbacks == null ? Collections.emptyList() : callbacks;
+ bufferObserver.flushBuffer(bufferToFlush, pendingSyncs.get() > 0, ioCallbacks);
stopSpin();
- pendingSync = false;
+ pendingSyncs.lazySet(0);
- // swap the instance as the previous callback list is being used asynchronously
- callbacks = new LinkedList<>();
+ callbacks = null;
buffer.clear();
bufferLimit = 0;
- flushesDone.incrementAndGet();
+ if (logRates) {
+ logFlushed(pos);
+ }
}
}
}
+ private void logFlushed(int bytes) {
+ this.bytesFlushed += bytes;
+ //more lightweight than XADD if single writer
+ final long currentFlushesDone = flushesDone.get();
+ //flushesDone::lazySet write-Release bytesFlushed
+ flushesDone.lazySet(currentFlushesDone + 1L);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -324,21 +338,21 @@ public class TimedBuffer {
if (!closed) {
long now = System.currentTimeMillis();
- long bytesF = bytesFlushed.get();
- long flushesD = flushesDone.get();
-
+ final long flushesDone = TimedBuffer.this.flushesDone.get();
+ //flushesDone::get read-Acquire bytesFlushed
+ final long bytesFlushed = TimedBuffer.this.bytesFlushed;
if (lastExecution != 0) {
- double rate = 1000 * (double) (bytesF - lastBytesFlushed) / (now - lastExecution);
+ final double rate = 1000 * (double) (bytesFlushed - lastBytesFlushed) / (now - lastExecution);
ActiveMQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024)));
- double flushRate = 1000 * (double) (flushesD - lastFlushesDone) / (now - lastExecution);
+ final double flushRate = 1000 * (double) (flushesDone - lastFlushesDone) / (now - lastExecution);
ActiveMQJournalLogger.LOGGER.flushRate(flushRate);
}
lastExecution = now;
- lastBytesFlushed = bytesF;
+ lastBytesFlushed = bytesFlushed;
- lastFlushesDone = flushesD;
+ lastFlushesDone = flushesDone;
}
}
@@ -354,84 +368,40 @@ public class TimedBuffer {
private volatile boolean closed = false;
- int checks = 0;
- int failedChecks = 0;
- long timeBefore = 0;
-
- final int sleepMillis = timeout / 1000000; // truncates
- final int sleepNanos = timeout % 1000000;
-
@Override
public void run() {
+ int waitTimes = 0;
long lastFlushTime = 0;
+ long estimatedOptimalBatch = Runtime.getRuntime().availableProcessors();
+ final Semaphore spinLimiter = TimedBuffer.this.spinLimiter;
+ final long timeout = TimedBuffer.this.timeout;
while (!closed) {
- // We flush on the timer if there are pending syncs there and we've waited at least one
- // timeout since the time of the last flush.
- // Effectively flushing "resets" the timer
- // On the timeout verification, notice that we ignore the timeout check if we are using sleep
-
- if (pendingSync) {
- if (isUseSleep()) {
- // if using sleep, we will always flush
- flush();
- lastFlushTime = System.nanoTime();
- } else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout) {
- // if not using flush we will spin and do the time checks manually
- flush();
- lastFlushTime = System.nanoTime();
+ boolean flushed = false;
+ final long currentPendingSyncs = pendingSyncs.get();
+
+ if (currentPendingSyncs > 0) {
+ if (bufferObserver != null) {
+ final boolean checkpoint = System.nanoTime() > lastFlushTime + timeout;
+ if (checkpoint || currentPendingSyncs >= estimatedOptimalBatch) {
+ flush();
+ if (checkpoint) {
+ estimatedOptimalBatch = currentPendingSyncs;
+ } else {
+ estimatedOptimalBatch = Math.max(estimatedOptimalBatch, currentPendingSyncs);
+ }
+ lastFlushTime = System.nanoTime();
+ //a flush has been requested
+ flushed = true;
+ }
}
-
}
- sleepIfPossible();
-
- try {
- spinLimiter.acquire();
-
- Thread.yield();
-
- spinLimiter.release();
- } catch (InterruptedException e) {
- throw new ActiveMQInterruptedException(e);
- }
- }
- }
-
- /**
- * We will attempt to use sleep only if the system supports nano-sleep
- * we will on that case verify up to MAX_CHECKS if nano sleep is behaving well.
- * if more than 50% of the checks have failed we will cancel the sleep and just use regular spin
- */
- private void sleepIfPossible() {
- if (isUseSleep()) {
- if (checks < MAX_CHECKS_ON_SLEEP) {
- timeBefore = System.nanoTime();
- }
-
- try {
- sleep(sleepMillis, sleepNanos);
- } catch (InterruptedException e) {
- throw new ActiveMQInterruptedException(e);
- } catch (Exception e) {
- setUseSleep(false);
- ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
- }
-
- if (checks < MAX_CHECKS_ON_SLEEP) {
- long realTimeSleep = System.nanoTime() - timeBefore;
-
- // I'm letting the real time to be up to 50% than the requested sleep.
- if (realTimeSleep > timeout * 1.5) {
- failedChecks++;
- }
-
- if (++checks >= MAX_CHECKS_ON_SLEEP) {
- if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
- ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
- setUseSleep(false);
- }
- }
+ if (flushed) {
+ waitTimes = 0;
+ } else {
+ //instead of interruptible sleeping, perform progressive parks depending on the load
+ waitTimes = TimedBuffer.wait(waitTimes, spinLimiter);
}
}
}
@@ -441,15 +411,33 @@ public class TimedBuffer {
}
}
- /**
- * Sub classes (tests basically) can use this to override how the sleep is being done
- *
- * @param sleepMillis
- * @param sleepNanos
- * @throws InterruptedException
- */
- protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
- Thread.sleep(sleepMillis, sleepNanos);
+ private static int wait(int waitTimes, Semaphore spinLimiter) {
+ if (waitTimes < 10) {
+ //doesn't make sense to spin loop here, because of the lock around flush/addBytes operations!
+ Thread.yield();
+ waitTimes++;
+ } else if (waitTimes < 20) {
+ LockSupport.parkNanos(1L);
+ waitTimes++;
+ } else if (waitTimes < 50) {
+ LockSupport.parkNanos(10L);
+ waitTimes++;
+ } else if (waitTimes < 100) {
+ LockSupport.parkNanos(100L);
+ waitTimes++;
+ } else if (waitTimes < 1000) {
+ LockSupport.parkNanos(1000L);
+ waitTimes++;
+ } else {
+ LockSupport.parkNanos(100_000L);
+ try {
+ spinLimiter.acquire();
+ spinLimiter.release();
+ } catch (InterruptedException e) {
+ throw new ActiveMQInterruptedException(e);
+ }
+ }
+ return waitTimes;
}
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25094f27/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
index 29e5b81..d1e333e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
@@ -22,6 +22,7 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -33,6 +34,8 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+import org.apache.activemq.artemis.utils.Env;
public final class NIOSequentialFile extends AbstractSequentialFile {
@@ -40,9 +43,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
private RandomAccessFile rfile;
- private final int defaultMaxIO;
-
- private int maxIO;
+ private final int maxIO;
public NIOSequentialFile(final SequentialFileFactory factory,
final File directory,
@@ -50,7 +51,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
final int maxIO,
final Executor writerExecutor) {
super(directory, file, factory, writerExecutor);
- defaultMaxIO = maxIO;
+ this.maxIO = maxIO;
}
@Override
@@ -69,7 +70,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
*/
@Override
public synchronized void open() throws IOException {
- open(defaultMaxIO, true);
+ open(maxIO, true);
}
@Override
@@ -90,31 +91,38 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
@Override
public void fill(final int size) throws IOException {
- ByteBuffer bb = ByteBuffer.allocate(size);
-
- bb.limit(size);
- bb.position(0);
-
try {
- channel.position(0);
- channel.write(bb);
- channel.force(false);
- channel.position(0);
+ //uses the most common OS page size to match the Page Cache entry size and reduce JVM memory footprint
+ final int zeroPageCapacity = Env.osPageSize();
+ final ByteBuffer zeroPage = this.factory.newBuffer(zeroPageCapacity);
+ try {
+ int bytesToWrite = size;
+ long writePosition = 0;
+ while (bytesToWrite > 0) {
+ zeroPage.clear();
+ final int zeroPageLimit = Math.min(bytesToWrite, zeroPageCapacity);
+ zeroPage.limit(zeroPageLimit);
+ //use the cheaper pwrite instead of fseek + fwrite
+ final int writtenBytes = channel.write(zeroPage, writePosition);
+ bytesToWrite -= writtenBytes;
+ writePosition += writtenBytes;
+ }
+ if (factory.isDatasync()) {
+ channel.force(true);
+ }
+ //set the position to 0 to match the fill contract
+ channel.position(0);
+ fileSize = size;
+ } finally {
+ //return it to the factory
+ this.factory.releaseBuffer(zeroPage);
+ }
} catch (ClosedChannelException e) {
throw e;
} catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e;
}
- channel.force(true);
-
- fileSize = channel.size();
- }
-
- public synchronized void waitForClose() throws InterruptedException {
- while (isOpen()) {
- wait();
- }
}
@Override
@@ -247,10 +255,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
internalWrite(bytes, sync, null);
}
- public void writeInternal(final ByteBuffer bytes) throws Exception {
- internalWrite(bytes, true, null);
- }
-
@Override
protected ByteBuffer newBuffer(int size, final int limit) {
// For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO
@@ -293,14 +297,51 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
private void doInternalWrite(final ByteBuffer bytes,
final boolean sync,
final IOCallback callback) throws IOException {
- channel.write(bytes);
+ try {
+ channel.write(bytes);
- if (sync) {
- sync();
+ if (sync) {
+ sync();
+ }
+
+ if (callback != null) {
+ callback.done();
+ }
+ } finally {
+ //release it to recycle the write buffer if big enough
+ this.factory.releaseBuffer(bytes);
}
+ }
- if (callback != null) {
- callback.done();
+ @Override
+ public void copyTo(SequentialFile dstFile) throws IOException {
+ if (ActiveMQJournalLogger.LOGGER.isDebugEnabled()) {
+ ActiveMQJournalLogger.LOGGER.debug("Copying " + this + " as " + dstFile);
+ }
+ if (isOpen()) {
+ throw new IllegalStateException("File opened!");
+ }
+ if (dstFile.isOpen()) {
+ throw new IllegalArgumentException("dstFile must be closed too");
+ }
+ try (RandomAccessFile src = new RandomAccessFile(getFile(), "rw");
+ FileChannel srcChannel = src.getChannel();
+ FileLock srcLock = srcChannel.lock()) {
+ final long readableBytes = srcChannel.size();
+ if (readableBytes > 0) {
+ try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw");
+ FileChannel dstChannel = dst.getChannel();
+ FileLock dstLock = dstChannel.lock()) {
+ final long oldLength = dst.length();
+ final long newLength = oldLength + readableBytes;
+ dst.setLength(newLength);
+ final long transferred = dstChannel.transferFrom(srcChannel, oldLength, readableBytes);
+ if (transferred != readableBytes) {
+ dstChannel.truncate(oldLength);
+ throw new IOException("copied less then expected");
+ }
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25094f27/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
index f90bebf..781176e 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFileFactory.java
@@ -19,13 +19,23 @@ package org.apache.activemq.artemis.core.io.nio;
import java.io.File;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.core.io.AbstractSequentialFileFactory;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.utils.Env;
-public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
+public final class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
+
+ private static final int DEFAULT_CAPACITY_ALIGNMENT = Env.osPageSize();
+
+ private boolean bufferPooling;
+
+ //pools only the biggest one -> optimized for the common case
+ private final ThreadLocal<ByteBuffer> bytesPool;
public NIOSequentialFileFactory(final File journalDir, final int maxIO) {
this(journalDir, null, maxIO);
@@ -63,6 +73,8 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
final boolean logRates,
final IOCriticalErrorListener listener) {
super(journalDir, buffered, bufferSize, bufferTimeout, maxIO, logRates, listener);
+ this.bufferPooling = true;
+ this.bytesPool = new ThreadLocal<>();
}
public static ByteBuffer allocateDirectByteBuffer(final int size) {
@@ -91,6 +103,14 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
return buffer2;
}
+ public void enableBufferReuse() {
+ this.bufferPooling = true;
+ }
+
+ public void disableBufferReuse() {
+ this.bufferPooling = false;
+ }
+
@Override
public SequentialFile createSequentialFile(final String fileName) {
return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
@@ -101,31 +121,71 @@ public class NIOSequentialFileFactory extends AbstractSequentialFileFactory {
return timedBuffer != null;
}
+ private static int align(final int value, final int pow2alignment) {
+ return (value + (pow2alignment - 1)) & ~(pow2alignment - 1);
+ }
+
@Override
public ByteBuffer allocateDirectBuffer(final int size) {
- return NIOSequentialFileFactory.allocateDirectByteBuffer(size);
+ final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT);
+ final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
+ byteBuffer.limit(size);
+ return byteBuffer;
}
@Override
public void releaseDirectBuffer(ByteBuffer buffer) {
- // nothing we can do on this case. we can just have good faith on GC
+ PlatformDependent.freeDirectBuffer(buffer);
}
@Override
public ByteBuffer newBuffer(final int size) {
- return ByteBuffer.allocate(size);
+ if (!this.bufferPooling) {
+ return allocateDirectBuffer(size);
+ } else {
+ final int requiredCapacity = align(size, DEFAULT_CAPACITY_ALIGNMENT);
+ ByteBuffer byteBuffer = bytesPool.get();
+ if (byteBuffer == null || requiredCapacity > byteBuffer.capacity()) {
+ //do not free the old one (if any) until the new one will be released into the pool!
+ byteBuffer = ByteBuffer.allocateDirect(requiredCapacity);
+ } else {
+ bytesPool.set(null);
+ PlatformDependent.setMemory(PlatformDependent.directBufferAddress(byteBuffer), size, (byte) 0);
+ byteBuffer.clear();
+ }
+ byteBuffer.limit(size);
+ return byteBuffer;
+ }
}
@Override
- public void clearBuffer(final ByteBuffer buffer) {
- final int limit = buffer.limit();
- buffer.rewind();
-
- for (int i = 0; i < limit; i++) {
- buffer.put((byte) 0);
+ public void releaseBuffer(ByteBuffer buffer) {
+ if (this.bufferPooling) {
+ if (buffer.isDirect()) {
+ final ByteBuffer byteBuffer = bytesPool.get();
+ if (byteBuffer != buffer) {
+ //replace with the current pooled only if greater or null
+ if (byteBuffer == null || buffer.capacity() > byteBuffer.capacity()) {
+ if (byteBuffer != null) {
+ //free the smaller one
+ PlatformDependent.freeDirectBuffer(byteBuffer);
+ }
+ bytesPool.set(buffer);
+ } else {
+ PlatformDependent.freeDirectBuffer(buffer);
+ }
+ }
+ }
}
+ }
- buffer.rewind();
+ @Override
+ public void clearBuffer(final ByteBuffer buffer) {
+ if (buffer.isDirect()) {
+ PlatformDependent.setMemory(PlatformDependent.directBufferAddress(buffer), buffer.limit(), (byte) 0);
+ } else {
+ Arrays.fill(buffer.array(), buffer.arrayOffset(), buffer.limit(), (byte) 0);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/25094f27/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
index 31cb970..b2f65cd 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
@@ -19,8 +19,6 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -205,162 +203,4 @@ public class TimedBufferTest extends ActiveMQTestBase {
}
}
-
- /**
- * This test will verify if the system will switch to spin case the system can't perform sleeps timely
- * due to proper kernel installations
- *
- * @throws Exception
- */
- @Test
- public void testVerifySwitchToSpin() throws Exception {
- class TestObserver implements TimedBufferObserver {
-
- @Override
- public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
- */
- @Override
- public ByteBuffer newBuffer(final int minSize, final int maxSize) {
- return ByteBuffer.allocate(maxSize);
- }
-
- @Override
- public int getRemainingBytes() {
- return 1024 * 1024;
- }
- }
-
- final CountDownLatch sleptLatch = new CountDownLatch(1);
-
- TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 1000, false) {
-
- @Override
- protected void stopSpin() {
- // keeps spinning forever
- }
-
- @Override
- protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
- Thread.sleep(10);
- }
-
- @Override
- public synchronized void setUseSleep(boolean param) {
- super.setUseSleep(param);
- sleptLatch.countDown();
- }
-
- };
-
- timedBuffer.start();
-
- try {
-
- timedBuffer.setObserver(new TestObserver());
-
- int x = 0;
-
- byte[] bytes = new byte[10];
- for (int j = 0; j < 10; j++) {
- bytes[j] = ActiveMQTestBase.getSamplebyte(x++);
- }
-
- ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes);
-
- timedBuffer.checkSize(10);
- timedBuffer.addBytes(buff, true, dummyCallback);
-
- sleptLatch.await(10, TimeUnit.SECONDS);
-
- assertFalse(timedBuffer.isUseSleep());
- } finally {
- timedBuffer.stop();
- }
-
- }
-
- /**
- * This test will verify if the system will switch to spin case the system can't perform sleeps timely
- * due to proper kernel installations
- *
- * @throws Exception
- */
- @Test
- public void testStillSleeps() throws Exception {
- class TestObserver implements TimedBufferObserver {
-
- @Override
- public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
- }
-
- /* (non-Javadoc)
- * @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
- */
- @Override
- public ByteBuffer newBuffer(final int minSize, final int maxSize) {
- return ByteBuffer.allocate(maxSize);
- }
-
- @Override
- public int getRemainingBytes() {
- return 1024 * 1024;
- }
- }
-
- final CountDownLatch sleptLatch = new CountDownLatch(TimedBuffer.MAX_CHECKS_ON_SLEEP);
-
- TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 1000, false) {
-
- @Override
- protected void stopSpin() {
- // keeps spinning forever
- }
-
- @Override
- protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
- sleptLatch.countDown();
- // no sleep
- }
-
- @Override
- public synchronized void setUseSleep(boolean param) {
- super.setUseSleep(param);
- sleptLatch.countDown();
- }
-
- };
-
- timedBuffer.start();
-
- try {
-
- timedBuffer.setObserver(new TestObserver());
-
- int x = 0;
-
- byte[] bytes = new byte[10];
- for (int j = 0; j < 10; j++) {
- bytes[j] = ActiveMQTestBase.getSamplebyte(x++);
- }
-
- ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes);
-
- timedBuffer.checkSize(10);
- timedBuffer.addBytes(buff, true, dummyCallback);
-
- // waits all the sleeps to be done
- sleptLatch.await(10, TimeUnit.SECONDS);
-
- // keeps waiting a bit longer
- Thread.sleep(100);
-
- assertTrue(timedBuffer.isUseSleep());
- } finally {
- timedBuffer.stop();
- }
- }
}