You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/16 15:29:27 UTC
[1/2] qpid-broker-j git commit: QPID-8070: [Broker-J] [JDBC Store]
Instantiate asynchronous commits executor on open of JDBC message store
Repository: qpid-broker-j
Updated Branches:
refs/heads/7.0.x ed1b9752b -> 887f535ea
QPID-8070: [Broker-J] [JDBC Store] Instantiate asynchronous commits executor on open of JDBC message store
Cherry picked from 17d4b5607f3fe73b9a37c53d38b7db980cf0b245
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/14a53f61
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/14a53f61
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/14a53f61
Branch: refs/heads/7.0.x
Commit: 14a53f619628570426466655ef2aa1edac9085a3
Parents: ed1b975
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Jan 15 17:50:08 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Jan 16 15:21:21 2018 +0000
----------------------------------------------------------------------
.../jdbc/GenericAbstractJDBCMessageStore.java | 1 +
.../server/store/jdbc/JDBCMessageStoreTest.java | 68 ++++++++++++++++++++
2 files changed, 69 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/14a53f61/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
index 5aba1f8..cef4f79 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
@@ -43,6 +43,7 @@ public abstract class GenericAbstractJDBCMessageStore extends AbstractJDBCMessag
{
_parent = parent;
super.setTablePrefix(getTablePrefix(parent));
+ initMessageStore(parent);
doOpen(parent);
createOrOpenMessageStoreDatabase();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/14a53f61/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 3220cef..5e77791 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -31,11 +31,21 @@ import java.sql.SQLException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.MessageDurability;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreTestCase;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.jdbc.JDBCVirtualHost;
public class JDBCMessageStoreTest extends MessageStoreTestCase
@@ -92,6 +102,64 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
assertTablesExist(expectedTables, false);
}
+ public void testEnqueueTransactionCommitAsync() throws Exception
+ {
+ final String queueName = getTestName();
+ final UUID transactionalLogId = UUID.randomUUID();
+
+ final MessageStore store = getStore();
+ final TransactionLogResource transactionalLog = mockTransactionLogResource(transactionalLogId, queueName);
+ final InternalMessage message = addTestMessage(store, queueName, "test");
+
+ final Transaction transaction = store.newTransaction();
+ final MessageEnqueueRecord record = transaction.enqueueMessage(transactionalLog, message);
+
+ assertNotNull("Message enqueue record should not be null", record);
+ assertEquals("Unexpected queue id", transactionalLogId, record.getQueueId());
+ assertEquals("Unexpected message number", message.getMessageNumber(), record.getMessageNumber());
+
+ final ListenableFuture<Void> future = transaction.commitTranAsync(null);
+ future.get(1000, TimeUnit.MILLISECONDS);
+ }
+
+ public void testDequeueTransactionCommitAsync() throws Exception
+ {
+ final String queueName = getTestName();
+ final UUID transactionalLogId = UUID.randomUUID();
+
+ final MessageStore store = getStore();
+ final TransactionLogResource transactionalLog = mockTransactionLogResource(transactionalLogId, queueName);
+ final InternalMessage message = addTestMessage(store, queueName, "test2");
+
+ final Transaction enqueueTransaction = store.newTransaction();
+ MessageEnqueueRecord record = enqueueTransaction.enqueueMessage(transactionalLog, message);
+ enqueueTransaction.commitTran();
+
+ final Transaction dequeueTransaction = store.newTransaction();
+ dequeueTransaction.dequeueMessage(record);
+
+ final ListenableFuture<Void> future = dequeueTransaction.commitTranAsync(null);
+ future.get(1000, TimeUnit.MILLISECONDS);
+ }
+
+ private InternalMessage addTestMessage(final MessageStore store,
+ final String transactionalLogName,
+ final String messageContent)
+ {
+ final AMQMessageHeader amqpHeader = mock(AMQMessageHeader.class);
+ return InternalMessage.createMessage(store, amqpHeader, messageContent, true, transactionalLogName);
+ }
+
+ private TransactionLogResource mockTransactionLogResource(final UUID transactionalLogId,
+ final String transactionalLogName)
+ {
+ final TransactionLogResource transactionalLog = mock(TransactionLogResource.class);
+ when(transactionalLog.getId()).thenReturn(transactionalLogId);
+ when(transactionalLog.getName()).thenReturn(transactionalLogName);
+ when(transactionalLog.getMessageDurability()).thenReturn(MessageDurability.ALWAYS);
+ return transactionalLog;
+ }
+
@Override
protected VirtualHost createVirtualHost()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-broker-j git commit: QPID-8070: [Broker-J] [Unit Tests]
Prevent tests leaking in-memory Derby instances
Posted by kw...@apache.org.
QPID-8070: [Broker-J] [Unit Tests] Prevent tests leaking in-memory Derby instances
Cherry picked from ac5587287d67e1965273155a9d70e63d50f0c3b1
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/887f535e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/887f535e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/887f535e
Branch: refs/heads/7.0.x
Commit: 887f535ea766ef9197b28a95f223b742044e5908
Parents: 14a53f6
Author: Keith Wall <kw...@apache.org>
Authored: Tue Jan 16 15:17:37 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Tue Jan 16 15:21:40 2018 +0000
----------------------------------------------------------------------
.../server/store/jdbc/JDBCMessageStoreTest.java | 19 +++++++------------
1 file changed, 7 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/887f535e/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index 5e77791..279ee78 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -50,7 +50,7 @@ import org.apache.qpid.server.virtualhost.jdbc.JDBCVirtualHost;
public class JDBCMessageStoreTest extends MessageStoreTestCase
{
- public static final String TEST_TABLE_PREFIX = "TEST_TABLE_PREFIX_";
+ private static final String TEST_TABLE_PREFIX = "TEST_TABLE_PREFIX_";
private String _connectionURL;
private static final int BUFFER_SIZE = 10;
private static final int POOL_SIZE = 20;
@@ -163,10 +163,10 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
@Override
protected VirtualHost createVirtualHost()
{
- _connectionURL = "jdbc:derby:memory:/" + getTestName() + ";create=true";
+ _connectionURL = "jdbc:derby:memory:/" + getTestName();
final JDBCVirtualHost jdbcVirtualHost = mock(JDBCVirtualHost.class);
- when(jdbcVirtualHost.getConnectionUrl()).thenReturn(_connectionURL);
+ when(jdbcVirtualHost.getConnectionUrl()).thenReturn(_connectionURL + ";create=true");
when(jdbcVirtualHost.getUsername()).thenReturn("test");
when(jdbcVirtualHost.getPassword()).thenReturn("pass");
when(jdbcVirtualHost.getTableNamePrefix()).thenReturn(TEST_TABLE_PREFIX);
@@ -192,24 +192,19 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
private Set<String> getTableNames() throws SQLException
{
- Set<String> tableNames = new HashSet<String>();
+ Set<String> tableNames = new HashSet<>();
Connection conn = null;
try
{
conn = openConnection();
DatabaseMetaData metaData = conn.getMetaData();
- ResultSet tables = metaData.getTables(null, null, null, new String[] { "TABLE" });
- try
+ try (ResultSet tables = metaData.getTables(null, null, null, new String[]{"TABLE"}))
{
while (tables.next())
{
tableNames.add(tables.getString("TABLE_NAME"));
}
}
- finally
- {
- tables.close();
- }
}
finally
{
@@ -226,12 +221,12 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
return DriverManager.getConnection(_connectionURL);
}
- public static void shutdownDerby(String connectionURL) throws SQLException
+ static void shutdownDerby(String connectionURL) throws SQLException
{
Connection connection = null;
try
{
- connection = DriverManager.getConnection(connectionURL);
+ connection = DriverManager.getConnection(connectionURL + ";shutdown=true");
}
catch(SQLException e)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org