You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/04/05 00:10:25 UTC
svn commit: r1584926 [2/3] - in
/qpid/branches/java-broker-config-store-changes/qpid/java: ./
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/
bdbstore/src/main/java/o...
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Fri Apr 4 22:10:24 2014
@@ -49,7 +49,10 @@ import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParseException;
@@ -221,19 +224,125 @@ abstract public class AbstractJDBCMessag
}
@Override
- public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
{
checkConfigurationStoreOpen();
try
{
- recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
- loadConfiguredObjects(recoveryHandler);
- setConfigVersion(recoveryHandler.completeConfigurationRecovery());
+ int configVersion = getConfigVersion();
+
+ handler.begin(configVersion);
+ doVisitAllConfiguredObjectRecords(handler);
+
+ int newConfigVersion = handler.end();
+ if(newConfigVersion != configVersion)
+ {
+ setConfigVersion(newConfigVersion);
+ }
}
catch (SQLException e)
{
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ throw new StoreException("Cannot visit configured object records", e);
+ }
+
+ }
+
+ private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
+ final ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ String id = rs.getString(1);
+ String objectType = rs.getString(2);
+ String attributes = getBlobAsString(rs, 3);
+ final ConfiguredObjectRecordImpl configuredObjectRecord =
+ new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
+ objectMapper.readValue(attributes, Map.class));
+ configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
+
+ }
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (JsonParseException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ UUID childId = UUID.fromString(rs.getString(1));
+ String parentType = rs.getString(2);
+ UUID parentId = UUID.fromString(rs.getString(3));
+
+ ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
+ ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
+
+ if(child != null && parent != null)
+ {
+ child.addParent(parentType, parent);
+ }
+ else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange"))
+ {
+ // TODO - remove this hack for amq. exchanges
+ child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap()));
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ for(ConfiguredObjectRecord record : configuredObjects.values())
+ {
+ boolean shoudlContinue = handler.handle(record);
+ if (!shoudlContinue)
+ {
+ break;
+ }
}
}
@@ -282,44 +391,25 @@ abstract public class AbstractJDBCMessag
{
createOrOpenMessageStoreDatabase();
upgradeIfNecessary(parent);
- }
- catch (SQLException e)
- {
- throw new StoreException("Unable to activate message store ", e);
- }
- }
- }
- @Override
- public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
- {
- checkMessageStoreOpen();
-
- if(messageRecoveryHandler != null)
- {
- try
- {
- recoverMessages(messageRecoveryHandler);
- }
- catch (SQLException e)
- {
- throw new StoreException("Error encountered when restoring message data from " +
- "persistent store ", e);
- }
- }
- if(transactionLogRecoveryHandler != null)
- {
- try
- {
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(transactionLogRecoveryHandler);
- recoverXids(dtxrh);
+ visitMessages(new MessageHandler()
+ {
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ long id = storedMessage.getMessageNumber();
+ if (_messageId.get() < id)
+ {
+ _messageId.set(id);
+ }
+ return true;
+ }
+ });
}
catch (SQLException e)
{
- throw new StoreException("Error encountered when restoring distributed transaction " +
- "data from persistent store ", e);
+ throw new StoreException("Unable to activate message store ", e);
}
-
}
}
@@ -1043,11 +1133,9 @@ abstract public class AbstractJDBCMessag
getLogger().debug("Enqueuing message "
+ messageId
+ " on queue "
- + (queue instanceof AMQQueue
- ? ((AMQQueue) queue).getName()
- : "")
- + queue.getId()
- + "[Connection"
+ + queue.getName()
+ + " with id " + queue.getId()
+ + " [Connection"
+ conn
+ "]");
}
@@ -1068,7 +1156,7 @@ abstract public class AbstractJDBCMessag
catch (SQLException e)
{
getLogger().error("Failed to enqueue: " + e.getMessage(), e);
- throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId()
+ throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId()
+ " to database", e);
}
@@ -1093,15 +1181,13 @@ abstract public class AbstractJDBCMessag
if(results != 1)
{
- throw new StoreException("Unable to find message with id " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ throw new StoreException("Unable to find message with id " + messageId + " on queue " + queue.getName()
+ " with id " + queue.getId());
}
if (getLogger().isDebugEnabled())
{
- getLogger().debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue
- ? ((AMQQueue) queue).getName()
- : "")
+ getLogger().debug("Dequeuing message " + messageId + " on queue " + queue.getName()
+ " with id " + queue.getId());
}
}
@@ -1114,7 +1200,7 @@ abstract public class AbstractJDBCMessag
catch (SQLException e)
{
getLogger().error("Failed to dequeue: " + e.getMessage(), e);
- throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + queue.getName()
+ " with id " + queue.getId() + " from database", e);
}
@@ -1363,131 +1449,6 @@ abstract public class AbstractJDBCMessag
}
- private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin();
-
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
- try
- {
-
- long maxId = 0;
-
- while(rs.next())
- {
-
- long messageId = rs.getLong(1);
- if(messageId > maxId)
- {
- maxId = messageId;
- }
-
- byte[] dataAsBytes = getBlobAsBytes(rs, 2);
-
- ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
- buf.position(1);
- buf = buf.slice();
- MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
- StorableMessageMetaData metaData = type.createMetaData(buf);
- StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
- messageHandler.message(message);
- }
-
- _messageId.set(maxId);
-
- messageHandler.completeMessageRecovery();
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
-
-
- private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- TransactionLogRecoveryHandler.QueueEntryRecoveryHandler queueEntryHandler = recoveryHandler.begin(this);
-
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
- try
- {
- while(rs.next())
- {
-
- String id = rs.getString(1);
- long messageId = rs.getLong(2);
- queueEntryHandler.queueEntry(UUID.fromString(id), messageId);
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- return queueEntryHandler.completeQueueEntryRecovery();
- }
- finally
- {
- conn.close();
- }
- }
-
- private static final class Xid
- {
-
- private final long _format;
- private final byte[] _globalId;
- private final byte[] _branchId;
-
- public Xid(long format, byte[] globalId, byte[] branchId)
- {
- _format = format;
- _globalId = globalId;
- _branchId = branchId;
- }
-
- public long getFormat()
- {
- return _format;
- }
-
- public byte[] getGlobalId()
- {
- return _globalId;
- }
-
- public byte[] getBranchId()
- {
- return _branchId;
- }
- }
private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage
{
@@ -1550,93 +1511,6 @@ abstract public class AbstractJDBCMessag
}
}
- private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- List<Xid> xids = new ArrayList<Xid>();
-
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
- try
- {
- while(rs.next())
- {
-
- long format = rs.getLong(1);
- byte[] globalId = rs.getBytes(2);
- byte[] branchId = rs.getBytes(3);
- xids.add(new Xid(format, globalId, branchId));
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
-
-
- for(Xid xid : xids)
- {
- List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
- List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
-
- PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
-
- try
- {
- pstmt.setLong(1, xid.getFormat());
- pstmt.setBytes(2, xid.getGlobalId());
- pstmt.setBytes(3, xid.getBranchId());
-
- ResultSet rs = pstmt.executeQuery();
- try
- {
- while(rs.next())
- {
-
- String actionType = rs.getString(1);
- UUID queueId = UUID.fromString(rs.getString(2));
- long messageId = rs.getLong(3);
-
- RecordImpl record = new RecordImpl(queueId, messageId);
- List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
- records.add(record);
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- pstmt.close();
- }
-
- dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
- enqueues.toArray(new RecordImpl[enqueues.size()]),
- dequeues.toArray(new RecordImpl[dequeues.size()]));
- }
-
-
- dtxrh.completeDtxRecordRecovery();
- }
- finally
- {
- conn.close();
- }
-
- }
-
private StorableMessageMetaData getMetaData(long messageId) throws SQLException
{
@@ -2357,43 +2231,81 @@ abstract public class AbstractJDBCMessag
}
}
- private void loadConfiguredObjects(ConfigurationRecoveryHandler recoveryHandler) throws SQLException,
- StoreException
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
{
- Connection conn = newAutoCommitConnection();
- Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
- final ObjectMapper objectMapper = new ObjectMapper();
+ checkMessageStoreOpen();
+
+ Connection conn = null;
try
{
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+ conn = newAutoCommitConnection();
+ Statement stmt = conn.createStatement();
try
{
- ResultSet rs = stmt.executeQuery();
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
try
{
while (rs.next())
{
- String id = rs.getString(1);
- String objectType = rs.getString(2);
- String attributes = getBlobAsString(rs, 3);
- final ConfiguredObjectRecordImpl configuredObjectRecord =
- new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
- objectMapper.readValue(attributes, Map.class));
- configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
-
+ long messageId = rs.getLong(1);
+ byte[] dataAsBytes = getBlobAsBytes(rs, 2);
+ ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+ StorableMessageMetaData metaData = type.createMetaData(buf);
+ StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
+ if (!handler.handle(message))
+ {
+ break;
+ }
}
}
- catch (JsonMappingException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- catch (JsonParseException e)
+ finally
{
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ rs.close();
}
- catch (IOException e)
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error encountered when visiting messages", e);
+ }
+ finally
+ {
+ closeConnection(conn);
+ }
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
+ try
{
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ while(rs.next())
+ {
+ String id = rs.getString(1);
+ long messageId = rs.getLong(2);
+ if (!handler.handle(UUID.fromString(id), messageId))
+ {
+ break;
+ }
+ }
}
finally
{
@@ -2404,31 +2316,41 @@ abstract public class AbstractJDBCMessag
{
stmt.close();
}
- stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
+ }
+ catch(SQLException e)
+ {
+ throw new StoreException("Error encountered when visiting message instances", e);
+ }
+ finally
+ {
+ closeConnection(conn);
+ }
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+ List<Xid> xids = new ArrayList<Xid>();
+
+ Statement stmt = conn.createStatement();
try
{
- ResultSet rs = stmt.executeQuery();
+ ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
try
{
- while (rs.next())
+ while(rs.next())
{
- UUID childId = UUID.fromString(rs.getString(1));
- String parentType = rs.getString(2);
- UUID parentId = UUID.fromString(rs.getString(3));
-
- ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
- ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
-
- if(child != null && parent != null)
- {
- child.addParent(parentType, parent);
- }
- else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange"))
- {
- // TODO - remove this hack for amq. exchanges
- child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap()));
- }
+ long format = rs.getLong(1);
+ byte[] globalId = rs.getBytes(2);
+ byte[] branchId = rs.getBytes(3);
+ xids.add(new Xid(format, globalId, branchId));
}
}
finally
@@ -2441,18 +2363,67 @@ abstract public class AbstractJDBCMessag
stmt.close();
}
+
+
+ for(Xid xid : xids)
+ {
+ List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
+ List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
+
+ PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
+
+ try
+ {
+ pstmt.setLong(1, xid.getFormat());
+ pstmt.setBytes(2, xid.getGlobalId());
+ pstmt.setBytes(3, xid.getBranchId());
+
+ ResultSet rs = pstmt.executeQuery();
+ try
+ {
+ while(rs.next())
+ {
+
+ String actionType = rs.getString(1);
+ UUID queueId = UUID.fromString(rs.getString(2));
+ long messageId = rs.getLong(3);
+
+ RecordImpl record = new RecordImpl(queueId, messageId);
+ List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
+ records.add(record);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ pstmt.close();
+ }
+
+ if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
+ enqueues.toArray(new RecordImpl[enqueues.size()]),
+ dequeues.toArray(new RecordImpl[dequeues.size()])))
+ {
+ break;
+ }
+ }
+
}
- finally
+ catch (SQLException e)
{
- conn.close();
- }
+ throw new StoreException("Error encountered when visiting distributed transactions", e);
- for(ConfiguredObjectRecord record : configuredObjects.values())
+ }
+ finally
{
- recoveryHandler.configuredObject(record);
+ closeConnection(conn);
}
}
+
protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
protected abstract void storedSizeChange(int storeSizeIncrease);
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java Fri Apr 4 22:10:24 2014
@@ -20,17 +20,36 @@
*/
package org.apache.qpid.server.store;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
/** A simple message store that stores the messages in a thread-safe structure in memory. */
-abstract public class AbstractMemoryMessageStore extends NullMessageStore
+abstract class AbstractMemoryMessageStore implements MessageStore, DurableConfigurationStore
{
- private final AtomicLong _messageId = new AtomicLong(1);
-
- private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
+ private final class MemoryMessageStoreTransaction implements Transaction
{
+ private Map<UUID, Set<Long>> _localEnqueueMap = new HashMap<UUID, Set<Long>>();
+ private Map<UUID, Set<Long>> _localDequeueMap = new HashMap<UUID, Set<Long>>();
+
+ private Map<Xid, DistributedTransactionRecords> _localDistributedTransactionsRecords = new HashMap<Xid, DistributedTransactionRecords>();
+ private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>();
+
@Override
public StoreFuture commitTranAsync()
{
@@ -40,50 +59,145 @@ abstract public class AbstractMemoryMess
@Override
public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
+ Set<Long> messageIds = _localEnqueueMap.get(queue.getId());
+ if (messageIds == null)
+ {
+ messageIds = new HashSet<Long>();
+ _localEnqueueMap.put(queue.getId(), messageIds);
+ }
+ messageIds.add(message.getMessageNumber());
}
@Override
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
+ public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
+ Set<Long> messageIds = _localDequeueMap.get(queue.getId());
+ if (messageIds == null)
+ {
+ messageIds = new HashSet<Long>();
+ _localDequeueMap.put(queue.getId(), messageIds);
+ }
+ messageIds.add(message.getMessageNumber());
}
@Override
public void commitTran()
{
+ commitTransactionInternal(this);
+ _localEnqueueMap.clear();
+ _localDequeueMap.clear();
}
@Override
public void abortTran()
{
+ _localEnqueueMap.clear();
+ _localDequeueMap.clear();
}
@Override
public void removeXid(long format, byte[] globalId, byte[] branchId)
{
+ _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId));
}
@Override
public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
{
+ _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues));
}
- };
+ }
+
+ private final AtomicLong _messageId = new AtomicLong(1);
- private final EventManager _eventManager = new EventManager();
+ private final ConcurrentHashMap<UUID, ConfiguredObjectRecord> _configuredObjectRecords = new ConcurrentHashMap<UUID, ConfiguredObjectRecord>();
+ protected ConcurrentHashMap<Long, StoredMemoryMessage> _messages = new ConcurrentHashMap<Long, StoredMemoryMessage>();
+ private Object _transactionLock = new Object();
+ private Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>();
+ private Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>();
+
+ @SuppressWarnings("unchecked")
@Override
- public StoredMessage addMessage(StorableMessageMetaData metaData)
+ public StoredMessage<StorableMessageMetaData> addMessage(final StorableMessageMetaData metaData)
+ {
+ long id = _messageId.getAndIncrement();
+
+ if(metaData.isPersistent())
+ {
+ return new StoredMemoryMessage(id, metaData)
+ {
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ _messages.putIfAbsent(getMessageNumber(), this) ;
+ return super.flushToStore();
+ }
+
+ @Override
+ public void remove()
+ {
+ _messages.remove(getMessageNumber());
+ super.remove();
+ }
+
+ };
+ }
+ else
+ {
+ return new StoredMemoryMessage(id, metaData);
+ }
+ }
+
+ private void commitTransactionInternal(MemoryMessageStoreTransaction transaction)
{
- final long id = _messageId.getAndIncrement();
- StoredMemoryMessage message = new StoredMemoryMessage(id, metaData);
+ synchronized (_transactionLock )
+ {
+ for (Map.Entry<UUID, Set<Long>> loacalEnqueuedEntry : transaction._localEnqueueMap.entrySet())
+ {
+ Set<Long> messageIds = _messageInstances.get(loacalEnqueuedEntry.getKey());
+ if (messageIds == null)
+ {
+ messageIds = new HashSet<Long>();
+ _messageInstances.put(loacalEnqueuedEntry.getKey(), messageIds);
+ }
+ messageIds.addAll(loacalEnqueuedEntry.getValue());
+ }
+
+ for (Map.Entry<UUID, Set<Long>> loacalDequeueEntry : transaction._localDequeueMap.entrySet())
+ {
+ Set<Long> messageIds = _messageInstances.get(loacalDequeueEntry.getKey());
+ if (messageIds != null)
+ {
+ messageIds.removeAll(loacalDequeueEntry.getValue());
+ if (messageIds.isEmpty())
+ {
+ _messageInstances.remove(loacalDequeueEntry.getKey());
+ }
+ }
+ }
+
+ for (Map.Entry<Xid, DistributedTransactionRecords> entry : transaction._localDistributedTransactionsRecords.entrySet())
+ {
+ _distributedTransactions.put(entry.getKey(), entry.getValue());
+ }
+
+ for (Xid removed : transaction._localDistributedTransactionsRemoves)
+ {
+ _distributedTransactions.remove(removed);
+ }
+
+ }
+
- return message;
}
@Override
public Transaction newTransaction()
{
- return IN_MEMORY_TRANSACTION;
+ return new MemoryMessageStoreTransaction();
}
@Override
@@ -95,7 +209,164 @@ abstract public class AbstractMemoryMess
@Override
public void addEventListener(EventListener eventListener, Event... events)
{
- _eventManager.addEventListener(eventListener, events);
}
+ @Override
+ public void create(ConfiguredObjectRecord record)
+ {
+ if (_configuredObjectRecords.putIfAbsent(record.getId(), record) != null)
+ {
+ throw new StoreException("Record with id " + record.getId() + " is already present");
+ }
+ }
+
+ @Override
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records)
+ {
+ for (ConfiguredObjectRecord record : records)
+ {
+ ConfiguredObjectRecord previousValue = _configuredObjectRecords.replace(record.getId(), record);
+ if (previousValue == null && !createIfNecessary)
+ {
+ throw new StoreException("Record with id " + record.getId() + " does not exist");
+ }
+ }
+ }
+
+ @Override
+ public UUID[] remove(final ConfiguredObjectRecord... objects)
+ {
+ List<UUID> removed = new ArrayList<UUID>();
+ for (ConfiguredObjectRecord record : objects)
+ {
+ if (_configuredObjectRecords.remove(record.getId()) != null)
+ {
+ removed.add(record.getId());
+ }
+ }
+ return removed.toArray(new UUID[removed.size()]);
+ }
+
+ @Override
+ public void closeConfigurationStore()
+ {
+ _configuredObjectRecords.clear();
+ }
+
+ @Override
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ {
+ }
+
+ @Override
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
+ {
+ handler.begin(VirtualHost.CURRENT_CONFIG_VERSION);
+ for (ConfiguredObjectRecord record : _configuredObjectRecords.values())
+ {
+ if (!handler.handle(record))
+ {
+ break;
+ }
+ }
+ handler.end();
+ }
+
+ @Override
+ public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
+ {
+ }
+
+ @Override
+ public void closeMessageStore()
+ {
+ _messages.clear();
+ synchronized (_transactionLock)
+ {
+ _messageInstances.clear();
+ _distributedTransactions.clear();
+ }
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return null;
+ }
+
+ @Override
+ public void onDelete()
+ {
+ }
+
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ for (StoredMemoryMessage message : _messages.values())
+ {
+ if(!handler.handle(message))
+ {
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ synchronized (_transactionLock)
+ {
+ for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet())
+ {
+ UUID resourceId = enqueuedEntry.getKey();
+ for (Long messageId : enqueuedEntry.getValue())
+ {
+ if (!handler.handle(resourceId, messageId))
+ {
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ synchronized (_transactionLock)
+ {
+ for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet())
+ {
+ Xid xid = entry.getKey();
+ DistributedTransactionRecords records = entry.getValue();
+ if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), records.getEnqueues(), records.getDequeues()))
+ {
+ break;
+ }
+ }
+ }
+ }
+
+ private static final class DistributedTransactionRecords
+ {
+ private Record[] _enqueues;
+ private Record[] _dequeues;
+
+ public DistributedTransactionRecords(Record[] enqueues, Record[] dequeues)
+ {
+ super();
+ _enqueues = enqueues;
+ _dequeues = dequeues;
+ }
+
+ public Record[] getEnqueues()
+ {
+ return _enqueues;
+ }
+
+ public Record[] getDequeues()
+ {
+ return _dequeues;
+ }
+ }
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Fri Apr 4 22:10:24 2014
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.model.ConfiguredObject;
-
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+
public interface DurableConfigurationStore
{
String STORE_TYPE = "storeType";
@@ -47,12 +48,6 @@ public interface DurableConfigurationSto
void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) throws StoreException;
/**
- * Recovers configuration from the store using given recovery handler
- * @param recoveryHandler recovery handler
- */
- void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) throws StoreException;
-
- /**
* Makes the specified object persistent.
*
* @param object The object to persist.
@@ -85,4 +80,11 @@ public interface DurableConfigurationSto
void closeConfigurationStore() throws StoreException;
+ /**
+ * Visit all configured object records with given handler.
+ *
+ * @param handler a handler to invoke on each configured object record
+ * @throws StoreException
+ */
+ void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException;
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java Fri Apr 4 22:10:24 2014
@@ -31,6 +31,7 @@ import java.util.*;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonProcessingException;
@@ -97,22 +98,27 @@ public class JsonFileConfigStore impleme
}
@Override
- public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
{
- recoveryHandler.beginConfigurationRecovery(this,_configVersion);
+ handler.begin(_configVersion);
List<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>(_objectsById.values());
for(ConfiguredObjectRecord record : records)
{
- recoveryHandler.configuredObject(record);
+ boolean shouldContinue = handler.handle(record);
+ if (!shouldContinue)
+ {
+ break;
+ }
}
int oldConfigVersion = _configVersion;
- _configVersion = recoveryHandler.completeConfigurationRecovery();
+ _configVersion = handler.end();
if(oldConfigVersion != _configVersion)
{
save();
}
}
+
private void setup(final Map<String, Object> configurationStoreSettings)
{
Object storePathAttr = configurationStoreSettings.get(DurableConfigurationStore.STORE_PATH);
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java Fri Apr 4 22:10:24 2014
@@ -23,6 +23,9 @@ package org.apache.qpid.server.store;
import java.util.Map;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
/**
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
@@ -43,13 +46,6 @@ public interface MessageStore
*/
void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings);
- /**
- * Called after opening to recover messages and transactions with given recovery handlers
- * @param messageRecoveryHandler
- * @param transactionLogRecoveryHandler
- */
- void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler);
-
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
@@ -71,8 +67,10 @@ public interface MessageStore
String getStoreLocation();
- // TODO dead method - remove??
- String getStoreType();
-
void onDelete();
+
+ void visitMessages(MessageHandler handler) throws StoreException;
+ void visitMessageInstances(MessageInstanceHandler handler) throws StoreException;
+ void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException;
+
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java Fri Apr 4 22:10:24 2014
@@ -23,6 +23,10 @@ import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
{
@@ -33,11 +37,6 @@ public abstract class NullMessageStore i
}
@Override
- public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
- {
- }
-
- @Override
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records)
{
}
@@ -92,11 +91,6 @@ public abstract class NullMessageStore i
}
@Override
- public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
- {
- }
-
- @Override
public void addEventListener(EventListener eventListener, Event... events)
{
}
@@ -112,4 +106,24 @@ public abstract class NullMessageStore i
{
}
+ @Override
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
+ {
+ }
+
+ @Override
+ public void visitMessages(MessageHandler handler) throws StoreException
+ {
+ }
+
+ @Override
+ public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
+ {
+ }
+
+ @Override
+ public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
+ {
+ }
+
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java Fri Apr 4 22:10:24 2014
@@ -78,6 +78,10 @@ public class DefaultUpgraderProvider imp
public DurableConfigurationStoreUpgrader getUpgrader(final int configVersion, DurableConfigurationRecoverer recoverer)
{
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Getting upgrader for configVersion: " + configVersion);
+ }
DurableConfigurationStoreUpgrader currentUpgrader = null;
switch(configVersion)
{
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java Fri Apr 4 22:10:24 2014
@@ -29,11 +29,11 @@ import org.apache.qpid.server.logging.su
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.MessageStoreFactory;
import org.apache.qpid.server.stats.StatisticsGatherer;
-
-import org.apache.qpid.server.store.DurableConfigurationRecoverer;
+import org.apache.qpid.server.store.ConfiguredObjectRecordRecoveverAndUpgrader;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfigurationStoreCreator;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
public class StandardVirtualHost extends AbstractVirtualHost
{
@@ -107,18 +107,22 @@ public class StandardVirtualHost extends
if (_configurationStoreLogSubject != null)
{
getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.STORE_LOCATION(configurationStoreSettings.toString()));
+ getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_START());
}
- DurableConfigurationRecoverer configRecoverer = new DurableConfigurationRecoverer(getName(), getDurableConfigurationRecoverers(),
- new DefaultUpgraderProvider(this), getEventLogger());
+ ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
+
+ _durableConfigurationStore.visitConfiguredObjectRecords(upgraderRecoverer);
- _durableConfigurationStore.recoverConfigurationStore(configRecoverer);
+ if (_configurationStoreLogSubject != null)
+ {
+ getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
+ }
// If store does not have entries for standard exchanges (amq.*), the following will create them.
initialiseModel();
- VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this, getMessageStoreLogSubject());
- _messageStore.recoverMessageStore(recoveryHandler, recoveryHandler);
+ new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
attainActivation();
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java Fri Apr 4 22:10:24 2014
@@ -50,10 +50,10 @@ import org.apache.qpid.server.model.Prot
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.test.utils.QpidTestCase;
public class ManagementModeStoreHandlerTest extends QpidTestCase
@@ -89,20 +89,22 @@ public class ManagementModeStoreHandlerT
when(_portEntry.getParents()).thenReturn(Collections.singletonMap(Broker.class.getSimpleName(), _root));
when(_portEntry.getType()).thenReturn(Port.class.getSimpleName());
- final ArgumentCaptor<ConfigurationRecoveryHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfigurationRecoveryHandler.class);
+ final ArgumentCaptor<ConfiguredObjectRecordHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfiguredObjectRecordHandler.class);
doAnswer(
new Answer()
{
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable
{
- ConfigurationRecoveryHandler recoverer = recovererArgumentCaptor.getValue();
- recoverer.configuredObject(_root);
- recoverer.configuredObject(_portEntry);
+ ConfiguredObjectRecordHandler recoverer = recovererArgumentCaptor.getValue();
+ if(recoverer.handle(_root))
+ {
+ recoverer.handle(_portEntry);
+ }
return null;
}
}
- ).when(_store).recoverConfigurationStore(recovererArgumentCaptor.capture());
+ ).when(_store).visitConfiguredObjectRecords(recovererArgumentCaptor.capture());
_options = new BrokerOptions();
_handler = new ManagementModeStoreHandler(_store, _options);
@@ -112,21 +114,21 @@ public class ManagementModeStoreHandlerT
private ConfiguredObjectRecord getRootEntry()
{
BrokerFinder brokerFinder = new BrokerFinder();
- _handler.recoverConfigurationStore(brokerFinder);
+ _handler.visitConfiguredObjectRecords(brokerFinder);
return brokerFinder.getBrokerRecord();
}
private ConfiguredObjectRecord getEntry(UUID id)
{
RecordFinder recordFinder = new RecordFinder(id);
- _handler.recoverConfigurationStore(recordFinder);
+ _handler.visitConfiguredObjectRecords(recordFinder);
return recordFinder.getFoundRecord();
}
private Collection<UUID> getChildrenIds(ConfiguredObjectRecord record)
{
ChildFinder childFinder = new ChildFinder(record);
- _handler.recoverConfigurationStore(childFinder);
+ _handler.visitConfiguredObjectRecords(childFinder);
return childFinder.getChildIds();
}
@@ -288,21 +290,25 @@ public class ManagementModeStoreHandlerT
attributes.put(VirtualHost.TYPE, "STANDARD");
final ConfiguredObjectRecord virtualHost = new ConfiguredObjectRecordImpl(virtualHostId, VirtualHost.class.getSimpleName(), attributes, Collections.singletonMap(Broker.class.getSimpleName(), _root));
- final ArgumentCaptor<ConfigurationRecoveryHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfigurationRecoveryHandler.class);
+ final ArgumentCaptor<ConfiguredObjectRecordHandler> recovererArgumentCaptor = ArgumentCaptor.forClass(ConfiguredObjectRecordHandler.class);
doAnswer(
new Answer()
{
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable
{
- ConfigurationRecoveryHandler recoverer = recovererArgumentCaptor.getValue();
- recoverer.configuredObject(_root);
- recoverer.configuredObject(_portEntry);
- recoverer.configuredObject(virtualHost);
+ ConfiguredObjectRecordHandler recoverer = recovererArgumentCaptor.getValue();
+ if(recoverer.handle(_root))
+ {
+ if(recoverer.handle(_portEntry))
+ {
+ recoverer.handle(virtualHost);
+ }
+ }
return null;
}
}
- ).when(_store).recoverConfigurationStore(recovererArgumentCaptor.capture());
+ ).when(_store).visitConfiguredObjectRecords(recovererArgumentCaptor.capture());
State expectedState = mmQuiesceVhosts ? State.QUIESCED : null;
if(mmQuiesceVhosts)
@@ -457,28 +463,32 @@ public class ManagementModeStoreHandlerT
}
- private class BrokerFinder implements ConfigurationRecoveryHandler
+ private class BrokerFinder implements ConfiguredObjectRecordHandler
{
private ConfiguredObjectRecord _brokerRecord;
+ private int _version;
+
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
-
+ _version = configVersion;
}
@Override
- public void configuredObject(final ConfiguredObjectRecord object)
+ public boolean handle(final ConfiguredObjectRecord object)
{
if(object.getType().equals(Broker.class.getSimpleName()))
{
_brokerRecord = object;
+ return false;
}
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
- return 0;
+ return _version;
}
public ConfiguredObjectRecord getBrokerRecord()
@@ -487,10 +497,11 @@ public class ManagementModeStoreHandlerT
}
}
- private class RecordFinder implements ConfigurationRecoveryHandler
+ private class RecordFinder implements ConfiguredObjectRecordHandler
{
private final UUID _id;
private ConfiguredObjectRecord _foundRecord;
+ private int _version;
private RecordFinder(final UUID id)
{
@@ -498,24 +509,26 @@ public class ManagementModeStoreHandlerT
}
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
-
+ _version = configVersion;
}
@Override
- public void configuredObject(final ConfiguredObjectRecord object)
+ public boolean handle(final ConfiguredObjectRecord object)
{
if(object.getId().equals(_id))
{
_foundRecord = object;
+ return false;
}
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
- return 0;
+ return _version;
}
public ConfiguredObjectRecord getFoundRecord()
@@ -524,10 +537,11 @@ public class ManagementModeStoreHandlerT
}
}
- private class ChildFinder implements ConfigurationRecoveryHandler
+ private class ChildFinder implements ConfiguredObjectRecordHandler
{
private final Collection<UUID> _childIds = new HashSet<UUID>();
private final ConfiguredObjectRecord _parent;
+ private int _version;
private ChildFinder(final ConfiguredObjectRecord parent)
{
@@ -535,13 +549,13 @@ public class ManagementModeStoreHandlerT
}
@Override
- public void beginConfigurationRecovery(final DurableConfigurationStore store, final int configVersion)
+ public void begin(final int configVersion)
{
-
+ _version = configVersion;
}
@Override
- public void configuredObject(final ConfiguredObjectRecord object)
+ public boolean handle(final ConfiguredObjectRecord object)
{
if(object.getParents() != null)
@@ -555,12 +569,13 @@ public class ManagementModeStoreHandlerT
}
}
+ return true;
}
@Override
- public int completeConfigurationRecovery()
+ public int end()
{
- return 0;
+ return _version;
}
public Collection<UUID> getChildIds()
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java Fri Apr 4 22:10:24 2014
@@ -21,9 +21,7 @@
package org.apache.qpid.server.store;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -49,6 +47,7 @@ import org.apache.qpid.server.model.UUID
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
import org.mockito.ArgumentCaptor;
@@ -71,9 +70,8 @@ public abstract class AbstractDurableCon
private String _storePath;
private String _storeName;
- private ConfigurationRecoveryHandler _recoveryHandler;
+ private ConfiguredObjectRecordHandler _handler;
- private ExchangeImpl _exchange = mock(ExchangeImpl.class);
private static final String ROUTING_KEY = "routingKey";
private static final String QUEUE_NAME = "queueName";
private Map<String,Object> _bindingArgs;
@@ -96,16 +94,8 @@ public abstract class AbstractDurableCon
FileUtils.delete(new File(_storePath), true);
setTestSystemProperty("QPID_WORK", TMP_FOLDER);
- _recoveryHandler = mock(ConfigurationRecoveryHandler.class);
- when(_exchange.getName()).thenReturn(EXCHANGE_NAME);
- when(_exchange.getId()).thenReturn(_exchangeId);
- when(_exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
- when(_exchange.getEventLogger()).thenReturn(new EventLogger());
-
- ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class);
- when(exchangeRecord.getId()).thenReturn(_exchangeId);
- when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName());
- when(_exchange.asObjectRecord()).thenReturn(exchangeRecord);
+ _handler = mock(ConfiguredObjectRecordHandler.class);
+ when(_handler.handle(any(ConfiguredObjectRecord.class))).thenReturn(true);
_bindingArgs = new HashMap<String, Object>();
String argKey = AMQPFilterTypes.JMS_SELECTOR.toString();
@@ -134,7 +124,7 @@ public abstract class AbstractDurableCon
DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
reopenStore();
- verify(_recoveryHandler).configuredObject(matchesRecord(_exchangeId, EXCHANGE,
+ verify(_handler).handle(matchesRecord(_exchangeId, EXCHANGE,
map( org.apache.qpid.server.model.Exchange.NAME, getName(),
org.apache.qpid.server.model.Exchange.TYPE, getName()+"Type",
org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name())));
@@ -168,14 +158,16 @@ public abstract class AbstractDurableCon
DurableConfigurationStoreHelper.removeExchange(_configStore, exchange);
reopenStore();
- verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
+ verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
}
public void testBindQueue() throws Exception
{
+ ExchangeImpl<?> exchange = createTestExchange();
AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
- _exchange, _bindingArgs);
+ exchange, _bindingArgs);
+ DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
DurableConfigurationStoreHelper.createQueue(_configStore, queue);
DurableConfigurationStoreHelper.createBinding(_configStore, binding);
@@ -187,10 +179,10 @@ public abstract class AbstractDurableCon
Map<String,UUID> parents = new HashMap<String, UUID>();
- parents.put(Exchange.class.getSimpleName(), _exchange.getId());
+ parents.put(Exchange.class.getSimpleName(), exchange.getId());
parents.put(Queue.class.getSimpleName(), queue.getId());
- verify(_recoveryHandler).configuredObject(matchesRecord(binding.getId(), BINDING, map, parents));
+ verify(_handler).handle(matchesRecord(binding.getId(), BINDING, map, parents));
}
@@ -260,15 +252,18 @@ public abstract class AbstractDurableCon
public void testUnbindQueue() throws Exception
{
+ ExchangeImpl<?> exchange = createTestExchange();
+ DurableConfigurationStoreHelper.createExchange(_configStore, exchange);
+
AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false, null);
BindingImpl binding = new BindingImpl(UUIDGenerator.generateRandomUUID(), ROUTING_KEY, queue,
- _exchange, _bindingArgs);
+ exchange, _bindingArgs);
DurableConfigurationStoreHelper.createBinding(_configStore, binding);
DurableConfigurationStoreHelper.removeBinding(_configStore, binding);
reopenStore();
- verify(_recoveryHandler, never()).configuredObject(matchesRecord(ANY_UUID, BINDING,
+ verify(_handler, never()).handle(matchesRecord(ANY_UUID, BINDING,
ANY_MAP));
}
@@ -282,7 +277,7 @@ public abstract class AbstractDurableCon
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
- verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+ verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testCreateQueueAMQQueueFieldTable() throws Exception
@@ -304,7 +299,7 @@ public abstract class AbstractDurableCon
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
queueAttributes.putAll(attributes);
- verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+ verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
@@ -322,7 +317,7 @@ public abstract class AbstractDurableCon
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
- verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+ verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
private ExchangeImpl createTestAlternateExchange()
@@ -355,7 +350,7 @@ public abstract class AbstractDurableCon
queueAttributes.put(Queue.NAME, getName());
queueAttributes.putAll(attributes);
- verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+ verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
@@ -382,7 +377,7 @@ public abstract class AbstractDurableCon
queueAttributes.putAll(attributes);
queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
- verify(_recoveryHandler).configuredObject(matchesRecord(_queueId, QUEUE, queueAttributes));
+ verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
public void testRemoveQueue() throws Exception
@@ -397,7 +392,7 @@ public abstract class AbstractDurableCon
// remove queue
DurableConfigurationStoreHelper.removeQueue(_configStore,queue);
reopenStore();
- verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
+ verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
}
private AMQQueue createTestQueue(String queueName,
@@ -463,11 +458,9 @@ public abstract class AbstractDurableCon
{
ExchangeImpl exchange = mock(ExchangeImpl.class);
Map<String,Object> actualAttributes = new HashMap<String, Object>();
- actualAttributes.put("id", _exchangeId);
actualAttributes.put("name", getName());
actualAttributes.put("type", getName() + "Type");
actualAttributes.put("lifetimePolicy", LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
- when(exchange.getActualAttributes()).thenReturn(actualAttributes);
when(exchange.getName()).thenReturn(getName());
when(exchange.getTypeName()).thenReturn(getName() + "Type");
when(exchange.isAutoDelete()).thenReturn(true);
@@ -475,11 +468,10 @@ public abstract class AbstractDurableCon
ConfiguredObjectRecord exchangeRecord = mock(ConfiguredObjectRecord.class);
when(exchangeRecord.getId()).thenReturn(_exchangeId);
when(exchangeRecord.getType()).thenReturn(Exchange.class.getSimpleName());
- Map<String,Object> actualAttributesExceptId = new HashMap<String, Object>(actualAttributes);
- actualAttributesExceptId.remove("id");
- when(exchangeRecord.getAttributes()).thenReturn(actualAttributesExceptId);
+ when(exchangeRecord.getAttributes()).thenReturn(actualAttributes);
when(exchange.asObjectRecord()).thenReturn(exchangeRecord);
-
+ when(exchange.getExchangeType()).thenReturn(mock(ExchangeType.class));
+ when(exchange.getEventLogger()).thenReturn(new EventLogger());
return exchange;
}
@@ -491,7 +483,7 @@ public abstract class AbstractDurableCon
ConfiguredObject<?> parent = mock(ConfiguredObject.class);
when(parent.getName()).thenReturn("testName");
_configStore.openConfigurationStore(parent, _configurationStoreSettings);
- _configStore.recoverConfigurationStore(_recoveryHandler);
+ _configStore.visitConfiguredObjectRecords(_handler);
}
protected abstract DurableConfigurationStore createConfigStore() throws Exception;
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreConfigurationTest.java Fri Apr 4 22:10:24 2014
@@ -27,10 +27,4 @@ public class JsonFileConfigStoreConfigur
{
return new JsonFileConfigStore();
}
-
- @Override
- public void testBindQueue() throws Exception
- {
- // TODO: Temporarily disable the test as it is already fixed on trunk
- }
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/JsonFileConfigStoreTest.java Fri Apr 4 22:10:24 2014
@@ -28,6 +28,7 @@ import java.util.UUID;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.test.utils.TestFileUtils;
@@ -43,15 +44,15 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
public class JsonFileConfigStoreTest extends QpidTestCase
{
- private final ConfigurationRecoveryHandler _recoveryHandler = mock(ConfigurationRecoveryHandler.class);
-
private JsonFileConfigStore _store;
private HashMap<String, Object> _configurationStoreSettings;
private ConfiguredObject<?> _virtualHost;
private File _storeLocation;
+ private ConfiguredObjectRecordHandler _handler;
private static final UUID ANY_UUID = UUID.randomUUID();
@@ -69,6 +70,9 @@ public class JsonFileConfigStoreTest ext
_configurationStoreSettings.put(JsonFileConfigStore.STORE_TYPE, JsonFileConfigStore.TYPE);
_configurationStoreSettings.put(JsonFileConfigStore.STORE_PATH, _storeLocation.getAbsolutePath());
_store = new JsonFileConfigStore();
+
+ _handler = mock(ConfiguredObjectRecordHandler.class);
+ when(_handler.handle(any(ConfiguredObjectRecord.class))).thenReturn(true);
}
@Override
@@ -113,35 +117,35 @@ public class JsonFileConfigStoreTest ext
}
}
- public void testStartFromNoStore() throws Exception
+ public void testVisitEmptyStore()
{
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- InOrder inorder = inOrder(_recoveryHandler);
- inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0));
- inorder.verify(_recoveryHandler,never()).configuredObject(any(ConfiguredObjectRecord.class));
- inorder.verify(_recoveryHandler).completeConfigurationRecovery();
+ _store.visitConfiguredObjectRecords(_handler);
+ InOrder inorder = inOrder(_handler);
+ inorder.verify(_handler).begin(eq(0));
+ inorder.verify(_handler,never()).handle(any(ConfiguredObjectRecord.class));
+ inorder.verify(_handler).end();
_store.closeConfigurationStore();
}
public void testUpdatedConfigVersionIsRetained() throws Exception
{
final int NEW_CONFIG_VERSION = 42;
- when(_recoveryHandler.completeConfigurationRecovery()).thenReturn(NEW_CONFIG_VERSION);
+ when(_handler.end()).thenReturn(NEW_CONFIG_VERSION);
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
+ _store.visitConfiguredObjectRecords(_handler);
_store.closeConfigurationStore();
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- InOrder inorder = inOrder(_recoveryHandler);
+ _store.visitConfiguredObjectRecords(_handler);
+ InOrder inorder = inOrder(_handler);
// first time the config version should be the initial version - 0
- inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(0));
+ inorder.verify(_handler).begin(eq(0));
// second time the config version should be the updated version
- inorder.verify(_recoveryHandler).beginConfigurationRecovery(eq(_store), eq(NEW_CONFIG_VERSION));
+ inorder.verify(_handler).begin(eq(NEW_CONFIG_VERSION));
_store.closeConfigurationStore();
}
@@ -157,8 +161,9 @@ public class JsonFileConfigStoreTest ext
_store.closeConfigurationStore();
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr));
+
+ _store.visitConfiguredObjectRecords(_handler);
+ verify(_handler, times(1)).handle(matchesRecord(queueId, queueType, queueAttr));
_store.closeConfigurationStore();
}
@@ -179,8 +184,8 @@ public class JsonFileConfigStoreTest ext
_store.closeConfigurationStore();
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- verify(_recoveryHandler).configuredObject(matchesRecord(queueId, queueType, queueAttr));
+ _store.visitConfiguredObjectRecords(_handler);
+ verify(_handler, times(1)).handle(matchesRecord(queueId, queueType, queueAttr));
_store.closeConfigurationStore();
}
@@ -201,8 +206,8 @@ public class JsonFileConfigStoreTest ext
_store.closeConfigurationStore();
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- verify(_recoveryHandler, never()).configuredObject(any(ConfiguredObjectRecord.class));
+ _store.visitConfiguredObjectRecords(_handler);
+ verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
_store.closeConfigurationStore();
}
@@ -311,12 +316,12 @@ public class JsonFileConfigStoreTest ext
_store.update(true, bindingRecord, binding2Record);
_store.closeConfigurationStore();
_store.openConfigurationStore(_virtualHost, _configurationStoreSettings);
- _store.recoverConfigurationStore(_recoveryHandler);
- verify(_recoveryHandler).configuredObject(matchesRecord(queueId, "Queue", EMPTY_ATTR));
- verify(_recoveryHandler).configuredObject(matchesRecord(queue2Id, "Queue", EMPTY_ATTR));
- verify(_recoveryHandler).configuredObject(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR));
- verify(_recoveryHandler).configuredObject(matchesRecord(bindingId, "Binding", EMPTY_ATTR));
- verify(_recoveryHandler).configuredObject(matchesRecord(binding2Id, "Binding", EMPTY_ATTR));
+ _store.visitConfiguredObjectRecords(_handler);
+ verify(_handler).handle(matchesRecord(queueId, "Queue", EMPTY_ATTR));
+ verify(_handler).handle(matchesRecord(queue2Id, "Queue", EMPTY_ATTR));
+ verify(_handler).handle(matchesRecord(exchangeId, "Exchange", EMPTY_ATTR));
+ verify(_handler).handle(matchesRecord(bindingId, "Binding", EMPTY_ATTR));
+ verify(_handler).handle(matchesRecord(binding2Id, "Binding", EMPTY_ATTR));
_store.closeConfigurationStore();
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java?rev=1584926&r1=1584925&r2=1584926&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java Fri Apr 4 22:10:24 2014
@@ -33,7 +33,6 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.util.FileUtils;
@@ -66,12 +65,9 @@ public abstract class MessageStoreQuotaE
_store = createStore();
- MessageStoreRecoveryHandler recoveryHandler = mock(MessageStoreRecoveryHandler.class);
- when(recoveryHandler.begin()).thenReturn(mock(StoredMessageRecoveryHandler.class));
ConfiguredObject<?> parent = mock(ConfiguredObject.class);
when(parent.getName()).thenReturn("test");
_store.openMessageStore(parent, storeSettings);
- _store.recoverMessageStore(recoveryHandler, null);
_transactionResource = UUID.randomUUID();
_events = new ArrayList<Event>();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org