You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2019/02/14 00:23:38 UTC

[activemq-artemis] branch master updated: ARTEMIS-2239 Zero-copy NIO/MAPPED TimedBuffer

This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4da9d84  ARTEMIS-2239 Zero-copy NIO/MAPPED TimedBuffer
     new 441c950  This closes #2522
4da9d84 is described below

commit 4da9d8431166fa568db2763c850930c9dff7edf0
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Fri Jan 25 14:40:28 2019 +0100

    ARTEMIS-2239 Zero-copy NIO/MAPPED TimedBuffer
    
    NIO/MAPPED journal types can use directly the buffer of TimedBuffer
    to perform file writes, avoiding an expensive copy + zeroing.
---
 .../artemis/core/io/AbstractSequentialFile.java    |  64 ++-----
 .../io/{IOCallback.java => DelegateCallback.java}  |  34 +++-
 .../activemq/artemis/core/io/IOCallback.java       |  24 +++
 .../artemis/core/io/buffer/TimedBuffer.java        |   8 +-
 .../core/io/buffer/TimedBufferObserver.java        |  28 +--
 .../core/io/mapped/TimedSequentialFile.java        | 106 ++++-------
 .../artemis/core/io/nio/NIOSequentialFile.java     |  35 ++++
 .../artemis/core/io/DelegateCallbackTest.java      | 122 ++++++++++++
 .../artemis/core/io/JournalTptBenchmark.java       | 205 ---------------------
 .../core/io/SequentialFileTptBenchmark.java        | 196 --------------------
 .../unit/core/journal/impl/TimedBufferTest.java    |  74 +++-----
 11 files changed, 281 insertions(+), 615 deletions(-)

diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
index 66ac44a..8a01069 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFile.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -58,7 +59,7 @@ public abstract class AbstractSequentialFile implements SequentialFile {
     * Instead of having AIOSequentialFile implementing the Observer, I have done it on an inner class.
     * This is the class returned to the factory when the file is being activated.
     */
-   protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
+   protected final TimedBufferObserver timedBufferObserver = createTimedBufferObserver();
 
    /**
     * @param file
@@ -74,6 +75,10 @@ public abstract class AbstractSequentialFile implements SequentialFile {
       this.factory = factory;
    }
 
+   protected TimedBufferObserver createTimedBufferObserver() {
+      return new LocalBufferObserver();
+   }
+
    // Public --------------------------------------------------------
 
    @Override
@@ -252,43 +257,6 @@ public abstract class AbstractSequentialFile implements SequentialFile {
       return file;
    }
 
-   private static final class DelegateCallback implements IOCallback {
-
-      final List<IOCallback> delegates;
-
-      private DelegateCallback(final List<IOCallback> delegates) {
-         this.delegates = delegates;
-      }
-
-      @Override
-      public void done() {
-         final int size = delegates.size();
-         for (int i = 0; i < size; i++) {
-            try {
-               delegates.get(i).done();
-            } catch (Throwable e) {
-               ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
-            }
-         }
-      }
-
-      @Override
-      public void onError(final int errorCode, final String errorMessage) {
-         if (logger.isTraceEnabled()) {
-            logger.trace("onError" + " code: " + errorCode + " message: " + errorMessage);
-         }
-
-         final int size = delegates.size();
-         for (int i = 0; i < size; i++) {
-            try {
-               delegates.get(i).onError(errorCode, errorMessage);
-            } catch (Throwable e) {
-               ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
-            }
-         }
-      }
-   }
-
    protected ByteBuffer newBuffer(int size, int limit) {
       size = factory.calculateBlockSize(size);
       limit = factory.calculateBlockSize(limit);
@@ -301,22 +269,20 @@ public abstract class AbstractSequentialFile implements SequentialFile {
    protected class LocalBufferObserver implements TimedBufferObserver {
 
       @Override
-      public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCallback> callbacks) {
-         buffer.flip();
-
-         if (buffer.limit() == 0) {
-            factory.releaseBuffer(buffer);
-         } else {
+      public void flushBuffer(final ByteBuf byteBuf, final boolean requestedSync, final List<IOCallback> callbacks) {
+         final int bytes = byteBuf.readableBytes();
+         if (bytes > 0) {
+            final ByteBuffer buffer = newBuffer(byteBuf.capacity(), bytes);
+            buffer.limit(bytes);
+            byteBuf.getBytes(byteBuf.readerIndex(), buffer);
+            buffer.flip();
             writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
+         } else {
+            IOCallback.done(callbacks);
          }
       }
 
       @Override
-      public ByteBuffer newBuffer(final int size, final int limit) {
-         return AbstractSequentialFile.this.newBuffer(size, limit);
-      }
-
-      @Override
       public int getRemainingBytes() {
          if (fileSize - position.get() > Integer.MAX_VALUE) {
             return Integer.MAX_VALUE;
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DelegateCallback.java
similarity index 51%
copy from artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCallback.java
copy to artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DelegateCallback.java
index 8447491..12d20b5 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCallback.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/DelegateCallback.java
@@ -16,20 +16,34 @@
  */
 package org.apache.activemq.artemis.core.io;
 
+import java.util.Collection;
+import java.util.Objects;
+
 /**
- * The interface used for AIO Callbacks.
+ * It is a utility class to allow several {@link IOCallback}s to be used as one.
  */
-public interface IOCallback {
+public final class DelegateCallback implements IOCallback {
 
-   /**
-    * Method for sync notifications. When this callback method is called, there is a guarantee the data is written on the disk.
-    * <br><b>Note:</b><i>Leave this method as soon as possible, or you would be blocking the whole notification thread</i>
-    */
-   void done();
+   private final Collection<? extends IOCallback> delegates;
 
    /**
-    * Method for error notifications.
-    * Observation: The whole file will be probably failing if this happens. Like, if you delete the file, you will start to get errors for these operations
+    * It doesn't copy defensively the given {@code delegates}.
+    *
+    * @throws NullPointerException if {@code delegates} is {@code null}
     */
-   void onError(int errorCode, String errorMessage);
+   public DelegateCallback(final Collection<? extends IOCallback> delegates) {
+      Objects.requireNonNull(delegates, "delegates cannot be null!");
+      this.delegates = delegates;
+   }
+
+   @Override
+   public void done() {
+      IOCallback.done(delegates);
+   }
+
+   @Override
+   public void onError(final int errorCode, final String errorMessage) {
+      IOCallback.onError(delegates, errorCode, errorMessage);
+   }
+
 }
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCallback.java
index 8447491..fe9a527 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCallback.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/IOCallback.java
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.artemis.core.io;
 
+import java.util.Collection;
+
+import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
+
 /**
  * The interface used for AIO Callbacks.
  */
@@ -32,4 +36,24 @@ public interface IOCallback {
     * Observation: The whole file will be probably failing if this happens. Like, if you delete the file, you will start to get errors for these operations
     */
    void onError(int errorCode, String errorMessage);
+
+   static void done(Collection<? extends IOCallback> delegates) {
+      delegates.forEach(callback -> {
+         try {
+            callback.done();
+         } catch (Throwable e) {
+            ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
+         }
+      });
+   }
+
+   static void onError(Collection<? extends IOCallback> delegates, int errorCode, final String errorMessage) {
+      delegates.forEach(callback -> {
+         try {
+            callback.onError(errorCode, errorMessage);
+         } catch (Throwable e) {
+            ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
+         }
+      });
+   }
 }
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
index 9f809de..d49f930 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
@@ -358,13 +358,7 @@ public final class TimedBuffer extends CriticalComponentImpl {
                   bytesFlushed.addAndGet(pos);
                }
 
-               final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
-               //bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!!
-               bufferToFlush.limit(pos);
-               //perform memcpy under the hood due to the off heap buffer
-               buffer.getBytes(0, bufferToFlush);
-
-               bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
+               bufferObserver.flushBuffer(buffer.byteBuf(), pendingSync, callbacks);
 
                stopSpin();
 
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java
index 7812531..36f6602 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBufferObserver.java
@@ -16,38 +16,22 @@
  */
 package org.apache.activemq.artemis.core.io.buffer;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.core.io.IOCallback;
 
 public interface TimedBufferObserver {
 
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   void flushBuffer(ByteBuffer buffer, boolean syncRequested, List<IOCallback> callbacks);
+   /**
+    * It flushes {@link ByteBuf#readableBytes()} of {@code buffer} without changing its reader/writer indexes.<br>
+    * It just use {@code buffer} temporary: it can be reused by the caller right after this call.
+    */
+   void flushBuffer(ByteBuf buffer, boolean syncRequested, List<IOCallback> callbacks);
 
    /**
     * Return the number of remaining bytes that still fit on the observer (file)
     */
    int getRemainingBytes();
 
-   ByteBuffer newBuffer(int size, int limit);
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
 }
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
index e0a877a..cccbb1d 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/mapped/TimedSequentialFile.java
@@ -22,8 +22,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.core.io.DummyCallback;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
@@ -32,7 +35,6 @@ import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
 import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 
 final class TimedSequentialFile implements SequentialFile {
 
@@ -239,89 +241,45 @@ final class TimedSequentialFile implements SequentialFile {
       return this.sequentialFile.getJavaFile();
    }
 
-   private static void invokeDoneOn(List<? extends IOCallback> callbacks) {
-      final int size = callbacks.size();
-      for (int i = 0; i < size; i++) {
-         try {
-            final IOCallback callback = callbacks.get(i);
-            callback.done();
-         } catch (Throwable e) {
-            ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
-         }
-      }
-   }
-
-   private static void invokeOnErrorOn(final int errorCode,
-                                       final String errorMessage,
-                                       List<? extends IOCallback> callbacks) {
-      final int size = callbacks.size();
-      for (int i = 0; i < size; i++) {
-         try {
-            final IOCallback callback = callbacks.get(i);
-            callback.onError(errorCode, errorMessage);
-         } catch (Throwable e) {
-            ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
-         }
-      }
-   }
-
-   private static final class DelegateCallback implements IOCallback {
-
-      List<IOCallback> delegates;
-
-      private DelegateCallback() {
-         this.delegates = null;
-      }
-
-      @Override
-      public void done() {
-         invokeDoneOn(delegates);
-      }
-
-      @Override
-      public void onError(final int errorCode, final String errorMessage) {
-         invokeOnErrorOn(errorCode, errorMessage, delegates);
-      }
-   }
-
    private final class LocalBufferObserver implements TimedBufferObserver {
 
-      private final DelegateCallback delegateCallback = new DelegateCallback();
-
       @Override
-      public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCallback> callbacks) {
-         buffer.flip();
-
-         if (buffer.limit() == 0) {
-            try {
-               invokeDoneOn(callbacks);
-            } finally {
-               factory.releaseBuffer(buffer);
-            }
-         } else {
-            if (callbacks.isEmpty()) {
-               try {
-                  sequentialFile.writeDirect(buffer, requestedSync);
-               } catch (Exception e) {
-                  throw new IllegalStateException(e);
-               }
+      public void flushBuffer(final ByteBuf byteBuf, final boolean requestedSync, final List<IOCallback> callbacks) {
+         final int bytes = byteBuf.readableBytes();
+         if (bytes > 0) {
+            final boolean releaseBuffer;
+            final ByteBuffer buffer;
+            if (byteBuf.nioBufferCount() == 1) {
+               //any ByteBuffer is fine with the MAPPED journal
+               releaseBuffer = false;
+               buffer = byteBuf.internalNioBuffer(byteBuf.readerIndex(), bytes);
             } else {
-               delegateCallback.delegates = callbacks;
-               try {
-                  sequentialFile.writeDirect(buffer, requestedSync, delegateCallback);
-               } finally {
-                  delegateCallback.delegates = null;
+               //perform the copy on buffer
+               releaseBuffer = true;
+               buffer = factory.newBuffer(byteBuf.capacity());
+               buffer.limit(bytes);
+               byteBuf.getBytes(byteBuf.readerIndex(), buffer);
+               buffer.flip();
+            }
+            try {
+               blockingWriteDirect(buffer, requestedSync, releaseBuffer);
+               IOCallback.done(callbacks);
+            } catch (Throwable t) {
+               final int code;
+               if (t instanceof IOException) {
+                  code = ActiveMQExceptionType.IO_ERROR.getCode();
+                  factory.onIOError(new ActiveMQIOErrorException(t.getMessage(), t), t.getMessage(), TimedSequentialFile.this.sequentialFile);
+               } else {
+                  code = ActiveMQExceptionType.GENERIC_EXCEPTION.getCode();
                }
+               IOCallback.onError(callbacks, code, t.getMessage());
             }
+         } else {
+            IOCallback.done(callbacks);
          }
       }
 
       @Override
-      public ByteBuffer newBuffer(final int size, final int limit) {
-         return factory.newBuffer(limit);
-      }
-
-      @Override
       public int getRemainingBytes() {
          try {
             final long position = sequentialFile.position();
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
index 230cfff..f241ac0 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/nio/NIOSequentialFile.java
@@ -22,16 +22,20 @@ import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
+import java.util.List;
 import java.util.concurrent.Executor;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.core.io.AbstractSequentialFile;
+import org.apache.activemq.artemis.core.io.DelegateCallback;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
 import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 import org.apache.activemq.artemis.utils.Env;
@@ -52,6 +56,11 @@ public class NIOSequentialFile extends AbstractSequentialFile {
    }
 
    @Override
+   protected TimedBufferObserver createTimedBufferObserver() {
+      return new SyncLocalBufferObserver();
+   }
+
+   @Override
    public int calculateBlockStart(final int position) {
       return position;
    }
@@ -324,4 +333,30 @@ public class NIOSequentialFile extends AbstractSequentialFile {
       }
       SequentialFile.appendTo(getFile().toPath(), dstFile.getJavaFile().toPath());
    }
+
+   private class SyncLocalBufferObserver extends LocalBufferObserver {
+
+      @Override
+      public void flushBuffer(ByteBuf byteBuf, boolean requestedSync, List<IOCallback> callbacks) {
+         //maybe no need to perform any copy
+         final int bytes = byteBuf.readableBytes();
+         if (bytes == 0) {
+            IOCallback.done(callbacks);
+         } else {
+            //enable zero copy case
+            if (byteBuf.nioBufferCount() == 1 && byteBuf.isDirect()) {
+               final ByteBuffer buffer = byteBuf.internalNioBuffer(byteBuf.readerIndex(), bytes);
+               final IOCallback callback = new DelegateCallback(callbacks);
+               try {
+                  //no need to pool the buffer and don't care if the NIO buffer got modified
+                  internalWrite(buffer, requestedSync, callback, false);
+               } catch (Exception e) {
+                  callback.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage());
+               }
+            } else {
+               super.flushBuffer(byteBuf, requestedSync, callbacks);
+            }
+         }
+      }
+   }
 }
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/DelegateCallbackTest.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/DelegateCallbackTest.java
new file mode 100644
index 0000000..b7e45a5
--- /dev/null
+++ b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/DelegateCallbackTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.io;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DelegateCallbackTest {
+
+   @Test(expected = NullPointerException.class)
+   public void shouldFailWithNullDelegates() {
+      new DelegateCallback(null);
+   }
+
+   private static final class CountingIOCallback implements IOCallback {
+
+      long done = 0;
+      long onError = 0;
+      final boolean fail;
+
+      private CountingIOCallback(boolean fail) {
+         this.fail = fail;
+      }
+
+      @Override
+      public void done() {
+         done++;
+         if (fail) {
+            throw new IllegalStateException();
+         }
+      }
+
+      @Override
+      public void onError(int errorCode, String errorMessage) {
+         onError++;
+         if (fail) {
+            throw new IllegalStateException();
+         }
+      }
+   }
+
+   @Test
+   public void shouldCallDoneOnEachCallback() {
+      final CountingIOCallback countingIOCallback = new CountingIOCallback(false);
+      final DelegateCallback callback = new DelegateCallback(Arrays.asList(countingIOCallback, countingIOCallback));
+      callback.done();
+      Assert.assertEquals(2, countingIOCallback.done);
+      Assert.assertEquals(0, countingIOCallback.onError);
+   }
+
+   @Test
+   public void shouldCallOnErrorOnEachCallback() {
+      final CountingIOCallback countingIOCallback = new CountingIOCallback(false);
+      final DelegateCallback callback = new DelegateCallback(Arrays.asList(countingIOCallback, countingIOCallback));
+      callback.onError(0, "not a real error");
+      Assert.assertEquals(0, countingIOCallback.done);
+      Assert.assertEquals(2, countingIOCallback.onError);
+   }
+
+   @Test
+   public void shouldCallDoneOnEachCallbackWithExceptions() {
+      final CountingIOCallback countingIOCallback = new CountingIOCallback(true);
+      final DelegateCallback callback = new DelegateCallback(Arrays.asList(countingIOCallback, countingIOCallback));
+      callback.done();
+      Assert.assertEquals(2, countingIOCallback.done);
+      Assert.assertEquals(0, countingIOCallback.onError);
+   }
+
+   @Test
+   public void shouldCallOnErrorOnEachCallbackWithExceptions() {
+      final CountingIOCallback countingIOCallback = new CountingIOCallback(true);
+      final DelegateCallback callback = new DelegateCallback(Arrays.asList(countingIOCallback, countingIOCallback));
+      callback.onError(0, "not a real error");
+      Assert.assertEquals(0, countingIOCallback.done);
+      Assert.assertEquals(2, countingIOCallback.onError);
+   }
+
+   @Test
+   public void shouldLogOnDoneForEachExceptions() {
+      AssertionLoggerHandler.startCapture();
+      try {
+         final CountingIOCallback countingIOCallback = new CountingIOCallback(true);
+         final DelegateCallback callback = new DelegateCallback(Collections.singleton(countingIOCallback));
+         callback.done();
+         Assert.assertTrue(AssertionLoggerHandler.findText("AMQ142024"));
+      } finally {
+         AssertionLoggerHandler.stopCapture();
+      }
+   }
+
+   @Test
+   public void shouldLogOnErrorForEachExceptions() {
+      AssertionLoggerHandler.startCapture();
+      try {
+         final CountingIOCallback countingIOCallback = new CountingIOCallback(true);
+         final DelegateCallback callback = new DelegateCallback(Collections.singleton(countingIOCallback));
+         callback.onError(0, "not a real error");
+         Assert.assertTrue(AssertionLoggerHandler.findText("AMQ142025"));
+      } finally {
+         AssertionLoggerHandler.stopCapture();
+      }
+   }
+
+}
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
deleted file mode 100644
index 4562585..0000000
--- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/JournalTptBenchmark.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.core.io;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.LockSupport;
-import java.util.stream.Stream;
-
-import io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueue;
-import org.apache.activemq.artemis.ArtemisConstants;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
-import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
-import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.journal.Journal;
-import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
-import org.apache.activemq.artemis.jlibaio.LibaioContext;
-import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
-
-/**
- * To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
- */
-public class JournalTptBenchmark {
-
-   public static void main(String[] args) throws Exception {
-      final boolean useDefaultIoExecutor = true;
-      final int fileSize = 10 * 1024 * 1024;
-      final boolean dataSync = false;
-      final Type type = Type.Mapped;
-      final int tests = 10;
-      final int warmup = 20_000;
-      final int measurements = 100_000;
-      final int msgSize = 100;
-      final byte[] msgContent = new byte[msgSize];
-      Arrays.fill(msgContent, (byte) 1);
-      final int totalMessages = (measurements * tests + warmup);
-      final File tmpDirectory = new File("./");
-      //using the default configuration when the broker starts!
-      final SequentialFileFactory factory;
-      switch (type) {
-
-         case Mapped:
-            factory = new MappedSequentialFileFactory(tmpDirectory, fileSize, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null)
-               .setDatasync(dataSync);
-            break;
-         case Nio:
-            factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null, null).setDatasync(dataSync);
-            break;
-         case Aio:
-            factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null, null).setDatasync(dataSync);
-            //disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse();
-            if (!LibaioContext.isLoaded()) {
-               throw new IllegalStateException("lib AIO not loaded!");
-            }
-            break;
-         default:
-            throw new AssertionError("unsupported case");
-      }
-
-      int numFiles = (int) (totalMessages * factory.calculateBlockSize(msgSize)) / fileSize;
-      if (numFiles < 2) {
-         numFiles = 2;
-      }
-      ExecutorService service = null;
-      final Journal journal;
-      if (useDefaultIoExecutor) {
-         journal = new JournalImpl(fileSize, numFiles, numFiles, Integer.MAX_VALUE, 100, factory, "activemq-data", "amq", factory.getMaxIO());
-         journal.start();
-      } else {
-         final ArrayList<MpscArrayQueue<Runnable>> tasks = new ArrayList<>();
-         service = Executors.newSingleThreadExecutor();
-         journal = new JournalImpl(() -> new ArtemisExecutor() {
-
-            private final MpscArrayQueue<Runnable> taskQueue = new MpscArrayQueue<>(1024);
-
-            {
-               tasks.add(taskQueue);
-            }
-
-            @Override
-            public void execute(Runnable command) {
-               while (!taskQueue.offer(command)) {
-                  LockSupport.parkNanos(1L);
-               }
-            }
-         }, fileSize, numFiles, numFiles, Integer.MAX_VALUE, 100, factory, "activemq-data", "amq", factory.getMaxIO(), 0);
-         journal.start();
-         service.execute(() -> {
-            final int size = tasks.size();
-            final int capacity = 1024;
-            while (!Thread.currentThread().isInterrupted()) {
-               for (int i = 0; i < size; i++) {
-                  final MpscArrayQueue<Runnable> runnables = tasks.get(i);
-                  for (int j = 0; j < capacity; j++) {
-                     final Runnable task = runnables.poll();
-                     if (task == null) {
-                        break;
-                     }
-                     try {
-                        task.run();
-                     } catch (Throwable t) {
-                        System.err.println(t);
-                     }
-                  }
-               }
-            }
-
-         });
-      }
-      try {
-         journal.load(new ArrayList<RecordInfo>(), null, null);
-      } catch (Exception e) {
-         throw new RuntimeException(e);
-      }
-      try {
-         final EncodingSupport encodingSupport = new EncodingSupport() {
-            @Override
-            public int getEncodeSize() {
-               return msgSize;
-            }
-
-            @Override
-            public void encode(ActiveMQBuffer buffer) {
-               final int writerIndex = buffer.writerIndex();
-               buffer.setBytes(writerIndex, msgContent);
-               buffer.writerIndex(writerIndex + msgSize);
-            }
-
-            @Override
-            public void decode(ActiveMQBuffer buffer) {
-
-            }
-         };
-         long id = 1;
-         {
-            final long elapsed = writeMeasurements(id, journal, encodingSupport, warmup);
-            id += warmup;
-            System.out.println("warmup:" + (measurements * 1000_000_000L) / elapsed + " ops/sec");
-         }
-         for (int t = 0; t < tests; t++) {
-            final long elapsed = writeMeasurements(id, journal, encodingSupport, measurements);
-            System.out.println((measurements * 1000_000_000L) / elapsed + " ops/sec");
-            id += warmup;
-         }
-
-      } finally {
-         journal.stop();
-         if (service != null) {
-            service.shutdown();
-         }
-         final File[] fileToDeletes = tmpDirectory.listFiles();
-         System.out.println("Files to deletes" + Arrays.toString(fileToDeletes));
-         Stream.of(fileToDeletes).forEach(File::delete);
-      }
-   }
-
-   private static long writeMeasurements(long id,
-                                         Journal journal,
-                                         EncodingSupport encodingSupport,
-                                         int measurements) throws Exception {
-      System.gc();
-      TimeUnit.SECONDS.sleep(2);
-
-      final long start = System.nanoTime();
-      for (int i = 0; i < measurements; i++) {
-         write(id, journal, encodingSupport);
-         id++;
-      }
-      final long elapsed = System.nanoTime() - start;
-      return elapsed;
-   }
-
-   private static void write(long id, Journal journal, EncodingSupport encodingSupport) throws Exception {
-      journal.appendAddRecord(id, (byte) 1, encodingSupport, false);
-      journal.appendUpdateRecord(id, (byte) 1, encodingSupport, true);
-   }
-
-   private enum Type {
-
-      Mapped, Nio, Aio
-
-   }
-}
diff --git a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java b/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
deleted file mode 100644
index 8ba0ccc..0000000
--- a/artemis-journal/src/test/java/org/apache/activemq/artemis/core/io/SequentialFileTptBenchmark.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.artemis.core.io;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.artemis.ArtemisConstants;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
-import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
-import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.jlibaio.LibaioContext;
-
-/**
- * To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
- */
-public class SequentialFileTptBenchmark {
-
-   private static final FastWaitIOCallback CALLBACK = new FastWaitIOCallback();
-
-   public static void main(String[] args) throws Exception {
-      final boolean dataSync = false;
-      final boolean writeSync = true;
-      final Type type = Type.Mapped;
-      final int tests = 10;
-      final int warmup = 20_000;
-      final int measurements = 100_000;
-      final int msgSize = 100;
-      final byte[] msgContent = new byte[msgSize];
-      Arrays.fill(msgContent, (byte) 1);
-      final File tmpDirectory = new File("./");
-      //using the default configuration when the broker starts!
-      final SequentialFileFactory factory;
-      switch (type) {
-
-         case Mapped:
-            final int fileSize = Math.max(msgSize * measurements, msgSize * warmup);
-            factory = new MappedSequentialFileFactory(tmpDirectory, fileSize, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, null).setDatasync(dataSync);
-            break;
-         case Nio:
-            factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null, null).setDatasync(dataSync);
-            break;
-         case Aio:
-            factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null, null).setDatasync(dataSync);
-            //disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse();
-            if (!LibaioContext.isLoaded()) {
-               throw new IllegalStateException("lib AIO not loaded!");
-            }
-            break;
-         default:
-            throw new AssertionError("unsupported case");
-      }
-      factory.start();
-      try {
-         final EncodingSupport encodingSupport = new EncodingSupport() {
-            @Override
-            public int getEncodeSize() {
-               return msgSize;
-            }
-
-            @Override
-            public void encode(ActiveMQBuffer buffer) {
-               final int writerIndex = buffer.writerIndex();
-               buffer.setBytes(writerIndex, msgContent);
-               buffer.writerIndex(writerIndex + msgSize);
-            }
-
-            @Override
-            public void decode(ActiveMQBuffer buffer) {
-
-            }
-         };
-         final int alignedMessageSize = factory.calculateBlockSize(msgSize);
-         final long totalFileSize = Math.max(alignedMessageSize * measurements, alignedMessageSize * warmup);
-         if (totalFileSize > Integer.MAX_VALUE)
-            throw new IllegalArgumentException("reduce measurements/warmup");
-         final int fileSize = (int) totalFileSize;
-         final SequentialFile sequentialFile = factory.createSequentialFile("seq.dat");
-         sequentialFile.getJavaFile().delete();
-         sequentialFile.getJavaFile().deleteOnExit();
-         sequentialFile.open();
-         final long startZeros = System.nanoTime();
-         sequentialFile.fill(fileSize);
-         final long elapsedZeros = System.nanoTime() - startZeros;
-         System.out.println("Zeroed " + fileSize + " bytes in " + TimeUnit.NANOSECONDS.toMicros(elapsedZeros) + " us");
-         try {
-            {
-               final long elapsed = writeMeasurements(factory, sequentialFile, encodingSupport, warmup, writeSync);
-               System.out.println("warmup:" + (measurements * 1000_000_000L) / elapsed + " ops/sec");
-            }
-            for (int t = 0; t < tests; t++) {
-               final long elapsed = writeMeasurements(factory, sequentialFile, encodingSupport, measurements, writeSync);
-               System.out.println((measurements * 1000_000_000L) / elapsed + " ops/sec");
-            }
-         } finally {
-            sequentialFile.close();
-         }
-      } finally {
-         factory.stop();
-      }
-   }
-
-   private static long writeMeasurements(SequentialFileFactory sequentialFileFactory,
-                                         SequentialFile sequentialFile,
-                                         EncodingSupport encodingSupport,
-                                         int measurements,
-                                         boolean writeSync) throws Exception {
-      //System.gc();
-      TimeUnit.SECONDS.sleep(2);
-      sequentialFileFactory.activateBuffer(sequentialFile);
-      sequentialFile.position(0);
-      final long start = System.nanoTime();
-      for (int i = 0; i < measurements; i++) {
-         write(sequentialFile, encodingSupport, writeSync);
-      }
-      sequentialFileFactory.deactivateBuffer();
-      final long elapsed = System.nanoTime() - start;
-      return elapsed;
-   }
-
-   private static void write(SequentialFile sequentialFile,
-                             EncodingSupport encodingSupport,
-                             boolean sync) throws Exception {
-      //this pattern is necessary to ensure that NIO's TimedBuffer fill flush the buffer and know the real size of it
-      if (sequentialFile.fits(encodingSupport.getEncodeSize())) {
-         CALLBACK.reset();
-         sequentialFile.write(encodingSupport, sync, CALLBACK);
-         CALLBACK.waitCompletion();
-      } else {
-         throw new IllegalStateException("can't happen!");
-      }
-   }
-
-   private enum Type {
-
-      Mapped, Nio, Aio
-
-   }
-
-   private static final class FastWaitIOCallback implements IOCallback {
-
-      private final AtomicBoolean done = new AtomicBoolean(false);
-      private int errorCode = 0;
-      private String errorMessage = null;
-
-      public FastWaitIOCallback reset() {
-         errorCode = 0;
-         errorMessage = null;
-         done.lazySet(false);
-         return this;
-      }
-
-      @Override
-      public void done() {
-         errorCode = 0;
-         errorMessage = null;
-         done.lazySet(true);
-      }
-
-      @Override
-      public void onError(int errorCode, String errorMessage) {
-         this.errorCode = errorCode;
-         this.errorMessage = errorMessage;
-         done.lazySet(true);
-      }
-
-      public void waitCompletion() throws InterruptedException, ActiveMQException {
-         while (!done.get()) {
-         }
-         if (errorMessage != null) {
-            throw ActiveMQExceptionType.createException(errorCode, errorMessage);
-         }
-      }
-   }
-}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
index 3619db2..318cd53 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.core.io.DummyCallback;
@@ -68,19 +69,15 @@ public class TimedBufferTest extends ActiveMQTestBase {
       class TestObserver implements TimedBufferObserver {
 
          @Override
-         public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
+         public void flushBuffer(final ByteBuf byteBuf, final boolean sync, final List<IOCallback> callbacks) {
+            final ByteBuffer buffer = ByteBuffer.allocate(byteBuf.readableBytes());
+            buffer.limit(byteBuf.readableBytes());
+            byteBuf.getBytes(byteBuf.readerIndex(), buffer);
+            buffer.flip();
             buffers.add(buffer);
             flushTimes.incrementAndGet();
          }
 
-         /* (non-Javadoc)
-          * @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
-          */
-         @Override
-         public ByteBuffer newBuffer(final int minSize, final int maxSize) {
-            return ByteBuffer.allocate(maxSize);
-         }
-
          @Override
          public int getRemainingBytes() {
             return 1024 * 1024;
@@ -135,20 +132,15 @@ public class TimedBufferTest extends ActiveMQTestBase {
       class TestObserver implements TimedBufferObserver {
 
          @Override
-         public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
+         public void flushBuffer(final ByteBuf byteBuf, final boolean sync, final List<IOCallback> callbacks) {
+            final ByteBuffer buffer = ByteBuffer.allocate(byteBuf.readableBytes());
+            buffer.limit(byteBuf.readableBytes());
+            byteBuf.getBytes(byteBuf.readerIndex(), buffer);
             for (IOCallback callback : callbacks) {
                callback.done();
             }
          }
 
-         /* (non-Javadoc)
-          * @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
-          */
-         @Override
-         public ByteBuffer newBuffer(final int minSize, final int maxSize) {
-            return ByteBuffer.allocate(maxSize);
-         }
-
          @Override
          public int getRemainingBytes() {
             return 1024 * 1024;
@@ -262,10 +254,11 @@ public class TimedBufferTest extends ActiveMQTestBase {
       }
 
       @Override
-      public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
+      public void flushBuffer(final ByteBuf byteBuf, final boolean sync, final List<IOCallback> callbacks) {
          assert sync;
-         assert dummyBuffer == buffer;
-         if (buffer.position() > 0) {
+         dummyBuffer.limit(byteBuf.readableBytes());
+         byteBuf.getBytes(byteBuf.readerIndex(), dummyBuffer);
+         if (dummyBuffer.position() > 0) {
             dummyBuffer.clear();
             flushes++;
             //ask the device to perform a flush
@@ -273,16 +266,6 @@ public class TimedBufferTest extends ActiveMQTestBase {
          }
       }
 
-      /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
-       */
-      @Override
-      public ByteBuffer newBuffer(final int minSize, final int maxSize) {
-         assert maxSize <= dummyBuffer.capacity();
-         dummyBuffer.limit(minSize);
-         return dummyBuffer;
-      }
-
       @Override
       public int getRemainingBytes() {
          return Integer.MAX_VALUE;
@@ -318,9 +301,10 @@ public class TimedBufferTest extends ActiveMQTestBase {
       }
 
       @Override
-      public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
+      public void flushBuffer(final ByteBuf byteBuf, final boolean sync, final List<IOCallback> callbacks) {
          assert sync;
-         assert dummyBuffer == buffer;
+         dummyBuffer.limit(byteBuf.readableBytes());
+         byteBuf.getBytes(byteBuf.readerIndex(), dummyBuffer);
          if (dummyBuffer.position() > 0) {
             dummyBuffer.clear();
             //emulate the flush time of a blocking device with a precise sleep
@@ -331,16 +315,6 @@ public class TimedBufferTest extends ActiveMQTestBase {
          }
       }
 
-      /* (non-Javadoc)
-       * @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
-       */
-      @Override
-      public ByteBuffer newBuffer(final int minSize, final int maxSize) {
-         assert maxSize <= dummyBuffer.capacity();
-         dummyBuffer.limit(minSize);
-         return dummyBuffer;
-      }
-
       @Override
       public int getRemainingBytes() {
          return Integer.MAX_VALUE;
@@ -470,19 +444,15 @@ public class TimedBufferTest extends ActiveMQTestBase {
       class TestObserver implements TimedBufferObserver {
 
          @Override
-         public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
+         public void flushBuffer(final ByteBuf byteBuf, final boolean sync, final List<IOCallback> callbacks) {
+            final ByteBuffer buffer = ByteBuffer.allocate(byteBuf.readableBytes());
+            buffer.limit(byteBuf.readableBytes());
+            byteBuf.getBytes(byteBuf.readerIndex(), buffer);
+            buffer.flip();
             buffers.add(buffer);
             flushTimes.incrementAndGet();
          }
 
-         /* (non-Javadoc)
-          * @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
-          */
-         @Override
-         public ByteBuffer newBuffer(final int minSize, final int maxSize) {
-            return ByteBuffer.allocate(maxSize);
-         }
-
          @Override
          public int getRemainingBytes() {
             return 1024 * 1024;