You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/07/22 23:08:42 UTC

svn commit: r1505820 - in /qpid/branches/0.24/qpid/java: ./ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker/ broker/src/main/java/org/apache/qpid/server/configuration/store/ broker/src/main/java/org/apache/qpid/server/exchange/ b...

Author: rgodfrey
Date: Mon Jul 22 21:08:41 2013
New Revision: 1505820

URL: http://svn.apache.org/r1505820
Log:
QPID-4999 : merged to 0.24 branch (1504429,1504451)

Added:
    qpid/branches/0.24/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java
      - copied unchanged from r1504429, qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandlerTest.java
Modified:
    qpid/branches/0.24/qpid/java/   (props changed)
    qpid/branches/0.24/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/branches/0.24/qpid/java/broker/   (props changed)
    qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
    qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
    qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java   (contents, props changed)
    qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
    qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
    qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
    qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
    qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
    qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/   (props changed)
    qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
    qpid/branches/0.24/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java

Propchange: qpid/branches/0.24/qpid/java/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java:r1504429,1504451

Modified: qpid/branches/0.24/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/branches/0.24/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Mon Jul 22 21:08:41 2013
@@ -21,20 +21,10 @@
 package org.apache.qpid.server.store.berkeleydb;
 
 import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.IntegerBinding;
 import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.je.CheckpointConfig;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.ExceptionEvent;
-import com.sleepycat.je.ExceptionListener;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.*;
+import com.sleepycat.je.Transaction;
 import java.io.File;
 import java.lang.ref.SoftReference;
 import java.nio.ByteBuffer;
@@ -85,15 +75,17 @@ public abstract class AbstractBDBMessage
 
     private Environment _environment;
 
-    private String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS";
-    private String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA";
-    private String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT";
-    private String DELIVERYDB_NAME = "QUEUE_ENTRIES";
-    private String BRIDGEDB_NAME = "BRIDGES";
-    private String LINKDB_NAME = "LINKS";
-    private String XIDDB_NAME = "XIDS";
+    private static String CONFIGURED_OBJECTS = "CONFIGURED_OBJECTS";
+    private static String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA";
+    private static String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT";
+    private static String DELIVERYDB_NAME = "QUEUE_ENTRIES";
+    private static String BRIDGEDB_NAME = "BRIDGES";
+    private static String LINKDB_NAME = "LINKS";
+    private static String XIDDB_NAME = "XIDS";
+    private static String CONFIG_VERSION_DB = "CONFIG_VERSION";
 
     private Database _configuredObjectsDb;
+    private Database _configVersionDb;
     private Database _messageMetaDataDb;
     private Database _messageContentDb;
     private Database _deliveryDb;
@@ -326,6 +318,7 @@ public abstract class AbstractBDBMessage
         dbConfig.setReadOnly(false);
 
         _configuredObjectsDb = openDatabase(CONFIGURED_OBJECTS, dbConfig);
+        _configVersionDb = openDatabase(CONFIG_VERSION_DB, dbConfig);
         _messageMetaDataDb = openDatabase(MESSAGEMETADATADB_NAME, dbConfig);
         _messageContentDb = openDatabase(MESSAGECONTENTDB_NAME, dbConfig);
         _deliveryDb = openDatabase(DELIVERYDB_NAME, dbConfig);
@@ -399,6 +392,13 @@ public abstract class AbstractBDBMessage
             _xidDb.close();
         }
 
+
+        if (_configVersionDb != null)
+        {
+            LOGGER.info("Close config version database");
+            _configVersionDb.close();
+        }
+
         closeEnvironment();
 
     }
@@ -426,10 +426,15 @@ public abstract class AbstractBDBMessage
     {
         try
         {
-            recoveryHandler.beginConfigurationRecovery(this);
+            final int configVersion = getConfigVersion();
+            recoveryHandler.beginConfigurationRecovery(this, configVersion);
             loadConfiguredObjects(recoveryHandler);
 
-            recoveryHandler.completeConfigurationRecovery();
+            final int newConfigVersion = recoveryHandler.completeConfigurationRecovery();
+            if(newConfigVersion != configVersion)
+            {
+                updateConfigVersion(newConfigVersion);
+            }
         }
         catch (DatabaseException e)
         {
@@ -438,6 +443,66 @@ public abstract class AbstractBDBMessage
 
     }
 
+    private void updateConfigVersion(int newConfigVersion) throws AMQStoreException
+    {
+        Cursor cursor = null;
+        try
+        {
+            Transaction txn = _environment.beginTransaction(null, null);
+            cursor = _configVersionDb.openCursor(txn, null);
+            DatabaseEntry key = new DatabaseEntry();
+            ByteBinding.byteToEntry((byte) 0,key);
+            DatabaseEntry value = new DatabaseEntry();
+
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                IntegerBinding.intToEntry(newConfigVersion, value);
+                OperationStatus status = cursor.put(key, value);
+                if (status != OperationStatus.SUCCESS)
+                {
+                    throw new AMQStoreException("Error setting config version: " + status);
+                }
+            }
+            cursor.close();
+            cursor = null;
+            txn.commit();
+        }
+        finally
+        {
+            closeCursorSafely(cursor);
+        }
+
+    }
+
+    private int getConfigVersion() throws AMQStoreException
+    {
+        Cursor cursor = null;
+        try
+        {
+            cursor = _configVersionDb.openCursor(null, null);
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+            while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
+            {
+                return IntegerBinding.entryToInt(value);
+            }
+
+            // Insert 0 as the default config version
+            IntegerBinding.intToEntry(0,value);
+            ByteBinding.byteToEntry((byte) 0,key);
+            OperationStatus status = _configVersionDb.put(null, key, value);
+            if (status != OperationStatus.SUCCESS)
+            {
+                throw new AMQStoreException("Error initialising config version: " + status);
+            }
+            return 0;
+        }
+        finally
+        {
+            closeCursorSafely(cursor);
+        }
+    }
+
     private void loadConfiguredObjects(ConfigurationRecoveryHandler crh) throws DatabaseException
     {
         Cursor cursor = null;
@@ -750,9 +815,25 @@ public abstract class AbstractBDBMessage
         }
     }
 
+
     @Override
     public void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException
     {
+        update(id, type, attributes, null);
+    }
+
+    public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+    {
+        com.sleepycat.je.Transaction txn = _environment.beginTransaction(null, null);
+        for(ConfiguredObjectRecord record : records)
+        {
+            update(record.getId(), record.getType(), record.getAttributes(), txn);
+        }
+        txn.commit();
+    }
+
+    private void update(UUID id, String type, Map<String, Object> attributes, com.sleepycat.je.Transaction txn) throws AMQStoreException
+    {
         if (LOGGER.isDebugEnabled())
         {
             LOGGER.debug("Updating " +type + ", id: " + id);
@@ -768,14 +849,14 @@ public abstract class AbstractBDBMessage
             DatabaseEntry newValue = new DatabaseEntry();
             ConfiguredObjectBinding configuredObjectBinding = ConfiguredObjectBinding.getInstance();
 
-            OperationStatus status = _configuredObjectsDb.get(null, key, value, LockMode.DEFAULT);
+            OperationStatus status = _configuredObjectsDb.get(txn, key, value, LockMode.DEFAULT);
             if (status == OperationStatus.SUCCESS)
             {
                 ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(id, type, attributes);
 
                 // write the updated entry to the store
                 configuredObjectBinding.objectToEntry(newQueueRecord, newValue);
-                status = _configuredObjectsDb.put(null, key, newValue);
+                status = _configuredObjectsDb.put(txn, key, newValue);
                 if (status != OperationStatus.SUCCESS)
                 {
                     throw new AMQStoreException("Error updating queue details within the store: " + status);

Propchange: qpid/branches/0.24/qpid/java/broker/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker:r1504429,1504451

Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStore.java Mon Jul 22 21:08:41 2013
@@ -531,6 +531,10 @@ public class MemoryConfigurationEntrySto
                 if (fieldValues != null)
                 {
                     Object[] array = fieldValues.toArray(new Object[fieldValues.size()]);
+                    if (attributes == null)
+                    {
+                        attributes = new HashMap<String, Object>();
+                    }
                     attributes.put(fieldName, array);
                 }
             }

Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java Mon Jul 22 21:08:41 2013
@@ -91,12 +91,20 @@ public class FilterSupport
     }
 
 
-    static boolean argumentsContainFilter(final Map<String, Object> args)
+    public static boolean argumentsContainFilter(final Map<String, Object> args)
     {
         return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
     }
 
 
+    public static void removeFilters(final Map<String, Object> args)
+    {
+        args.remove(AMQPFilterTypes.JMS_SELECTOR.toString());
+        args.remove(AMQPFilterTypes.NO_LOCAL.toString());
+    }
+
+
+
     static boolean argumentsContainNoLocal(final Map<String, Object> args)
     {
         return args != null

Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/model/VirtualHost.java Mon Jul 22 21:08:41 2013
@@ -119,6 +119,7 @@ public interface VirtualHost extends Con
                             QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
                             QUEUE_ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
                             CONFIG_PATH));
+    int CURRENT_CONFIG_VERSION = 1;
 
     //children
     Collection<VirtualHostAlias> getAliases();

Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Mon Jul 22 21:08:41 2013
@@ -55,6 +55,7 @@ import org.codehaus.jackson.map.ObjectMa
 abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
 {
     private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
+    private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION";
 
     private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES";
 
@@ -68,9 +69,10 @@ abstract public class AbstractJDBCMessag
     private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
 
     private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
+    private static final int DEFAULT_CONFIG_VERSION = 0;
 
     public static String[] ALL_TABLES = new String[] { DB_VERSION_TABLE_NAME, LINKS_TABLE_NAME, BRIDGES_TABLE_NAME, XID_ACTIONS_TABLE_NAME,
-        XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME };
+        XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME, CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME };
 
     private static final int DB_VERSION = 6;
 
@@ -80,6 +82,12 @@ abstract public class AbstractJDBCMessag
     private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
     private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
 
+    private static final String CREATE_CONFIG_VERSION_TABLE = "CREATE TABLE "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version int not null )";
+    private static final String INSERT_INTO_CONFIG_VERSION = "INSERT INTO "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
+    private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME;
+    private static final String UPDATE_CONFIG_VERSION = "UPDATE " + CONFIGURATION_VERSION_TABLE_NAME + " SET version = ?";
+
+
     private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
     private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
     private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id";
@@ -223,6 +231,7 @@ abstract public class AbstractJDBCMessag
         Connection conn = newAutoCommitConnection();
 
         createVersionTable(conn);
+        createConfigVersionTable(conn);
         createConfiguredObjectsTable(conn);
         createQueueEntryTable(conn);
         createMetaDataTable(conn);
@@ -259,7 +268,33 @@ abstract public class AbstractJDBCMessag
                 pstmt.close();
             }
         }
+    }
 
+    private void createConfigVersionTable(final Connection conn) throws SQLException
+    {
+        if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn))
+        {
+            Statement stmt = conn.createStatement();
+            try
+            {
+                stmt.execute(CREATE_CONFIG_VERSION_TABLE);
+            }
+            finally
+            {
+                stmt.close();
+            }
+
+            PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_CONFIG_VERSION);
+            try
+            {
+                pstmt.setInt(1, DEFAULT_CONFIG_VERSION);
+                pstmt.execute();
+            }
+            finally
+            {
+                pstmt.close();
+            }
+        }
     }
 
     private void createConfiguredObjectsTable(final Connection conn) throws SQLException
@@ -279,6 +314,8 @@ abstract public class AbstractJDBCMessag
         }
     }
 
+
+
     private void createQueueEntryTable(final Connection conn) throws SQLException
     {
         if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
@@ -457,10 +494,10 @@ abstract public class AbstractJDBCMessag
     {
         try
         {
-            recoveryHandler.beginConfigurationRecovery(this);
+            recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
             loadConfiguredObjects(recoveryHandler);
 
-            recoveryHandler.completeConfigurationRecovery();
+            setConfigVersion(recoveryHandler.completeConfigurationRecovery());
         }
         catch (SQLException e)
         {
@@ -468,6 +505,67 @@ abstract public class AbstractJDBCMessag
         }
     }
 
+    private void setConfigVersion(int version) throws SQLException
+    {
+        Connection conn = newAutoCommitConnection();
+        try
+        {
+
+            PreparedStatement stmt = conn.prepareStatement(UPDATE_CONFIG_VERSION);
+            try
+            {
+                stmt.setInt(1, version);
+                stmt.execute();
+
+            }
+            finally
+            {
+                stmt.close();
+            }
+        }
+        finally
+        {
+            conn.close();
+        }
+    }
+
+    private int getConfigVersion() throws SQLException
+    {
+        Connection conn = newAutoCommitConnection();
+        try
+        {
+
+            Statement stmt = conn.createStatement();
+            try
+            {
+                ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION);
+                try
+                {
+
+                    if(rs.next())
+                    {
+                        return rs.getInt(1);
+                    }
+                    return DEFAULT_CONFIG_VERSION;
+                }
+                finally
+                {
+                    rs.close();
+                }
+
+            }
+            finally
+            {
+                stmt.close();
+            }
+        }
+        finally
+        {
+            conn.close();
+        }
+
+    }
+
     @Override
     public void close() throws Exception
     {
@@ -1837,52 +1935,89 @@ abstract public class AbstractJDBCMessag
                 Connection conn = newAutoCommitConnection();
                 try
                 {
-                    PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
-                    try
+                    updateConfiguredObject(configuredObject, conn);
+                }
+                finally
+                {
+                    conn.close();
+                }
+            }
+            catch (SQLException e)
+            {
+                throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+            }
+        }
+    }
+
+    @Override
+    public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+    {
+        if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.ACTIVATING))
+        {
+            try
+            {
+                Connection conn = newConnection();
+                try
+                {
+                    for(ConfiguredObjectRecord record : records)
                     {
-                        stmt.setString(1, configuredObject.getId().toString());
-                        ResultSet rs = stmt.executeQuery();
+                        updateConfiguredObject(record, conn);
+                    }
+                    conn.commit();
+                }
+                finally
+                {
+                    conn.close();
+                }
+            }
+            catch (SQLException e)
+            {
+                throw new AMQStoreException("Error updating configured objects in database: " + e.getMessage(), e);
+            }
+
+        }
+
+    }
+
+    private void updateConfiguredObject(ConfiguredObjectRecord configuredObject, Connection conn)
+            throws SQLException, AMQStoreException
+    {
+            PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+            try
+            {
+                stmt.setString(1, configuredObject.getId().toString());
+                ResultSet rs = stmt.executeQuery();
+                try
+                {
+                    if (rs.next())
+                    {
+                        PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
                         try
                         {
-                            if (rs.next())
+                            stmt2.setString(1, configuredObject.getType());
+                            if (configuredObject.getAttributes() != null)
                             {
-                                PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
-                                try
-                                {
-                                    stmt2.setString(1, configuredObject.getType());
-                                    if (configuredObject.getAttributes() != null)
-                                    {
-                                        byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes(
-                                                configuredObject.getAttributes());
-                                        ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
-                                        stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
-                                    }
-                                    else
-                                    {
-                                        stmt2.setNull(2, Types.BLOB);
-                                    }
-                                    stmt2.setString(3, configuredObject.getId().toString());
-                                    stmt2.execute();
-                                }
-                                finally
-                                {
-                                    stmt2.close();
-                                }
+                                byte[] attributesAsBytes = (new ObjectMapper()).writeValueAsBytes(
+                                        configuredObject.getAttributes());
+                                ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+                                stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
+                            }
+                            else
+                            {
+                                stmt2.setNull(2, Types.BLOB);
                             }
+                            stmt2.setString(3, configuredObject.getId().toString());
+                            stmt2.execute();
                         }
                         finally
                         {
-                            rs.close();
+                            stmt2.close();
                         }
                     }
-                    finally
-                    {
-                        stmt.close();
-                    }
                 }
                 finally
                 {
-                    conn.close();
+                    rs.close();
                 }
             }
             catch (JsonMappingException e)
@@ -1897,11 +2032,11 @@ abstract public class AbstractJDBCMessag
             {
                 throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
             }
-            catch (SQLException e)
+            finally
             {
-                throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+                stmt.close();
             }
-        }
+
     }
 
     private ConfiguredObjectRecord loadConfiguredObject(final UUID id) throws AMQStoreException

Propchange: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java:r1504429,1504451

Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Mon Jul 22 21:08:41 2013
@@ -28,10 +28,14 @@ import java.util.UUID;
 
 public interface ConfigurationRecoveryHandler
 {
-    void beginConfigurationRecovery(DurableConfigurationStore store);
+    void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion);
 
     void configuredObject(UUID id, String type, Map<String, Object> attributes);
 
-    void completeConfigurationRecovery();
+    /**
+     *
+     * @return the model version of the configuration
+     */
+    int completeConfigurationRecovery();
 
 }

Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java Mon Jul 22 21:08:41 2013
@@ -60,4 +60,29 @@ public class ConfiguredObjectRecord
         return "ConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", attributes=" + _attributes + "]";
     }
 
+    @Override
+    public boolean equals(Object o)
+    {
+        if(this == o)
+        {
+            return true;
+        }
+        if(o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        ConfiguredObjectRecord that = (ConfiguredObjectRecord) o;
+
+        return _type.equals(that._type) && _id.equals(that._id) && _attributes.equals(that._attributes);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = _id.hashCode();
+        result = 31 * result + _type.hashCode();
+        result = 31 * result + _attributes.hashCode();
+        return result;
+    }
 }

Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Mon Jul 22 21:08:41 2013
@@ -91,4 +91,7 @@ public interface DurableConfigurationSto
     void update(UUID id, String type, Map<String, Object> attributes) throws AMQStoreException;
 
 
+    public void update(ConfiguredObjectRecord... records) throws AMQStoreException;
+
+
 }

Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Mon Jul 22 21:08:41 2013
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.server.store;
 
-import org.apache.commons.configuration.Configuration;
-
 /**
  * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
  *

Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java Mon Jul 22 21:08:41 2013
@@ -21,6 +21,7 @@ package org.apache.qpid.server.store;
 
 import java.util.Map;
 import java.util.UUID;
+import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.server.model.VirtualHost;
 
 public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
@@ -38,6 +39,11 @@ public abstract class NullMessageStore i
     }
 
     @Override
+    public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+    {   // empty body: the supplied records are ignored and nothing is stored
+    }
+
+    @Override
     public void remove(UUID id, String type)
     {
     }

Propchange: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:r1504429,1504451

Modified: qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/0.24/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Mon Jul 22 21:08:41 2013
@@ -24,6 +24,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -35,6 +36,8 @@ import org.apache.qpid.framing.FieldTabl
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.FilterSupport;
+import org.apache.qpid.server.exchange.TopicExchange;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
 import org.apache.qpid.server.logging.messages.TransactionLogMessages;
@@ -49,6 +52,7 @@ import org.apache.qpid.server.queue.AMQQ
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
@@ -63,6 +67,8 @@ import org.apache.qpid.transport.Xid;
 import org.apache.qpid.transport.util.Functions;
 import org.apache.qpid.util.ByteBufferInputStream;
 
+import static org.apache.qpid.server.model.VirtualHost.CURRENT_CONFIG_VERSION;
+
 public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
                                                         MessageStoreRecoveryHandler,
                                                         MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
@@ -85,6 +91,8 @@ public class VirtualHostConfigRecoveryHa
 
     private MessageStoreLogSubject _logSubject;
     private MessageStore _store;
+    private int _currentConfigVersion;
+    private DurableConfigurationStore _configStore;
 
     public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost,
                                             ExchangeRegistry exchangeRegistry,
@@ -96,10 +104,11 @@ public class VirtualHostConfigRecoveryHa
     }
 
     @Override
-    public void beginConfigurationRecovery(DurableConfigurationStore store)
+    public void beginConfigurationRecovery(DurableConfigurationStore store, int configVersion)
     {
         _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName());
-
+        _configStore = store;
+        _currentConfigVersion = configVersion;
         CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_START());
     }
 
@@ -482,8 +491,20 @@ public class VirtualHostConfigRecoveryHa
     }
 
     @Override
-    public void completeConfigurationRecovery()
+    public int completeConfigurationRecovery()
     {
+        if(CURRENT_CONFIG_VERSION !=_currentConfigVersion)
+        {
+            try
+            {
+                upgrade();
+            }
+            catch (AMQStoreException e)
+            {
+                throw new IllegalArgumentException("Unable to upgrade configuration from version " + _currentConfigVersion + " to version " + CURRENT_CONFIG_VERSION);
+            }
+        }
+
         Map<UUID, Map<String, Object>> exchangeObjects =
                 _configuredObjects.remove(org.apache.qpid.server.model.Exchange.class.getName());
 
@@ -511,6 +532,88 @@ public class VirtualHostConfigRecoveryHa
 
 
         CurrentActor.get().message(_logSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
+
+        return CURRENT_CONFIG_VERSION;
+    }
+
+    private void upgrade() throws AMQStoreException
+    {
+        // Applies the upgrade step for _currentConfigVersion to the recovered objects, then writes the changed records to _configStore.
+        Map<UUID, String> updates = new HashMap<UUID, String>(); // id -> type key of every record changed below
+
+        final String bindingType = Binding.class.getName();
+        // case 0 has no break: after its step, control falls through to the CURRENT_CONFIG_VERSION case, which performs the write.
+        switch(_currentConfigVersion)
+        {
+            case 0: // version 0 step: strip selector (filter) arguments from bindings whose exchange is not a topic exchange
+                Map<UUID, Map<String, Object>> bindingObjects =
+                                    _configuredObjects.get(bindingType);
+                if(bindingObjects != null)
+                {
+                    for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingObjects.entrySet())
+                    {
+                        Map<String, Object> binding = bindingEntry.getValue();
+
+                        if(hasSelectorArguments(binding) && !isTopicExchange(binding))
+                        {
+                            binding = new LinkedHashMap<String, Object>(binding); // copy, so the map previously held by the entry is not modified
+                            removeSelectorArguments(binding);
+                            bindingEntry.setValue(binding); // the entry inside _configuredObjects now holds the stripped copy
+
+                            updates.put(bindingEntry.getKey(), bindingType);
+                        }
+                    }
+                }
+            case CURRENT_CONFIG_VERSION: // also entered by fall-through from case 0
+                if(!updates.isEmpty())
+                {
+                    ConfiguredObjectRecord[] updateRecords = new ConfiguredObjectRecord[updates.size()];
+                    int i = 0;
+                    for(Map.Entry<UUID, String> update : updates.entrySet()) // each record is built from the attribute map currently held in _configuredObjects
+                    {
+                        updateRecords[i++] = new ConfiguredObjectRecord(update.getKey(), update.getValue(), _configuredObjects.get(update.getValue()).get(update.getKey()));
+                    }
+                    _configStore.update(updateRecords);
+                }
+                break;
+            default: // any other stored version is rejected
+                throw new IllegalStateException("Unknown configuration model version: " + _currentConfigVersion + ". Are you attempting to run an older instance against an upgraded configuration?");
+        }
+    }
+
+    private void removeSelectorArguments(Map<String, Object> binding)
+    {
+        @SuppressWarnings("unchecked")
+        Map<String, Object> original = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
+        Map<String, Object> filtered = new LinkedHashMap<String, Object>(original); // copy: the original argument map is left unmodified
+        FilterSupport.removeFilters(filtered);
+        binding.put(Binding.ARGUMENTS, filtered);
+    }
+
+    private boolean isTopicExchange(Map<String, Object> binding)
+    {
+        UUID exchangeId = UUID.fromString((String)binding.get(Binding.EXCHANGE));
+        final
+        Map<UUID, Map<String, Object>> exchanges =
+                _configuredObjects.get(org.apache.qpid.server.model.Exchange.class.getName());
+
+        if(exchanges != null && exchanges.containsKey(exchangeId))
+        {
+            return "topic".equals(exchanges.get(exchangeId).get(org.apache.qpid.server.model.Exchange.TYPE));
+        }
+        else
+        {
+            return _exchangeRegistry.getExchange(exchangeId) != null
+                   && _exchangeRegistry.getExchange(exchangeId).getType() == TopicExchange.TYPE;
+        }
+
+    }
+
+    private boolean hasSelectorArguments(Map<String, Object> binding)
+    {
+        @SuppressWarnings("unchecked")
+        Map<String, Object> bindingArgs = (Map<String, Object>) binding.get(Binding.ARGUMENTS);
+        return bindingArgs != null && FilterSupport.argumentsContainFilter(bindingArgs); // no arguments entry => no filter
     }
 
     private void recoverExchanges(Map<UUID, Map<String, Object>> exchangeObjects)

Modified: qpid/branches/0.24/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.24/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java?rev=1505820&r1=1505819&r2=1505820&view=diff
==============================================================================
--- qpid/branches/0.24/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java (original)
+++ qpid/branches/0.24/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java Mon Jul 22 21:08:41 2013
@@ -197,6 +197,14 @@ public class SlowMessageStore implements
         doPostDelay("update");
     }
 
+    @Override
+    public void update(ConfiguredObjectRecord... records) throws AMQStoreException
+    {   // wraps the delegate's bulk update between the pre/post delay hooks, using the "update" key
+        doPreDelay("update"); // NOTE(review): presumably applies the delay configured for "update"; the helper is defined outside this view
+        _durableConfigurationStore.update(records);
+        doPostDelay("update"); // not reached if the delegate throws (there is no try/finally here)
+    }
+
     public Transaction newTransaction()
     {
         doPreDelay("beginTran");



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org