You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/01/15 17:59:43 UTC

qpid-broker-j git commit: QPID-8070:[Broker-J][JDBC Store] Instantiate asynchronous commits executor on open of JDBC message store

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 3b92c3063 -> 17d4b5607


QPID-8070:[Broker-J][JDBC Store] Instantiate asynchronous commits executor on open of JDBC message store


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/17d4b560
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/17d4b560
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/17d4b560

Branch: refs/heads/master
Commit: 17d4b5607f3fe73b9a37c53d38b7db980cf0b245
Parents: 3b92c30
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Jan 15 17:50:08 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Jan 15 17:59:35 2018 +0000

----------------------------------------------------------------------
 .../jdbc/GenericAbstractJDBCMessageStore.java   |  1 +
 .../server/store/jdbc/JDBCMessageStoreTest.java | 68 ++++++++++++++++++++
 2 files changed, 69 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/17d4b560/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
index 5aba1f8..cef4f79 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
@@ -43,6 +43,7 @@ public abstract class GenericAbstractJDBCMessageStore extends AbstractJDBCMessag
         {
             _parent = parent;
             super.setTablePrefix(getTablePrefix(parent));
+            initMessageStore(parent);
             doOpen(parent);
 
             createOrOpenMessageStoreDatabase();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/17d4b560/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 3220cef..5e77791 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -31,11 +31,21 @@ import java.sql.SQLException;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreTestCase;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.jdbc.JDBCVirtualHost;
 
 public class JDBCMessageStoreTest extends MessageStoreTestCase
@@ -92,6 +102,64 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
         assertTablesExist(expectedTables, false);
     }
 
+    public void testEnqueueTransactionCommitAsync() throws Exception
+    {
+        final String queueName = getTestName();
+        final UUID transactionalLogId = UUID.randomUUID();
+
+        final MessageStore store = getStore();
+        final TransactionLogResource transactionalLog = mockTransactionLogResource(transactionalLogId, queueName);
+        final InternalMessage message = addTestMessage(store, queueName, "test");
+
+        final Transaction transaction = store.newTransaction();
+        final MessageEnqueueRecord record = transaction.enqueueMessage(transactionalLog, message);
+
+        assertNotNull("Message enqueue record should not be null", record);
+        assertEquals("Unexpected queue id", transactionalLogId, record.getQueueId());
+        assertEquals("Unexpected message number", message.getMessageNumber(), record.getMessageNumber());
+
+        final ListenableFuture<Void> future = transaction.commitTranAsync(null);
+        future.get(1000, TimeUnit.MILLISECONDS);
+    }
+
+    public void testDequeueTransactionCommitAsync() throws Exception
+    {
+        final String queueName = getTestName();
+        final UUID transactionalLogId = UUID.randomUUID();
+
+        final MessageStore store = getStore();
+        final TransactionLogResource transactionalLog = mockTransactionLogResource(transactionalLogId, queueName);
+        final InternalMessage message = addTestMessage(store, queueName, "test2");
+
+        final Transaction enqueueTransaction = store.newTransaction();
+        MessageEnqueueRecord record = enqueueTransaction.enqueueMessage(transactionalLog, message);
+        enqueueTransaction.commitTran();
+
+        final Transaction dequeueTransaction = store.newTransaction();
+        dequeueTransaction.dequeueMessage(record);
+
+        final ListenableFuture<Void> future = dequeueTransaction.commitTranAsync(null);
+        future.get(1000, TimeUnit.MILLISECONDS);
+    }
+
+    private InternalMessage addTestMessage(final MessageStore store,
+                                           final String transactionalLogName,
+                                           final String messageContent)
+    {
+        final AMQMessageHeader amqpHeader = mock(AMQMessageHeader.class);
+        return InternalMessage.createMessage(store, amqpHeader, messageContent, true, transactionalLogName);
+    }
+
+    private TransactionLogResource mockTransactionLogResource(final UUID transactionalLogId,
+                                                              final String transactionalLogName)
+    {
+        final TransactionLogResource transactionalLog = mock(TransactionLogResource.class);
+        when(transactionalLog.getId()).thenReturn(transactionalLogId);
+        when(transactionalLog.getName()).thenReturn(transactionalLogName);
+        when(transactionalLog.getMessageDurability()).thenReturn(MessageDurability.ALWAYS);
+        return transactionalLog;
+    }
+
     @Override
     protected VirtualHost createVirtualHost()
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org