You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/16 15:46:32 UTC

[2/2] qpid-broker-j git commit: QPID-8070:[Broker-J][JDBC Store] Instantiate asynchronous commits executor on open of JDBC message store

QPID-8070:[Broker-J][JDBC Store] Instantiate asynchronous commits executor on open of JDBC message store

Cherry picked from 14a53f619628570426466655ef2aa1edac9085a3


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/31d81ffe
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/31d81ffe
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/31d81ffe

Branch: refs/heads/6.1.x
Commit: 31d81ffe24b7c928bc855186258f74a8cf890a9a
Parents: 852aa0b
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Jan 15 17:50:08 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Jan 16 15:44:28 2018 +0000

----------------------------------------------------------------------
 .../jdbc/GenericAbstractJDBCMessageStore.java   |  1 +
 .../server/store/jdbc/JDBCMessageStoreTest.java | 67 ++++++++++++++++++++
 2 files changed, 68 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/31d81ffe/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
index d46ec4d..bbfe88a 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
@@ -43,6 +43,7 @@ public abstract class GenericAbstractJDBCMessageStore extends org.apache.qpid.se
         {
             _parent = parent;
             super.setTablePrefix(getTablePrefix(parent));
+            initMessageStore(parent);
             doOpen(parent);
 
             createOrOpenMessageStoreDatabase();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/31d81ffe/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index a3f9271..2bd51c6 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -31,10 +31,20 @@ import java.sql.SQLException;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreTestCase;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.jdbc.JDBCVirtualHost;
 
 public class JDBCMessageStoreTest extends MessageStoreTestCase
@@ -78,6 +88,63 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
         assertTablesExist(expectedTables, false);
     }
 
+    public void testEnqueueTransactionCommitAsync() throws Exception
+    {
+        final String queueName = getTestName();
+        final UUID transactionalLogId = UUID.randomUUID();
+
+        final MessageStore store = getStore();
+        final TransactionLogResource transactionalLog = mockTransactionLogResource(transactionalLogId, queueName);
+        final InternalMessage message = addTestMessage(store, "test");
+
+        final Transaction transaction = store.newTransaction();
+        final MessageEnqueueRecord record = transaction.enqueueMessage(transactionalLog, message);
+
+        assertNotNull("Message enqueue record should not be null", record);
+        assertEquals("Unexpected queue id", transactionalLogId, record.getQueueId());
+        assertEquals("Unexpected message number", message.getMessageNumber(), record.getMessageNumber());
+
+        final ListenableFuture<Void> future = transaction.commitTranAsync(null);
+        future.get(1000, TimeUnit.MILLISECONDS);
+    }
+
+    public void testDequeueTransactionCommitAsync() throws Exception
+    {
+        final String queueName = getTestName();
+        final UUID transactionalLogId = UUID.randomUUID();
+
+        final MessageStore store = getStore();
+        final TransactionLogResource transactionalLog = mockTransactionLogResource(transactionalLogId, queueName);
+        final InternalMessage message = addTestMessage(store, "test2");
+
+        final Transaction enqueueTransaction = store.newTransaction();
+        MessageEnqueueRecord record = enqueueTransaction.enqueueMessage(transactionalLog, message);
+        enqueueTransaction.commitTran();
+
+        final Transaction dequeueTransaction = store.newTransaction();
+        dequeueTransaction.dequeueMessage(record);
+
+        final ListenableFuture<Void> future = dequeueTransaction.commitTranAsync(null);
+        future.get(1000, TimeUnit.MILLISECONDS);
+    }
+
+    private InternalMessage addTestMessage(final MessageStore store,
+                                           final String messageContent)
+    {
+        final AMQMessageHeader amqpHeader = mock(AMQMessageHeader.class);
+        return InternalMessage.createMessage(store, amqpHeader, messageContent, true);
+    }
+
+    private TransactionLogResource mockTransactionLogResource(final UUID transactionalLogId,
+                                                              final String transactionalLogName)
+    {
+        final TransactionLogResource transactionalLog = mock(TransactionLogResource.class);
+        when(transactionalLog.getId()).thenReturn(transactionalLogId);
+        when(transactionalLog.getName()).thenReturn(transactionalLogName);
+        when(transactionalLog.getMessageDurability()).thenReturn(MessageDurability.ALWAYS);
+        return transactionalLog;
+    }
+
     @Override
     protected VirtualHost createVirtualHost()
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org