You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/16 15:29:27 UTC

[1/2] qpid-broker-j git commit: QPID-8070:[Broker-J][JDBC Store] Instantiate asynchronous commits executor on open of JDBC message store

Repository: qpid-broker-j
Updated Branches:
  refs/heads/7.0.x ed1b9752b -> 887f535ea


QPID-8070:[Broker-J][JDBC Store] Instantiate asynchronous commits executor on open of JDBC message store

Cherry picked from 17d4b5607f3fe73b9a37c53d38b7db980cf0b245


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/14a53f61
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/14a53f61
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/14a53f61

Branch: refs/heads/7.0.x
Commit: 14a53f619628570426466655ef2aa1edac9085a3
Parents: ed1b975
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Jan 15 17:50:08 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Jan 16 15:21:21 2018 +0000

----------------------------------------------------------------------
 .../jdbc/GenericAbstractJDBCMessageStore.java   |  1 +
 .../server/store/jdbc/JDBCMessageStoreTest.java | 68 ++++++++++++++++++++
 2 files changed, 69 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/14a53f61/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
index 5aba1f8..cef4f79 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
@@ -43,6 +43,7 @@ public abstract class GenericAbstractJDBCMessageStore extends AbstractJDBCMessag
         {
             _parent = parent;
             super.setTablePrefix(getTablePrefix(parent));
+            initMessageStore(parent);
             doOpen(parent);
 
             createOrOpenMessageStoreDatabase();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/14a53f61/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 3220cef..5e77791 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -31,11 +31,21 @@ import java.sql.SQLException;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreTestCase;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.virtualhost.jdbc.JDBCVirtualHost;
 
 public class JDBCMessageStoreTest extends MessageStoreTestCase
@@ -92,6 +102,64 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
         assertTablesExist(expectedTables, false);
     }
 
+    public void testEnqueueTransactionCommitAsync() throws Exception
+    {
+        final String queueName = getTestName();
+        final UUID transactionalLogId = UUID.randomUUID();
+
+        final MessageStore store = getStore();
+        final TransactionLogResource transactionalLog = mockTransactionLogResource(transactionalLogId, queueName);
+        final InternalMessage message = addTestMessage(store, queueName, "test");
+
+        final Transaction transaction = store.newTransaction();
+        final MessageEnqueueRecord record = transaction.enqueueMessage(transactionalLog, message);
+
+        assertNotNull("Message enqueue record should not be null", record);
+        assertEquals("Unexpected queue id", transactionalLogId, record.getQueueId());
+        assertEquals("Unexpected message number", message.getMessageNumber(), record.getMessageNumber());
+
+        final ListenableFuture<Void> future = transaction.commitTranAsync(null);
+        future.get(1000, TimeUnit.MILLISECONDS);
+    }
+
+    public void testDequeueTransactionCommitAsync() throws Exception
+    {
+        final String queueName = getTestName();
+        final UUID transactionalLogId = UUID.randomUUID();
+
+        final MessageStore store = getStore();
+        final TransactionLogResource transactionalLog = mockTransactionLogResource(transactionalLogId, queueName);
+        final InternalMessage message = addTestMessage(store, queueName, "test2");
+
+        final Transaction enqueueTransaction = store.newTransaction();
+        MessageEnqueueRecord record = enqueueTransaction.enqueueMessage(transactionalLog, message);
+        enqueueTransaction.commitTran();
+
+        final Transaction dequeueTransaction = store.newTransaction();
+        dequeueTransaction.dequeueMessage(record);
+
+        final ListenableFuture<Void> future = dequeueTransaction.commitTranAsync(null);
+        future.get(1000, TimeUnit.MILLISECONDS);
+    }
+
+    private InternalMessage addTestMessage(final MessageStore store,
+                                           final String transactionalLogName,
+                                           final String messageContent)
+    {
+        final AMQMessageHeader amqpHeader = mock(AMQMessageHeader.class);
+        return InternalMessage.createMessage(store, amqpHeader, messageContent, true, transactionalLogName);
+    }
+
+    private TransactionLogResource mockTransactionLogResource(final UUID transactionalLogId,
+                                                              final String transactionalLogName)
+    {
+        final TransactionLogResource transactionalLog = mock(TransactionLogResource.class);
+        when(transactionalLog.getId()).thenReturn(transactionalLogId);
+        when(transactionalLog.getName()).thenReturn(transactionalLogName);
+        when(transactionalLog.getMessageDurability()).thenReturn(MessageDurability.ALWAYS);
+        return transactionalLog;
+    }
+
     @Override
     protected VirtualHost createVirtualHost()
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-broker-j git commit: QPID-8070: [Broker-J] [Unit Tests] Prevent tests leaking in-memory Derby instances

Posted by kw...@apache.org.
QPID-8070: [Broker-J] [Unit Tests] Prevent tests leaking in-memory Derby instances

Cherry picked fromac5587287d67e1965273155a9d70e63d50f0c3b1


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/887f535e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/887f535e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/887f535e

Branch: refs/heads/7.0.x
Commit: 887f535ea766ef9197b28a95f223b742044e5908
Parents: 14a53f6
Author: Keith Wall <kw...@apache.org>
Authored: Tue Jan 16 15:17:37 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Jan 16 15:21:40 2018 +0000

----------------------------------------------------------------------
 .../server/store/jdbc/JDBCMessageStoreTest.java  | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/887f535e/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 5e77791..279ee78 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -50,7 +50,7 @@ import org.apache.qpid.server.virtualhost.jdbc.JDBCVirtualHost;
 
 public class JDBCMessageStoreTest extends MessageStoreTestCase
 {
-    public static final String TEST_TABLE_PREFIX = "TEST_TABLE_PREFIX_";
+    private static final String TEST_TABLE_PREFIX = "TEST_TABLE_PREFIX_";
     private String _connectionURL;
     private static final int BUFFER_SIZE = 10;
     private static final int POOL_SIZE = 20;
@@ -163,10 +163,10 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
     @Override
     protected VirtualHost createVirtualHost()
     {
-        _connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true";
+        _connectionURL = "jdbc:derby:memory:/" + getTestName();
 
         final JDBCVirtualHost jdbcVirtualHost = mock(JDBCVirtualHost.class);
-        when(jdbcVirtualHost.getConnectionUrl()).thenReturn(_connectionURL);
+        when(jdbcVirtualHost.getConnectionUrl()).thenReturn(_connectionURL + ";create=true");
         when(jdbcVirtualHost.getUsername()).thenReturn("test");
         when(jdbcVirtualHost.getPassword()).thenReturn("pass");
         when(jdbcVirtualHost.getTableNamePrefix()).thenReturn(TEST_TABLE_PREFIX);
@@ -192,24 +192,19 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
 
     private Set<String> getTableNames() throws SQLException
     {
-        Set<String> tableNames = new HashSet<String>();
+        Set<String> tableNames = new HashSet<>();
         Connection conn = null;
         try
         {
             conn = openConnection();
             DatabaseMetaData metaData = conn.getMetaData();
-            ResultSet tables = metaData.getTables(null, null, null, new String[] { "TABLE" });
-            try
+            try (ResultSet tables = metaData.getTables(null, null, null, new String[]{"TABLE"}))
             {
                 while (tables.next())
                 {
                     tableNames.add(tables.getString("TABLE_NAME"));
                 }
             }
-            finally
-            {
-                tables.close();
-            }
         }
         finally
         {
@@ -226,12 +221,12 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
         return DriverManager.getConnection(_connectionURL);
     }
 
-    public static void shutdownDerby(String connectionURL) throws SQLException
+    static void shutdownDerby(String connectionURL) throws SQLException
     {
         Connection connection = null;
         try
         {
-            connection = DriverManager.getConnection(connectionURL);
+            connection = DriverManager.getConnection(connectionURL + ";shutdown=true");
         }
         catch(SQLException e)
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org