You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/07/18 20:12:04 UTC

[4/4] activemq-artemis git commit: ARTEMIS-1294 Using older sleep on TimedBuffer

ARTEMIS-1294 Using older sleep on TimedBuffer

And also adding test


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ad372ec9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ad372ec9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ad372ec9

Branch: refs/heads/master
Commit: ad372ec98e0d8e5765eef56883ea29b10768c70e
Parents: 41a03de
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Jul 18 10:03:47 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jul 18 16:01:51 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/io/buffer/TimedBuffer.java     | 26 +++++-
 .../unit/core/journal/impl/TimedBufferTest.java | 94 ++++++++++++++++++++
 2 files changed, 117 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ad372ec9/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
index 2713255..087453d 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java
@@ -23,7 +23,6 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.LockSupport;
 
 import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -48,7 +47,7 @@ public final class TimedBuffer {
    // prevent that
    private final Semaphore spinLimiter = new Semaphore(1);
 
-   private CheckTimer timerRunnable = null;
+   private CheckTimer timerRunnable;
 
    private final int bufferSize;
 
@@ -371,6 +370,9 @@ public final class TimedBuffer {
       int failedChecks = 0;
       long timeBefore = 0;
 
+      final int sleepMillis = timeout / 1000000; // truncates
+      final int sleepNanos = timeout % 1000000;
+
       @Override
       public void run() {
          long lastFlushTime = 0;
@@ -419,7 +421,14 @@ public final class TimedBuffer {
                timeBefore = System.nanoTime();
             }
 
-            LockSupport.parkNanos(timeout);
+            try {
+               sleep(sleepMillis, sleepNanos);
+            } catch (InterruptedException e) {
+               throw new ActiveMQInterruptedException(e);
+            } catch (Exception e) {
+               useSleep = false;
+               ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
+            }
 
             if (checks < MAX_CHECKS_ON_SLEEP) {
                long realTimeSleep = System.nanoTime() - timeBefore;
@@ -445,6 +454,17 @@ public final class TimedBuffer {
    }
 
    /**
+    * Sub classes (tests basically) can use this to override how the sleep is being done
+    *
+    * @param sleepMillis
+    * @param sleepNanos
+    * @throws InterruptedException
+    */
+   protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
+      Thread.sleep(sleepMillis, sleepNanos);
+   }
+
+   /**
     * Sub classes (tests basically) can use this to override disabling spinning
     */
    protected void stopSpin() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ad372ec9/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
index b2f65cd..bddb7ea 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.unit.core.journal.impl;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -27,6 +28,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
 import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -122,6 +124,98 @@ public class TimedBufferTest extends ActiveMQTestBase {
       }
 
    }
+   @Test
+   public void testTimeOnTimedBuffer() throws Exception {
+      final ReusableLatch latchFlushed = new ReusableLatch(0);
+      final AtomicInteger flushes = new AtomicInteger(0);
+      class TestObserver implements TimedBufferObserver {
+
+         @Override
+         public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
+            for (IOCallback callback : callbacks) {
+               callback.done();
+            }
+         }
+
+         /* (non-Javadoc)
+          * @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
+          */
+         @Override
+         public ByteBuffer newBuffer(final int minSize, final int maxSize) {
+            return ByteBuffer.allocate(maxSize);
+         }
+
+         @Override
+         public int getRemainingBytes() {
+            return 1024 * 1024;
+         }
+      }
+
+      TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND_IN_NANOS / 2, false);
+
+      timedBuffer.start();
+
+      TestObserver observer = new TestObserver();
+      timedBuffer.setObserver(observer);
+
+
+      int x = 0;
+
+      byte[] bytes = new byte[10];
+      for (int j = 0; j < 10; j++) {
+         bytes[j] = ActiveMQTestBase.getSamplebyte(x++);
+      }
+
+      ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes);
+
+      IOCallback callback = new IOCallback() {
+         @Override
+         public void done() {
+            System.out.println("done");
+            latchFlushed.countDown();
+         }
+
+         @Override
+         public void onError(int errorCode, String errorMessage) {
+
+         }
+      };
+
+
+      try {
+         latchFlushed.setCount(2);
+
+         // simulating a low load period
+         timedBuffer.addBytes(buff, true, callback);
+         Thread.sleep(1000);
+         timedBuffer.addBytes(buff, true, callback);
+         Assert.assertTrue(latchFlushed.await(5, TimeUnit.SECONDS));
+         latchFlushed.setCount(5);
+
+
+         flushes.set(0);
+
+         // Sending like crazy... still some wait (1 millisecond) between each send..
+         long time = System.currentTimeMillis();
+         for (int i = 0; i < 5; i++) {
+            timedBuffer.addBytes(buff, true, callback);
+            Thread.sleep(1);
+         }
+         Assert.assertTrue(latchFlushed.await(5, TimeUnit.SECONDS));
+
+         // The purpose of the timed buffer is to batch writes up to a millisecond.. or up to the size of the buffer.
+         Assert.assertTrue("Timed Buffer is not batching accordingly, it was expected to take at least 500 seconds batching multiple writes while it took " + (System.currentTimeMillis() - time) + " milliseconds", System.currentTimeMillis() - time >= 500);
+
+         // it should be in fact only writing once..
+         // i will set for 3 just in case there's a GC or anything else happening on the test
+         Assert.assertTrue("Too many writes were called", flushes.get() <= 3);
+      } finally {
+         timedBuffer.stop();
+      }
+
+
+
+   }
 
    @Test
    public void testTimingAndFlush() throws Exception {