You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/07/07 22:46:58 UTC

[activemq-artemis] branch main updated: ARTEMIS-3327 Reverting 5c051e98329eb69a1d923ce20c689d2aee065348 and adding test to validate contract with sync.

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 000f83d  ARTEMIS-3327 Reverting 5c051e98329eb69a1d923ce20c689d2aee065348 and adding test to validate contract with sync.
000f83d is described below

commit 000f83dbc2a0820c3bca985332ebfbc2f600a98c
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed Jul 7 16:19:08 2021 -0400

    ARTEMIS-3327 Reverting 5c051e98329eb69a1d923ce20c689d2aee065348 and adding test to validate
    contract with sync.
    
    This reverts commit 5c051e98329eb69a1d923ce20c689d2aee065348.
    
    However this is adding two tests to make sure there won't be a regression on this.
---
 .../artemis/core/journal/impl/JournalImpl.java     |   6 +
 .../unit/core/journal/impl/JournalAsyncTest.java   | 128 +++++++++++++++++++++
 .../impl/fakes/FakeSequentialFileFactory.java      |  11 ++
 3 files changed, 145 insertions(+)

diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 8509db6..ed6f38a 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -935,6 +935,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(addRecordEncodeSize, maxRecordSize);
       }
 
+      final SimpleFuture<Boolean> result = newSyncAndCallbackResult(sync, callback);
       appendExecutor.execute(new Runnable() {
          @Override
          public void run() {
@@ -951,9 +952,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                   ", usedFile = " +
                                   usedFile);
                }
+               result.set(true);
             } catch (ActiveMQShutdownException e) {
+               result.fail(e);
                logger.error("appendAddRecord:" + e, e);
             } catch (Throwable e) {
+               result.fail(e);
                setErrorCondition(callback, null, e);
                logger.error("appendAddRecord::"  + e, e);
             } finally {
@@ -961,6 +965,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
             }
          }
       });
+
+      result.get();
    }
 
 
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
index 00539a7..e0368f8 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/JournalAsyncTest.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
@@ -117,6 +118,133 @@ public class JournalAsyncTest extends ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testAsyncAppendRecord1() throws Exception {
+      final int JOURNAL_SIZE = 20000;
+
+      setupJournal(JOURNAL_SIZE, 100, 5);
+
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      final CountDownLatch latchHoldDirect = new CountDownLatch(1);
+
+      try {
+         factory.setWriteDirectCallback(() -> {
+            try {
+               latchHoldDirect.await(10, TimeUnit.MINUTES);
+            } catch (Throwable ignored) {
+            }
+         });
+
+         class LocalThread extends Thread {
+
+            Exception e;
+
+            @Override
+            public void run() {
+               try {
+                  journalImpl.appendAddRecord(1, (byte) 1, new SimpleEncoding(1, (byte) 0), true, null);
+                  latch.countDown();
+               } catch (Exception e) {
+                  e.printStackTrace();
+                  this.e = e;
+               }
+            }
+         }
+
+         LocalThread t = new LocalThread();
+         t.start();
+
+         Assert.assertFalse("journal.append with sync true should hold until the write is done", latch.await(100, TimeUnit.MILLISECONDS));
+
+         Thread.yield();
+
+         Assert.assertTrue(t.isAlive());
+
+         latchHoldDirect.countDown();
+
+         Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
+
+         t.join();
+
+         Assert.assertFalse(t.isAlive());
+
+         if (t.e != null) {
+            throw t.e;
+         }
+      } finally {
+         latchHoldDirect.countDown();
+      }
+   }
+
+
+   @Test
+   public void testAsyncAppendRecord2() throws Exception {
+      final int JOURNAL_SIZE = 20000;
+
+      setupJournal(JOURNAL_SIZE, 100, 5);
+
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      final CountDownLatch latchHoldDirect = new CountDownLatch(1);
+
+      try {
+         factory.setWriteDirectCallback(() -> {
+            try {
+               latchHoldDirect.await(10, TimeUnit.MINUTES);
+            } catch (Throwable ignored) {
+            }
+         });
+
+         class LocalThread extends Thread {
+
+            Exception e;
+
+            @Override
+            public void run() {
+               try {
+                  journalImpl.appendAddRecord(1, (byte) 1, new SimpleEncoding(1, (byte) 0), true, new IOCompletion() {
+                     @Override
+                     public void storeLineUp() {
+                     }
+
+                     @Override
+                     public void done() {
+                     }
+
+                     @Override
+                     public void onError(int errorCode, String errorMessage) {
+                     }
+                  });
+                  latch.countDown();
+               } catch (Exception e) {
+                  e.printStackTrace();
+                  this.e = e;
+               }
+            }
+         }
+
+         LocalThread t = new LocalThread();
+         t.start();
+
+         Assert.assertTrue("journal.append with sync true and IOContext should not hold thread", latch.await(10, TimeUnit.SECONDS));
+
+         latchHoldDirect.countDown();
+
+         Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
+
+         t.join();
+
+         Assert.assertFalse(t.isAlive());
+
+         if (t.e != null) {
+            throw t.e;
+         }
+      } finally {
+         latchHoldDirect.countDown();
+      }
+   }
+
    // If a callback error already arrived, we should just throw the exception right away
    @Test
    public void testPreviousError() throws Exception {
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
index 0c5d89e..8c0a1f7 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
@@ -43,6 +43,9 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
 
    private final boolean supportsCallback;
 
+
+   private Runnable writeDirectCallback;
+
    private volatile boolean holdCallbacks;
 
    private ListenerHoldCallback holdCallbackListener;
@@ -81,6 +84,10 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
       return 1;
    }
 
+   public void setWriteDirectCallback(Runnable writeDirectCallback) {
+      this.writeDirectCallback = writeDirectCallback;
+   }
+
    // Public --------------------------------------------------------
 
    @Override
@@ -420,6 +427,10 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
             action.run();
          }
 
+         if (writeDirectCallback != null) {
+            writeDirectCallback.run();
+         }
+
       }
 
       @Override