You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/03/28 15:54:45 UTC

[07/16] activemq-artemis git commit: ARTEMIS-1447 JDBC NodeManager to support JDBC HA Shared Store

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
new file mode 100644
index 0000000..136f5db
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import java.sql.SQLException;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.jdbc.store.drivers.derby.DerbySQLProvider;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class JdbcLeaseLockTest {
+
+   private static final long DEFAULT_LOCK_EXPIRATION_MILLIS = TimeUnit.SECONDS.toMillis(10);
+   private static final SQLProvider SQL_PROVIDER = new DerbySQLProvider.Factory().create(ActiveMQDefaultConfiguration.getDefaultNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER);
+   private static final String JDBC_URL = "jdbc:derby:memory:server_lock_db;create=true";
+   private static final String DRIVER_CLASS_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
+   private JdbcSharedStateManager jdbcSharedStateManager;
+
+   private LeaseLock lock() {
+      return lock(DEFAULT_LOCK_EXPIRATION_MILLIS);
+   }
+
+   private LeaseLock lock(long acquireMillis) {
+      try {
+         return JdbcSharedStateManager.createLiveLock(UUID.randomUUID().toString(), jdbcSharedStateManager.getConnection(), SQL_PROVIDER, acquireMillis, 0);
+      } catch (SQLException e) {
+         throw new IllegalStateException(e);
+      }
+   }
+
+   @Before
+   public void createLockTable() {
+      jdbcSharedStateManager = JdbcSharedStateManager.usingConnectionUrl(UUID.randomUUID().toString(), DEFAULT_LOCK_EXPIRATION_MILLIS, JDBC_URL, DRIVER_CLASS_NAME, SQL_PROVIDER);
+   }
+
+   @After
+   public void dropLockTable() throws Exception {
+      jdbcSharedStateManager.destroy();
+      jdbcSharedStateManager.close();
+   }
+
+   @Test
+   public void shouldAcquireLock() {
+      final LeaseLock lock = lock();
+      final boolean acquired = lock.tryAcquire();
+      Assert.assertTrue("Must acquire the lock!", acquired);
+      try {
+         Assert.assertTrue("The lock is been held by the caller!", lock.isHeldByCaller());
+      } finally {
+         lock.release();
+      }
+   }
+
+   @Test
+   public void shouldNotAcquireLockWhenAlreadyHeldByOthers() {
+      final LeaseLock lock = lock();
+      Assert.assertTrue("Must acquire the lock", lock.tryAcquire());
+      try {
+         Assert.assertTrue("Lock held by the caller", lock.isHeldByCaller());
+         final LeaseLock failingLock = lock();
+         Assert.assertFalse("lock already held by other", failingLock.tryAcquire());
+         Assert.assertFalse("lock already held by other", failingLock.isHeldByCaller());
+         Assert.assertTrue("lock already held by other", failingLock.isHeld());
+      } finally {
+         lock.release();
+      }
+   }
+
+   @Test
+   public void shouldNotAcquireLockTwice() {
+      final LeaseLock lock = lock();
+      Assert.assertTrue("Must acquire the lock", lock.tryAcquire());
+      try {
+         Assert.assertFalse("lock already acquired", lock.tryAcquire());
+      } finally {
+         lock.release();
+      }
+   }
+
+   @Test
+   public void shouldNotCorruptGuardedState() throws InterruptedException {
+      final AtomicLong sharedState = new AtomicLong(0);
+      final int producers = 2;
+      final int writesPerProducer = 10;
+      final long idleMillis = 1000;
+      final long millisToAcquireLock = writesPerProducer * (producers - 1) * idleMillis;
+      final LeaseLock.Pauser pauser = LeaseLock.Pauser.sleep(idleMillis, TimeUnit.MILLISECONDS);
+      final CountDownLatch finished = new CountDownLatch(producers);
+      final LeaseLock[] locks = new LeaseLock[producers];
+      final AtomicInteger lockIndex = new AtomicInteger(0);
+      final Runnable producerTask = () -> {
+         final LeaseLock lock = locks[lockIndex.getAndIncrement()];
+         try {
+            for (int i = 0; i < writesPerProducer; i++) {
+               final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(millisToAcquireLock, pauser, () -> true);
+               if (acquireResult != LeaseLock.AcquireResult.Done) {
+                  throw new IllegalStateException(acquireResult + " from " + Thread.currentThread());
+               }
+               //avoid the atomic getAndIncrement operation on purpose
+               sharedState.lazySet(sharedState.get() + 1);
+               lock.release();
+            }
+         } finally {
+            finished.countDown();
+         }
+      };
+      final Thread[] producerThreads = new Thread[producers];
+      for (int i = 0; i < producers; i++) {
+         locks[i] = lock();
+         producerThreads[i] = new Thread(producerTask);
+      }
+      Stream.of(producerThreads).forEach(Thread::start);
+      final long maxTestTime = millisToAcquireLock * writesPerProducer * producers;
+      Assert.assertTrue("Each producers must complete the writes", finished.await(maxTestTime, TimeUnit.MILLISECONDS));
+      Assert.assertEquals("locks hasn't mutual excluded producers", writesPerProducer * producers, sharedState.get());
+   }
+
+   @Test
+   public void shouldAcquireExpiredLock() throws InterruptedException {
+      final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
+      Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
+      try {
+         Thread.sleep(lock.expirationMillis() * 2);
+         Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
+         Assert.assertFalse("lock is already expired", lock.isHeld());
+         Assert.assertTrue("lock is already expired", lock.tryAcquire());
+      } finally {
+         lock.release();
+      }
+   }
+
+   @Test
+   public void shouldOtherAcquireExpiredLock() throws InterruptedException {
+      final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
+      Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
+      try {
+         Thread.sleep(lock.expirationMillis() * 2);
+         Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
+         Assert.assertFalse("lock is already expired", lock.isHeld());
+         final LeaseLock otherLock = lock(TimeUnit.SECONDS.toMillis(10));
+         try {
+            Assert.assertTrue("lock is already expired", otherLock.tryAcquire());
+         } finally {
+            otherLock.release();
+         }
+      } finally {
+         lock.release();
+      }
+   }
+
+   @Test
+   public void shouldRenewAcquiredLock() throws InterruptedException {
+      final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(10));
+      Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
+      try {
+         Assert.assertTrue("lock is owned", lock.renew());
+      } finally {
+         lock.release();
+      }
+   }
+
+   @Test
+   public void shouldNotRenewReleasedLock() throws InterruptedException {
+      final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(10));
+      Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
+      lock.release();
+      Assert.assertFalse("lock is already released", lock.isHeldByCaller());
+      Assert.assertFalse("lock is already released", lock.isHeld());
+      Assert.assertFalse("lock is already released", lock.renew());
+   }
+
+   @Test
+   public void shouldRenewExpiredLockNotAcquiredByOthers() throws InterruptedException {
+      final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
+      Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
+      try {
+         Thread.sleep(lock.expirationMillis() * 2);
+         Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
+         Assert.assertFalse("lock is already expired", lock.isHeld());
+         Assert.assertTrue("lock is owned", lock.renew());
+      } finally {
+         lock.release();
+      }
+   }
+
+   @Test
+   public void shouldNotRenewLockAcquiredByOthers() throws InterruptedException {
+      final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
+      Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
+      try {
+         Thread.sleep(lock.expirationMillis() * 2);
+         Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
+         Assert.assertFalse("lock is already expired", lock.isHeld());
+         final LeaseLock otherLock = lock(TimeUnit.SECONDS.toMillis(10));
+         Assert.assertTrue("lock is already expired", otherLock.tryAcquire());
+         try {
+            Assert.assertFalse("lock is owned by others", lock.renew());
+         } finally {
+            otherLock.release();
+         }
+      } finally {
+         lock.release();
+      }
+   }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/565b8175/docs/user-manual/en/persistence.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/persistence.md b/docs/user-manual/en/persistence.md
index 8a422ed..e4cf12f 100644
--- a/docs/user-manual/en/persistence.md
+++ b/docs/user-manual/en/persistence.md
@@ -436,6 +436,21 @@ To configure Apache ActiveMQ Artemis to use a database for persisting messages a
 
     The JDBC network connection timeout in milliseconds. The default value
     is 20000 milliseconds (ie 20 seconds).
+    
+-   `jdbc-lock-acquisition-timeout`
+
+    The max allowed time in milliseconds while trying to acquire a JDBC lock. The default value
+    is 60000 milliseconds (ie 60 seconds).
+        
+-   `jdbc-lock-renew-period`
+
+    The period in milliseconds of the keep alive service of a JDBC lock. The default value
+    is 2000 milliseconds (ie 2 seconds).
+    
+-   `jdbc-lock-expiration`
+
+    The time in milliseconds a JDBC lock is considered valid without keeping it alive. The default value
+    is 20000 milliseconds (ie 20 seconds).
 
 ## Configuring Apache ActiveMQ Artemis for Zero Persistence