You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/07/22 23:08:42 UTC
svn commit: r1505820 - in /qpid/branches/0.24/qpid/java: ./
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker/
broker/src/main/java/org/apache/qpid/server/configuration/store/
broker/src/main/java/org/apache/qpid/server/exchange/ b...
Author: rgodfrey
Date: Mon Jul 22 21:08:41 2013
New Revision: 1505820
URL: http://svn.apache.org/r1505820
Log:
QPID-4999 : merged to 0.24 branch (1504429,1504451)
Added:
qpid/branches/0.24/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java
- copied unchanged from r1504429, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java
Modified:
qpid/branches/0.24/qpid/java/ (props changed)
qpid/branches/0.24/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/branches/0.24/qpid/java/broker/ (props changed)
qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (contents, props changed)
qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/ (props changed)
qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
qpid/branches/0.24/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
Propchange: qpid/branches/0.24/qpid/java/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java:r1504429,1504451
Modified: qpid/branches/0.24/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/branches/0.24/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Mon Jul 22 21:08:41 2013
@@ -21,20 +21,10 @@
package org.apache.qpid.server.store.berkeleydb;
import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.IntegerBinding;
import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.je.CheckpointConfig;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.ExceptionEvent;
-import com.sleepycat.je.ExceptionListener;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.*;
+import com.sleepycat.je.Transaction;
import java.io.File;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
@@ -85,15 +75,17 @@ public abstract class AbstractBDBMessage
private Environment _environment;
- private String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS";
- private String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA";
- private String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT";
- private String DELIVERYDB_NAME = "QUEUE_ENTRIES";
- private String BRIDGEDB_NAME = "BRIDGES";
- private String LINKDB_NAME = "LINKS";
- private String XIDDB_NAME = "XIDS";
+ private static String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS";
+ private static String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA";
+ private static String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT";
+ private static String DELIVERYDB_NAME = "QUEUE_ENTRIES";
+ private static String BRIDGEDB_NAME = "BRIDGES";
+ private static String LINKDB_NAME = "LINKS";
+ private static String XIDDB_NAME = "XIDS";
+ private static String CONFIG_VERSION_DB = "CONFIG_VERSION";
private Database _configuredObjectsDb;
+ private Database _configVersionDb;
private Database _messageMetaDataDb;
private Database _messageContentDb;
private Database _deliveryDb;
@@ -326,6 +318,7 @@ public abstract class AbstractBDBMessage
dbConfig.setReadOnly(false);
_configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig);
+ _configVersionDb = openDatabase(CONFIG_VERSION_DB, dbConfig);
_messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig);
_messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig);
_deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig);
@@ -399,6 +392,13 @@ public abstract class AbstractBDBMessage
_xidDb.close();
}
+
+ if (_configVersionDb != null)
+ {
+ LOGGER.info("Close config version database");
+ _configVersionDb.close();
+ }
+
closeEnvironment();
}
@@ -426,10 +426,15 @@ public abstract class AbstractBDBMessage
{
try
{
- recoveryHandler.beginConfigurationRecovery(this);
+ final int configVersion = getConfigVersion();
+ recoveryHandler.beginConfigurationRecovery(this, configVersion);
loadConfiguredObjects(recoveryHandler);
- recoveryHandler.completeConfigurationRecovery();
+ final int newConfigVersion = recoveryHandler.completeConfigurationRecovery();
+ if(newConfigVersion != configVersion)
+ {
+ updateConfigVersion(newConfigVersion);
+ }
}
catch (DatabaseException e)
{
@@ -438,6 +443,66 @@ public abstract class AbstractBDBMessage
}
+ private void updateConfigVersion(int newConfigVersion) throws AMQStoreException
+ {
+ Cursor cursor = null;
+ try
+ {
+ Transaction txn = _environment.beginTransaction(null, null);
+ cursor = _configVersionDb.openCursor(txn, null);
+ DatabaseEntry key = new DatabaseEntry();
+ ByteBinding.byteToEntry((byte) 0,key);
+ DatabaseEntry value = new DatabaseEntry();
+
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ IntegerBinding.intToEntry(newConfigVersion, value);
+ OperationStatus status = cursor.put(key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new AMQStoreException("Error setting config version: " + status);
+ }
+ }
+ cursor.close();
+ cursor = null;
+ txn.commit();
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+
+ }
+
+ private int getConfigVersion() throws AMQStoreException
+ {
+ Cursor cursor = null;
+ try
+ {
+ cursor = _configVersionDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+ {
+ return IntegerBinding.entryToInt(value);
+ }
+
+ // Insert 0 as the default config version
+ IntegerBinding.intToEntry(0,value);
+ ByteBinding.byteToEntry((byte) 0,key);
+ OperationStatus status = _configVersionDb.put(null, key, value);
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new AMQStoreException("Error initialising config version: " + status);
+ }
+ return 0;
+ }
+ finally
+ {
+ closeCursorSafely(cursor);
+ }
+ }
+
private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException
{
Cursor cursor = null;
@@ -750,9 +815,25 @@ public abstract class AbstractBDBMessage
}
}
+
@Override
public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
{
+ update(id, type, attributes, null);
+ }
+
+ public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
+ for(ConfiguredObjectRecord record : records)
+ {
+ update(record.getId(), record.getType(), record.getAttributes(), txn);
+ }
+ txn.commit();
+ }
+
+ private void update(UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws AMQStoreException
+ {
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Updating " +type + ", id: " + id);
@@ -768,14 +849,14 @@ public abstract class AbstractBDBMessage
DatabaseEntry newValue = new DatabaseEntry();
ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance();
- OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT);
+ OperationStatus status = _configuredObjectsDb.get(txn, key, value, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS)
{
ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
// write the updated entry to the store
configuredObjectBinding.objectToEntry(newQueueRecord, newValue);
- status = _configuredObjectsDb.put(null, key, newValue);
+ status = _configuredObjectsDb.put(txn, key, newValue);
if (status != OperationStatus.SUCCESS)
{
throw new AMQStoreException("Error updating queue details within the store: " + status);
Propchange: qpid/branches/0.24/qpid/java/broker/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker:r1504429,1504451
Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java Mon Jul 22 21:08:41 2013
@@ -531,6 +531,10 @@ public class MemoryConfigurationEntrySto
if (fieldValues != null)
{
Object[] array = fieldValues.toArray(new Object[fieldValues.size()]);
+ if (attributes == null)
+ {
+ attributes = new HashMap<String, Object>();
+ }
attributes.put(fieldName, array);
}
}
Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java Mon Jul 22 21:08:41 2013
@@ -91,12 +91,20 @@ public class FilterSupport
}
- static boolean argumentsContainFilter(final Map<String, Object> args)
+ public static boolean argumentsContainFilter(final Map<String, Object> args)
{
return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
}
+ public static void removeFilters(final Map<String, Object> args)
+ {
+ args.remove(AMQPFilterTypes.JMS_SELECTOR.toString());
+ args.remove(AMQPFilterTypes.NO_LOCAL.toString());
+ }
+
+
+
static boolean argumentsContainNoLocal(final Map<String, Object> args)
{
return args != null
Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java Mon Jul 22 21:08:41 2013
@@ -119,6 +119,7 @@ public interface VirtualHost extends Con
QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
CONFIG_PATH));
+ int CURRENT_CONFIG_VERSION = 1;
//children
Collection<VirtualHostAlias> getAliases();
Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Mon Jul 22 21:08:41 2013
@@ -55,6 +55,7 @@ import org.codehaus.jackson.map.ObjectMa
abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
{
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
+ private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION";
private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES";
@@ -68,9 +69,10 @@ abstract public class AbstractJDBCMessag
private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
+ private static final int DEFAULT_CONFIG_VERSION = 0;
public static String[] ALL_TABLES = new String[] { DB_VERSION_TABLE_NAME, LINKS_TABLE_NAME, BRIDGES_TABLE_NAME, XID_ACTIONS_TABLE_NAME,
- XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME };
+ XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME };
private static final int DB_VERSION = 6;
@@ -80,6 +82,12 @@ abstract public class AbstractJDBCMessag
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
+ private static final String CREATE_CONFIG_VERSION_TABLE = "CREATE TABLE "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version int not null )";
+ private static final String INSERT_INTO_CONFIG_VERSION = "INSERT INTO "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
+ private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME;
+ private static final String UPDATE_CONFIG_VERSION = "UPDATE " + CONFIGURATION_VERSION_TABLE_NAME + " SET version = ?";
+
+
private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id";
@@ -223,6 +231,7 @@ abstract public class AbstractJDBCMessag
Connection conn = newAutoCommitConnection();
createVersionTable(conn);
+ createConfigVersionTable(conn);
createConfiguredObjectsTable(conn);
createQueueEntryTable(conn);
createMetaDataTable(conn);
@@ -259,7 +268,33 @@ abstract public class AbstractJDBCMessag
pstmt.close();
}
}
+ }
+ private void createConfigVersionTable(final Connection conn) throws SQLException
+ {
+ if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn))
+ {
+ Statement stmt = conn.createStatement();
+ try
+ {
+ stmt.execute(CREATE_CONFIG_VERSION_TABLE);
+ }
+ finally
+ {
+ stmt.close();
+ }
+
+ PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_CONFIG_VERSION);
+ try
+ {
+ pstmt.setInt(1, DEFAULT_CONFIG_VERSION);
+ pstmt.execute();
+ }
+ finally
+ {
+ pstmt.close();
+ }
+ }
}
private void createConfiguredObjectsTable(final Connection conn) throws SQLException
@@ -279,6 +314,8 @@ abstract public class AbstractJDBCMessag
}
}
+
+
private void createQueueEntryTable(final Connection conn) throws SQLException
{
if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
@@ -457,10 +494,10 @@ abstract public class AbstractJDBCMessag
{
try
{
- recoveryHandler.beginConfigurationRecovery(this);
+ recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
loadConfiguredObjects(recoveryHandler);
- recoveryHandler.completeConfigurationRecovery();
+ setConfigVersion(recoveryHandler.completeConfigurationRecovery());
}
catch (SQLException e)
{
@@ -468,6 +505,67 @@ abstract public class AbstractJDBCMessag
}
}
+ private void setConfigVersion(int version) throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+
+ PreparedStatement stmt = conn.prepareStatement(UPDATE_CONFIG_VERSION);
+ try
+ {
+ stmt.setInt(1, version);
+ stmt.execute();
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ private int getConfigVersion() throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+
+ Statement stmt = conn.createStatement();
+ try
+ {
+ ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION);
+ try
+ {
+
+ if(rs.next())
+ {
+ return rs.getInt(1);
+ }
+ return DEFAULT_CONFIG_VERSION;
+ }
+ finally
+ {
+ rs.close();
+ }
+
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+
+ }
+
@Override
public void close() throws Exception
{
@@ -1837,52 +1935,89 @@ abstract public class AbstractJDBCMessag
Connection conn = newAutoCommitConnection();
try
{
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
- try
+ updateConfiguredObject(configuredObject, conn);
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.ACTIVATING))
+ {
+ try
+ {
+ Connection conn = newConnection();
+ try
+ {
+ for(ConfiguredObjectRecord record : records)
{
- stmt.setString(1, configuredObject.getId().toString());
- ResultSet rs = stmt.executeQuery();
+ updateConfiguredObject(record, conn);
+ }
+ conn.commit();
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error updating configured objects in database: " + e.getMessage(), e);
+ }
+
+ }
+
+ }
+
+ private void updateConfiguredObject(ConfiguredObjectRecord configuredObject, Connection conn)
+ throws SQLException, AMQStoreException
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ if (rs.next())
+ {
+ PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
try
{
- if (rs.next())
+ stmt2.setString(1, configuredObject.getType());
+ if (configuredObject.getAttributes() != null)
{
- PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
- try
- {
- stmt2.setString(1, configuredObject.getType());
- if (configuredObject.getAttributes() != null)
- {
- byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes(
- configuredObject.getAttributes());
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
- }
- else
- {
- stmt2.setNull(2, Types.BLOB);
- }
- stmt2.setString(3, configuredObject.getId().toString());
- stmt2.execute();
- }
- finally
- {
- stmt2.close();
- }
+ byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes(
+ configuredObject.getAttributes());
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
+ }
+ else
+ {
+ stmt2.setNull(2, Types.BLOB);
}
+ stmt2.setString(3, configuredObject.getId().toString());
+ stmt2.execute();
}
finally
{
- rs.close();
+ stmt2.close();
}
}
- finally
- {
- stmt.close();
- }
}
finally
{
- conn.close();
+ rs.close();
}
}
catch (JsonMappingException e)
@@ -1897,11 +2032,11 @@ abstract public class AbstractJDBCMessag
{
throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
}
- catch (SQLException e)
+ finally
{
- throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ stmt.close();
}
- }
+
}
private ConfiguredObjectRecord loadConfiguredObject(final UUID id) throws AMQStoreException
Propchange: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java:r1504429,1504451
Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Mon Jul 22 21:08:41 2013
@@ -28,10 +28,14 @@ import java.util.UUID;
public interface ConfigurationRecoveryHandler
{
- void beginConfigurationRecovery(DurableConfigurationStore store);
+ void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion);
void configuredObject(UUID id, String type, Map<String, Object> attributes);
- void completeConfigurationRecovery();
+ /**
+ *
+ * @return the model version of the configuration
+ */
+ int completeConfigurationRecovery();
}
Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java Mon Jul 22 21:08:41 2013
@@ -60,4 +60,29 @@ public class ConfiguredObjectRecord
return "ConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", attributes=" + _attributes + "]";
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if(this == o)
+ {
+ return true;
+ }
+ if(o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ ConfiguredObjectRecord that = (ConfiguredObjectRecord) o;
+
+ return _type.equals(that._type) && _id.equals(that._id) && _attributes.equals(that._attributes);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = _id.hashCode();
+ result = 31 * result + _type.hashCode();
+ result = 31 * result + _attributes.hashCode();
+ return result;
+ }
}
Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Mon Jul 22 21:08:41 2013
@@ -91,4 +91,7 @@ public interface DurableConfigurationSto
void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException;
+ public void update(ConfiguredObjectRecord... records) throws AMQStoreException;
+
+
}
Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Mon Jul 22 21:08:41 2013
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.store;
-import org.apache.commons.configuration.Configuration;
-
/**
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
*
Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java Mon Jul 22 21:08:41 2013
@@ -21,6 +21,7 @@ package org.apache.qpid.server.store;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.model.VirtualHost;
public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
@@ -38,6 +39,11 @@ public abstract class NullMessageStore i
}
@Override
+ public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ }
+
+ @Override
public void remove(UUID id, String type)
{
}
Propchange: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:r1504429,1504451
Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Mon Jul 22 21:08:41 2013
@@ -24,6 +24,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
@@ -35,6 +36,8 @@ import org.apache.qpid.framing.FieldTabl
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.FilterSupport;
+import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
@@ -49,6 +52,7 @@ import org.apache.qpid.server.queue.AMQQ
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
@@ -63,6 +67,8 @@ import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.util.Functions;
import org.apache.qpid.util.ByteBufferInputStream;
+import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
+
public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
MessageStoreRecoveryHandler,
MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
@@ -85,6 +91,8 @@ public class VirtualHostConfigRecoveryHa
private MessageStoreLogSubject _logSubject;
private MessageStore _store;
+ private int _currentConfigVersion;
+ private DurableConfigurationStore _configStore;
public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost,
ExchangeRegistry exchangeRegistry,
@@ -96,10 +104,11 @@ public class VirtualHostConfigRecoveryHa
}
@Override
- public void beginConfigurationRecovery(DurableConfigurationStore store)
+ public void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion)
{
_logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName());
-
+ _configStore = store;
+ _currentConfigVersion = configVersion;
CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_START());
}
@@ -482,8 +491,20 @@ public class VirtualHostConfigRecoveryHa
}
@Override
- public void completeConfigurationRecovery()
+ public int completeConfigurationRecovery()
{
+ if(CURRENT_CONFIG_VERSION !=_currentConfigVersion)
+ {
+ try
+ {
+ upgrade();
+ }
+ catch (AMQStoreException e)
+ {
+ throw new IllegalArgumentException("Unable to upgrade configuration from version " + _currentConfigVersion + " to version " + CURRENT_CONFIG_VERSION);
+ }
+ }
+
Map<UUID, Map<String, Object>> exchangeObjects =
_configuredObjects.remove(org.apache.qpid.server.model.Exchange.class.getName());
@@ -511,6 +532,88 @@ public class VirtualHostConfigRecoveryHa
CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
+
+ return CURRENT_CONFIG_VERSION;
+ }
+
+ private void upgrade() throws AMQStoreException
+ {
+
+ Map<UUID, String> updates = new HashMap<UUID, String>();
+
+ final String bindingType = Binding.class.getName();
+
+ switch(_currentConfigVersion)
+ {
+ case 0:
+ Map<UUID, Map<String, Object>> bindingObjects =
+ _configuredObjects.get(bindingType);
+ if(bindingObjects != null)
+ {
+ for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingObjects.entrySet())
+ {
+ Map<String, Object> binding = bindingEntry.getValue();
+
+ if(hasSelectorArguments(binding) && !isTopicExchange(binding))
+ {
+ binding = new LinkedHashMap<String, Object>(binding);
+ removeSelectorArguments(binding);
+ bindingEntry.setValue(binding);
+
+ updates.put(bindingEntry.getKey(), bindingType);
+ }
+ }
+ }
+ case CURRENT_CONFIG_VERSION:
+ if(!updates.isEmpty())
+ {
+ ConfiguredObjectRecord[] updateRecords = new ConfiguredObjectRecord[updates.size()];
+ int i = 0;
+ for(Map.Entry<UUID, String> update : updates.entrySet())
+ {
+ updateRecords[i++] = new ConfiguredObjectRecord(update.getKey(), update.getValue(), _configuredObjects.get(update.getValue()).get(update.getKey()));
+ }
+ _configStore.update(updateRecords);
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unknown configuration model version: " + _currentConfigVersion + ". Are you attempting to run an older instance against an upgraded configuration?");
+ }
+ }
+
+ private void removeSelectorArguments(Map<String, Object> binding)
+ {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> arguments = new LinkedHashMap<String, Object>((Map<String,Object>)binding.get(Binding.ARGUMENTS));
+
+ FilterSupport.removeFilters(arguments);
+ binding.put(Binding.ARGUMENTS, arguments);
+ }
+
+ private boolean isTopicExchange(Map<String, Object> binding)
+ {
+ UUID exchangeId = UUID.fromString((String)binding.get(Binding.EXCHANGE));
+ final
+ Map<UUID, Map<String, Object>> exchanges =
+ _configuredObjects.get(org.apache.qpid.server.model.Exchange.class.getName());
+
+ if(exchanges != null && exchanges.containsKey(exchangeId))
+ {
+ return "topic".equals(exchanges.get(exchangeId).get(org.apache.qpid.server.model.Exchange.TYPE));
+ }
+ else
+ {
+ return _exchangeRegistry.getExchange(exchangeId) != null
+ && _exchangeRegistry.getExchange(exchangeId).getType() == TopicExchange.TYPE;
+ }
+
+ }
+
+ private boolean hasSelectorArguments(Map<String, Object> binding)
+ {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> arguments = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
+ return (arguments != null) && FilterSupport.argumentsContainFilter(arguments);
}
private void recoverExchanges(Map<UUID, Map<String, Object>> exchangeObjects)
Modified: qpid/branches/0.24/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java (original)
+++ qpid/branches/0.24/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java Mon Jul 22 21:08:41 2013
@@ -197,6 +197,14 @@ public class SlowMessageStore implements
doPostDelay("update");
}
+ @Override
+ public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+ {
+ doPreDelay("update");
+ _durableConfigurationStore.update(records);
+ doPostDelay("update");
+ }
+
public Transaction newTransaction()
{
doPreDelay("beginTran");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org