You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/07/19 14:49:34 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1294 Reverted TimedBuffer timeout policy

Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x 423f26f02 -> b6b5b4caa


ARTEMIS-1294 Reverted TimedBuffer timeout policy

(cherry picked from commit 3dc9566fb66b31729c3e04a8d06ff391a8c850b4)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/38cd0cd9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/38cd0cd9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/38cd0cd9

Branch: refs/heads/1.x
Commit: 38cd0cd9f4a4b2a55a3eb3d9514e52bcb6687da5
Parents: 423f26f
Author: Francesco Nigro <ni...@gmail.com>
Authored: Mon Jul 17 17:35:38 2017 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Jul 19 10:47:34 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/io/buffer/TimedBuffer.java     | 190 ++++++++++---------
 1 file changed, 96 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/38cd0cd9/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
index 7d5370b..1a50523 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.io.buffer;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -37,6 +36,9 @@ import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 public final class TimedBuffer {
    // Constants -----------------------------------------------------
 
+   // The number of tries on sleep before switching to spin
+   private static final int MAX_CHECKS_ON_SLEEP = 20;
+
    // Attributes ----------------------------------------------------
 
    private TimedBufferObserver bufferObserver;
@@ -46,7 +48,7 @@ public final class TimedBuffer {
    // prevent that
    private final Semaphore spinLimiter = new Semaphore(1);
 
-   private CheckTimer timerRunnable = new CheckTimer();
+   private CheckTimer timerRunnable = null;
 
    private final int bufferSize;
 
@@ -58,7 +60,8 @@ public final class TimedBuffer {
 
    private final int timeout;
 
-   private final AtomicLong pendingSyncs = new AtomicLong();
+   // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
+   private volatile boolean pendingSync = false;
 
    private Thread timerThread;
 
@@ -73,7 +76,7 @@ public final class TimedBuffer {
 
    private final boolean logRates;
 
-   private long bytesFlushed = 0;
+   private final AtomicLong bytesFlushed = new AtomicLong(0);
 
    private final AtomicLong flushesDone = new AtomicLong(0);
 
@@ -81,6 +84,9 @@ public final class TimedBuffer {
 
    private TimerTask logRatesTimerTask;
 
+   // used only in the timerThread, so it does not need synchronization
+   private boolean useSleep = true;
+
    // no need to be volatile as every access is synchronized
    private boolean spinning = false;
 
@@ -99,16 +105,18 @@ public final class TimedBuffer {
          logRatesTimer = new Timer(true);
       }
       // Setting the interval for nano-sleeps
+
       //prefer off heap buffer to allow further humongous allocations and reduce GC overhead
       //NOTE: it is used ByteBuffer::allocateDirect instead of Unpooled::directBuffer, because the latter could allocate
       //direct ByteBuffers with no Cleaner!
       buffer = new ChannelBufferWrapper(Unpooled.wrappedBuffer(ByteBuffer.allocateDirect(size)));
 
+
       buffer.clear();
 
       bufferLimit = 0;
 
-      callbacks = null;
+      callbacks = new ArrayList<>();
 
       this.timeout = timeout;
    }
@@ -232,14 +240,11 @@ public final class TimedBuffer {
       buffer.setBytes(writerIndex, bytes, bytes.readerIndex(), readableBytes);
       buffer.writerIndex(writerIndex + readableBytes);
 
-      if (callbacks == null) {
-         callbacks = new ArrayList<>();
-      }
       callbacks.add(callback);
 
       if (sync) {
-         final long currentPendingSyncs = pendingSyncs.get();
-         pendingSyncs.lazySet(currentPendingSyncs + 1);
+         pendingSync = true;
+
          startSpin();
       }
    }
@@ -253,14 +258,11 @@ public final class TimedBuffer {
 
       bytes.encode(buffer);
 
-      if (callbacks == null) {
-         callbacks = new ArrayList<>();
-      }
       callbacks.add(callback);
 
       if (sync) {
-         final long currentPendingSyncs = pendingSyncs.get();
-         pendingSyncs.lazySet(currentPendingSyncs + 1);
+         pendingSync = true;
+
          startSpin();
       }
 
@@ -274,14 +276,18 @@ public final class TimedBuffer {
     * force means the Journal is moving to a new file. Any pending write need to be done immediately
     * or data could be lost
     */
-   private void flush(final boolean force) {
+   public void flush(final boolean force) {
       synchronized (this) {
          if (!started) {
             throw new IllegalStateException("TimedBuffer is not started");
          }
 
          if ((force || !delayFlush) && buffer.writerIndex() > 0) {
-            final int pos = buffer.writerIndex();
+            int pos = buffer.writerIndex();
+
+            if (logRates) {
+               bytesFlushed.addAndGet(pos);
+            }
 
             final ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
             //bufferObserver::newBuffer doesn't necessary return a buffer with limit == pos or limit == bufferSize!!
@@ -289,34 +295,25 @@ public final class TimedBuffer {
             //perform memcpy under the hood due to the off heap buffer
             buffer.getBytes(0, bufferToFlush);
 
-            final List<IOCallback> ioCallbacks = callbacks == null ? Collections.emptyList() : callbacks;
-            bufferObserver.flushBuffer(bufferToFlush, pendingSyncs.get() > 0, ioCallbacks);
+
+            bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
 
             stopSpin();
 
-            pendingSyncs.lazySet(0);
+            pendingSync = false;
 
-            callbacks = null;
+            // swap the instance as the previous callback list is being used asynchronously
+            callbacks = new ArrayList<>();
 
             buffer.clear();
 
             bufferLimit = 0;
 
-            if (logRates) {
-               logFlushed(pos);
-            }
+            flushesDone.incrementAndGet();
          }
       }
    }
 
-   private void logFlushed(int bytes) {
-      this.bytesFlushed += bytes;
-      //more lightweight than XADD if single writer
-      final long currentFlushesDone = flushesDone.get();
-      //flushesDone::lazySet write-Release bytesFlushed
-      flushesDone.lazySet(currentFlushesDone + 1L);
-   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -340,21 +337,21 @@ public final class TimedBuffer {
          if (!closed) {
             long now = System.currentTimeMillis();
 
-            final long flushesDone = TimedBuffer.this.flushesDone.get();
-            //flushesDone::get read-Acquire bytesFlushed
-            final long bytesFlushed = TimedBuffer.this.bytesFlushed;
+            long bytesF = bytesFlushed.get();
+            long flushesD = flushesDone.get();
+
             if (lastExecution != 0) {
-               final double rate = 1000 * (double) (bytesFlushed - lastBytesFlushed) / (now - lastExecution);
+               double rate = 1000 * (double) (bytesF - lastBytesFlushed) / (now - lastExecution);
                ActiveMQJournalLogger.LOGGER.writeRate(rate, (long) (rate / (1024 * 1024)));
-               final double flushRate = 1000 * (double) (flushesDone - lastFlushesDone) / (now - lastExecution);
+               double flushRate = 1000 * (double) (flushesD - lastFlushesDone) / (now - lastExecution);
                ActiveMQJournalLogger.LOGGER.flushRate(flushRate);
             }
 
             lastExecution = now;
 
-            lastBytesFlushed = bytesFlushed;
+            lastBytesFlushed = bytesF;
 
-            lastFlushesDone = flushesDone;
+            lastFlushesDone = flushesD;
          }
       }
 
@@ -370,40 +367,74 @@ public final class TimedBuffer {
 
       private volatile boolean closed = false;
 
+      int checks = 0;
+      int failedChecks = 0;
+      long timeBefore = 0;
+
       @Override
       public void run() {
-         int waitTimes = 0;
          long lastFlushTime = 0;
-         long estimatedOptimalBatch = Runtime.getRuntime().availableProcessors();
-         final Semaphore spinLimiter = TimedBuffer.this.spinLimiter;
-         final long timeout = TimedBuffer.this.timeout;
 
          while (!closed) {
-            boolean flushed = false;
-            final long currentPendingSyncs = pendingSyncs.get();
-
-            if (currentPendingSyncs > 0) {
-               if (bufferObserver != null) {
-                  final boolean checkpoint = System.nanoTime() > lastFlushTime + timeout;
-                  if (checkpoint || currentPendingSyncs >= estimatedOptimalBatch) {
-                     flush();
-                     if (checkpoint) {
-                        estimatedOptimalBatch = currentPendingSyncs;
-                     } else {
-                        estimatedOptimalBatch = Math.max(estimatedOptimalBatch, currentPendingSyncs);
-                     }
-                     lastFlushTime = System.nanoTime();
-                     //a flush has been requested
-                     flushed = true;
-                  }
+            // We flush on the timer if there are pending syncs there and we've waited at least one
+            // timeout since the time of the last flush.
+            // Effectively flushing "resets" the timer
+            // On the timeout verification, notice that we ignore the timeout check if we are using sleep
+
+            if (pendingSync) {
+               if (useSleep) {
+                  // if using sleep, we will always flush
+                  flush();
+                  lastFlushTime = System.nanoTime();
+               } else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout) {
+                  // if not using sleep we will spin and do the time checks manually
+                  flush();
+                  lastFlushTime = System.nanoTime();
                }
+
+            }
+
+            sleepIfPossible();
+
+            try {
+               spinLimiter.acquire();
+
+               Thread.yield();
+
+               spinLimiter.release();
+            } catch (InterruptedException e) {
+               throw new ActiveMQInterruptedException(e);
             }
+         }
+      }
+
+      /**
+       * We will attempt to use sleep only if the system supports nano-sleep.
+       * In that case we verify, for up to MAX_CHECKS_ON_SLEEP checks, whether nano-sleep is behaving well.
+       * If more than 50% of the checks have failed we will cancel the sleep and just use regular spin.
+       */
+      private void sleepIfPossible() {
+         if (useSleep) {
+            if (checks < MAX_CHECKS_ON_SLEEP) {
+               timeBefore = System.nanoTime();
+            }
+
+            LockSupport.parkNanos(timeout);
+
+            if (checks < MAX_CHECKS_ON_SLEEP) {
+               long realTimeSleep = System.nanoTime() - timeBefore;
+
+               // Allow the real sleep time to be up to 50% longer than the requested sleep.
+               if (realTimeSleep > timeout * 1.5) {
+                  failedChecks++;
+               }
 
-            if (flushed) {
-               waitTimes = 0;
-            } else {
-               //instead of interruptible sleeping, perform progressive parks depending on the load
-               waitTimes = TimedBuffer.wait(waitTimes, spinLimiter);
+               if (++checks >= MAX_CHECKS_ON_SLEEP) {
+                  if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
+                     ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
+                     useSleep = false;
+                  }
+               }
             }
          }
       }
@@ -413,35 +444,6 @@ public final class TimedBuffer {
       }
    }
 
-   private static int wait(int waitTimes, Semaphore spinLimiter) {
-      if (waitTimes < 10) {
-         //doesn't make sense to spin loop here, because of the lock around flush/addBytes operations!
-         Thread.yield();
-         waitTimes++;
-      } else if (waitTimes < 20) {
-         LockSupport.parkNanos(1L);
-         waitTimes++;
-      } else if (waitTimes < 50) {
-         LockSupport.parkNanos(10L);
-         waitTimes++;
-      } else if (waitTimes < 100) {
-         LockSupport.parkNanos(100L);
-         waitTimes++;
-      } else if (waitTimes < 1000) {
-         LockSupport.parkNanos(1000L);
-         waitTimes++;
-      } else {
-         LockSupport.parkNanos(100_000L);
-         try {
-            spinLimiter.acquire();
-            spinLimiter.release();
-         } catch (InterruptedException e) {
-            throw new ActiveMQInterruptedException(e);
-         }
-      }
-      return waitTimes;
-   }
-
    /**
     * Sub classes (tests basically) can use this to override disabling spinning
     */


[2/2] activemq-artemis git commit: ARTEMIS-1294 Using older sleep on TimedBuffer

Posted by cl...@apache.org.
ARTEMIS-1294 Using older sleep on TimedBuffer

And also adding test

(cherry picked from commit ad372ec98e0d8e5765eef56883ea29b10768c70e)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b6b5b4ca
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b6b5b4ca
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b6b5b4ca

Branch: refs/heads/1.x
Commit: b6b5b4caa7c97e28a840fb29d33c7d7b7410eb74
Parents: 38cd0cd
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Jul 18 10:03:47 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Jul 19 10:48:12 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/io/buffer/TimedBuffer.java     | 26 +++++-
 .../unit/core/journal/impl/TimedBufferTest.java | 94 ++++++++++++++++++++
 2 files changed, 117 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b6b5b4ca/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
index 1a50523..109e1fa 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
@@ -23,7 +23,6 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.LockSupport;
 
 import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -48,7 +47,7 @@ public final class TimedBuffer {
    // prevent that
    private final Semaphore spinLimiter = new Semaphore(1);
 
-   private CheckTimer timerRunnable = null;
+   private CheckTimer timerRunnable;
 
    private final int bufferSize;
 
@@ -371,6 +370,9 @@ public final class TimedBuffer {
       int failedChecks = 0;
       long timeBefore = 0;
 
+      final int sleepMillis = timeout / 1000000; // truncates
+      final int sleepNanos = timeout % 1000000;
+
       @Override
       public void run() {
          long lastFlushTime = 0;
@@ -419,7 +421,14 @@ public final class TimedBuffer {
                timeBefore = System.nanoTime();
             }
 
-            LockSupport.parkNanos(timeout);
+            try {
+               sleep(sleepMillis, sleepNanos);
+            } catch (InterruptedException e) {
+               throw new ActiveMQInterruptedException(e);
+            } catch (Exception e) {
+               useSleep = false;
+               ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
+            }
 
             if (checks < MAX_CHECKS_ON_SLEEP) {
                long realTimeSleep = System.nanoTime() - timeBefore;
@@ -445,6 +454,17 @@ public final class TimedBuffer {
    }
 
    /**
+    * Sub classes (tests basically) can use this to override how the sleep is being done
+    *
+    * @param sleepMillis
+    * @param sleepNanos
+    * @throws InterruptedException
+    */
+   protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
+      Thread.sleep(sleepMillis, sleepNanos);
+   }
+
+   /**
     * Sub classes (tests basically) can use this to override disabling spinning
     */
    protected void stopSpin() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b6b5b4ca/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
index b2f65cd..bddb7ea 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -27,6 +28,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
 import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -122,6 +124,98 @@ public class TimedBufferTest extends ActiveMQTestBase {
       }
 
    }
+   @Test
+   public void testTimeOnTimedBuffer() throws Exception {
+      final ReusableLatch latchFlushed = new ReusableLatch(0);
+      final AtomicInteger flushes = new AtomicInteger(0);
+      class TestObserver implements TimedBufferObserver {
+
+         @Override
+         public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
+            for (IOCallback callback : callbacks) {
+               callback.done();
+            }
+         }
+
+         /* (non-Javadoc)
+          * @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
+          */
+         @Override
+         public ByteBuffer newBuffer(final int minSize, final int maxSize) {
+            return ByteBuffer.allocate(maxSize);
+         }
+
+         @Override
+         public int getRemainingBytes() {
+            return 1024 * 1024;
+         }
+      }
+
+      TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 2, false);
+
+      timedBuffer.start();
+
+      TestObserver observer = new TestObserver();
+      timedBuffer.setObserver(observer);
+
+
+      int x = 0;
+
+      byte[] bytes = new byte[10];
+      for (int j = 0; j < 10; j++) {
+         bytes[j] = ActiveMQTestBase.getSamplebyte(x++);
+      }
+
+      ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes);
+
+      IOCallback callback = new IOCallback() {
+         @Override
+         public void done() {
+            System.out.println("done");
+            latchFlushed.countDown();
+         }
+
+         @Override
+         public void onError(int errorCode, String errorMessage) {
+
+         }
+      };
+
+
+      try {
+         latchFlushed.setCount(2);
+
+         // simulating a low load period
+         timedBuffer.addBytes(buff, true, callback);
+         Thread.sleep(1000);
+         timedBuffer.addBytes(buff, true, callback);
+         Assert.assertTrue(latchFlushed.await(5, TimeUnit.SECONDS));
+         latchFlushed.setCount(5);
+
+
+         flushes.set(0);
+
+         // Sending like crazy... still some wait (1 millisecond) between each send..
+         long time = System.currentTimeMillis();
+         for (int i = 0; i < 5; i++) {
+            timedBuffer.addBytes(buff, true, callback);
+            Thread.sleep(1);
+         }
+         Assert.assertTrue(latchFlushed.await(5, TimeUnit.SECONDS));
+
+         // The purpose of the timed buffer is to batch writes up to a millisecond.. or up to the size of the buffer.
+         Assert.assertTrue("Timed Buffer is not batching accordingly, it was expected to take at least 500 seconds batching multiple writes while it took " + (System.currentTimeMillis() - time) + " milliseconds", System.currentTimeMillis() - time >= 500);
+
+         // it should be in fact only writing once..
+         // i will set for 3 just in case there's a GC or anything else happening on the test
+         Assert.assertTrue("Too many writes were called", flushes.get() <= 3);
+      } finally {
+         timedBuffer.stop();
+      }
+
+
+
+   }
 
    @Test
    public void testTimingAndFlush() throws Exception {