You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2012/04/05 21:33:19 UTC

svn commit: r1310028 - in /cxf/trunk/rt/ws/rm/src: main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java

Author: ay
Date: Thu Apr  5 19:33:18 2012
New Revision: 1310028

URL: http://svn.apache.org/viewvc?rev=1310028&view=rev
Log:
[CXF-4229] Make upgrading the WS-RM's RMTxStore's tables easier

Added:
    cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java
Modified:
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties
    cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties?rev=1310028&r1=1310027&r2=1310028&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties Thu Apr  5 19:33:18 2012
@@ -21,4 +21,5 @@
 CONNECT_EXC = Failed to connect to store.
 ABORT_FAILED_MSG = Failed to abort transaction.
 SELECT_DEST_SEQ_FAILED_MSG = Failed to retrieve destination sequences from persistent store.
-SELECT_SRC_SEQ_FAILED_MSG = Failed to retrieve source sequences from persistent store.
\ No newline at end of file
+SELECT_SRC_SEQ_FAILED_MSG = Failed to retrieve source sequences from persistent store.
+VERIFY_TABLE_FAILED_MSG = Failed to verify the table definition.
\ No newline at end of file

Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1310028&r1=1310027&r2=1310028&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Thu Apr  5 19:33:18 2012
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.sql.Blob;
 import java.sql.Connection;
+import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -33,6 +34,9 @@ import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.logging.Level;
@@ -58,36 +62,47 @@ import org.apache.cxf.ws.rm.v200702.Sequ
 public class RMTxStore implements RMStore {
     
     public static final String DEFAULT_DATABASE_NAME = "rmdb";
+    // Column definitions of the destination sequences table. Each entry is a
+    // {column name, column definition} pair; the definition text is appended
+    // verbatim after the name when the CREATE TABLE / ALTER TABLE statements are built.
+    private static final String[][] DEST_SEQUENCES_TABLE_COLS 
+        = {{"SEQ_ID", "VARCHAR(256) NOT NULL"},
+           {"ACKS_TO", "VARCHAR(1024) NOT NULL"},
+           {"LAST_MSG_NO", "DECIMAL(19, 0)"},
+           {"ENDPOINT_ID", "VARCHAR(1024)"},
+           {"ACKNOWLEDGED", "BLOB"},
+           {"PROTOCOL_VERSION", "VARCHAR(256)"}};
+    // Columns listed in the PRIMARY KEY clause of the destination sequences table.
+    private static final String[] DEST_SEQUENCES_TABLE_KEYS = {"SEQ_ID"};
+    // Column definitions of the source sequences table, as {column name, column definition} pairs.
+    private static final String[][] SRC_SEQUENCES_TABLE_COLS
+        = {{"SEQ_ID", "VARCHAR(256) NOT NULL"},
+           {"CUR_MSG_NO", "DECIMAL(19, 0) DEFAULT 1 NOT NULL"},
+           {"LAST_MSG", "CHAR(1)"},
+           {"EXPIRY", "DECIMAL(19, 0)"},
+           {"OFFERING_SEQ_ID", "VARCHAR(256)"},
+           {"ENDPOINT_ID", "VARCHAR(1024)"},
+           {"PROTOCOL_VERSION", "VARCHAR(256)"}};
+    // Columns listed in the PRIMARY KEY clause of the source sequences table.
+    private static final String[] SRC_SEQUENCES_TABLE_KEYS = {"SEQ_ID"};
+    // Column definitions shared by the inbound and outbound messages tables,
+    // as {column name, column definition} pairs.
+    private static final String[][] MESSAGES_TABLE_COLS
+        = {{"SEQ_ID", "VARCHAR(256) NOT NULL"},
+           {"MSG_NO", "DECIMAL(19, 0) NOT NULL"},
+           {"SEND_TO", "VARCHAR(256)"},
+           {"CONTENT", "BLOB"}};
+    // Columns listed in the (composite) PRIMARY KEY clause of the messages tables.
+    private static final String[] MESSAGES_TABLE_KEYS = {"SEQ_ID", "MSG_NO"};
     
-    private static final String CREATE_DEST_SEQUENCES_TABLE_STMT =
-        "CREATE TABLE CXF_RM_DEST_SEQUENCES " 
-        + "(SEQ_ID VARCHAR(256) NOT NULL, "
-        + "ACKS_TO VARCHAR(1024) NOT NULL, "
-        + "LAST_MSG_NO DECIMAL(19, 0), "
-        + "ENDPOINT_ID VARCHAR(1024), "
-        + "ACKNOWLEDGED BLOB, "
-        + "PROTOCOL_VERSION VARCHAR(256), "
-        + "PRIMARY KEY (SEQ_ID))";
-    private static final String CREATE_SRC_SEQUENCES_TABLE_STMT =
-        "CREATE TABLE CXF_RM_SRC_SEQUENCES " 
-        + "(SEQ_ID VARCHAR(256) NOT NULL, "
-        + "CUR_MSG_NO DECIMAL(19, 0) DEFAULT 1 NOT NULL, "
-        + "LAST_MSG CHAR(1), "
-        + "EXPIRY DECIMAL(19, 0), "
-        + "OFFERING_SEQ_ID VARCHAR(256), "
-        + "ENDPOINT_ID VARCHAR(1024), "
-        + "PROTOCOL_VERSION VARCHAR(256), "
-        + "PRIMARY KEY (SEQ_ID))";
-    private static final String CREATE_MESSAGES_TABLE_STMT =
-        "CREATE TABLE {0} " 
-        + "(SEQ_ID VARCHAR(256) NOT NULL, "
-        + "MSG_NO DECIMAL(19, 0) NOT NULL, "
-        + "SEND_TO VARCHAR(256), "
-        + "CONTENT BLOB, "
-        + "PRIMARY KEY (SEQ_ID, MSG_NO))";
+
+    // Names of the tables managed by this store.
+    private static final String DEST_SEQUENCES_TABLE_NAME = "CXF_RM_DEST_SEQUENCES"; 
+    private static final String SRC_SEQUENCES_TABLE_NAME = "CXF_RM_SRC_SEQUENCES";
     private static final String INBOUND_MSGS_TABLE_NAME = "CXF_RM_INBOUND_MESSAGES";
     private static final String OUTBOUND_MSGS_TABLE_NAME = "CXF_RM_OUTBOUND_MESSAGES";    
     
+    // CREATE TABLE statements generated from the column and key definitions, so that
+    // the same definitions can also be passed to verifyTable for existing tables.
+    private static final String CREATE_DEST_SEQUENCES_TABLE_STMT = 
+        buildCreateTableStatement(DEST_SEQUENCES_TABLE_NAME, 
+                                  DEST_SEQUENCES_TABLE_COLS, DEST_SEQUENCES_TABLE_KEYS);
+
+    private static final String CREATE_SRC_SEQUENCES_TABLE_STMT =
+        buildCreateTableStatement(SRC_SEQUENCES_TABLE_NAME, 
+                                  SRC_SEQUENCES_TABLE_COLS, SRC_SEQUENCES_TABLE_KEYS);
+    // The table name is left as the MessageFormat placeholder {0}; it is filled in
+    // with the concrete messages table name when the statement is used.
+    private static final String CREATE_MESSAGES_TABLE_STMT =
+        buildCreateTableStatement("{0}", 
+                                  MESSAGES_TABLE_COLS, MESSAGES_TABLE_KEYS);
+
     private static final String CREATE_DEST_SEQUENCE_STMT_STR 
         = "INSERT INTO CXF_RM_DEST_SEQUENCES (SEQ_ID, ACKS_TO, ENDPOINT_ID, PROTOCOL_VERSION) " 
             + "VALUES(?, ?, ?, ?)";
@@ -119,7 +134,8 @@ public class RMTxStore implements RMStor
         + "FROM CXF_RM_SRC_SEQUENCES WHERE ENDPOINT_ID = ?";
     private static final String SELECT_MESSAGES_STMT_STR =
         "SELECT MSG_NO, SEND_TO, CONTENT FROM {0} WHERE SEQ_ID = ?";
-    
+    // MessageFormat template for adding one column to an existing table:
+    // {0} = table name, {1} = column name, {2} = column definition.
+    private static final String ALTER_TABLE_STMT_STR =
+        "ALTER TABLE {0} ADD {1} {2}";
     private static final String DERBY_TABLE_EXISTS_STATE = "X0Y32";
     private static final int ORACLE_TABLE_EXISTS_CODE = 955;
     
@@ -640,10 +656,11 @@ public class RMTxStore implements RMStor
                 throw ex;
             } else {
                 LOG.fine("Table CXF_RM_SRC_SEQUENCES already exists.");
+                verifyTable(SRC_SEQUENCES_TABLE_NAME, SRC_SEQUENCES_TABLE_COLS);
             }
         }
         stmt.close();
-        
+
         stmt = connection.createStatement();
         try {
             stmt.executeUpdate(CREATE_DEST_SEQUENCES_TABLE_STMT);
@@ -652,6 +669,7 @@ public class RMTxStore implements RMStor
                 throw ex;
             } else {
                 LOG.fine("Table CXF_RM_DEST_SEQUENCES already exists.");
+                verifyTable(DEST_SEQUENCES_TABLE_NAME, DEST_SEQUENCES_TABLE_COLS);        
             }
         }
         stmt.close();
@@ -667,12 +685,58 @@ public class RMTxStore implements RMStor
                     if (LOG.isLoggable(Level.FINE)) {
                         LOG.fine("Table " + tableName + " already exists.");
                     }
+                    verifyTable(tableName, MESSAGES_TABLE_COLS);
                 }
             }
             stmt.close();
         }
     }
     
+    /**
+     * Checks that an already existing table contains every column of the given
+     * definition and adds the missing columns with ALTER TABLE statements.
+     * <p>
+     * The check is best-effort: an SQLException raised while reading the metadata
+     * or while altering the table is logged at FINE level and not propagated.
+     *
+     * @param tableName the name of the table to verify
+     * @param tableCols the expected columns; each entry holds the column name at
+     *                  index 0 and its column definition at index 1
+     */
+    protected void verifyTable(String tableName, String[][] tableCols) {
+        try {
+            DatabaseMetaData metadata = connection.getMetaData();
+            Set<String> dbCols = new HashSet<String>();
+            ResultSet rs = metadata.getColumns(null, null, tableName, "%");
+            try {
+                while (rs.next()) {
+                    // index 4 of the getColumns result is COLUMN_NAME
+                    dbCols.add(rs.getString(4));
+                }
+            } finally {
+                // release the metadata cursor even if reading it fails
+                rs.close();
+            }
+
+            List<String[]> newCols = new ArrayList<String[]>();
+            for (String[] col : tableCols) {
+                if (!dbCols.contains(col[0])) {
+                    newCols.add(col);
+                }
+            }
+            if (newCols.size() > 0) {
+                // need to add the new columns
+                if (LOG.isLoggable(Level.FINE)) {
+                    LOG.log(Level.FINE, "Table " + tableName + " needs additional columns");
+                }
+
+                for (String[] newCol : newCols) {
+                    Statement st = null;
+                    try {
+                        st = connection.createStatement();
+                        st.executeUpdate(MessageFormat.format(ALTER_TABLE_STMT_STR,
+                                                              tableName, newCol[0], newCol[1]));
+                        if (LOG.isLoggable(Level.FINE)) {
+                            // parameter order must match the message: {0} = column, {1} = table
+                            LOG.log(Level.FINE, "Successfully added column {0} to table {1}",
+                                    new Object[] {newCol[0], tableName});
+                        }
+                    } finally {
+                        if (st != null) {
+                            st.close();
+                        }
+                    }
+                }
+            }
+
+        } catch (SQLException ex) {
+            if (LOG.isLoggable(Level.FINE)) {
+                // keep the cause in the log record so a failed upgrade can be diagnosed
+                LOG.log(Level.FINE, "Table " + tableName + " cannot be verified.", ex);
+            }
+        }
+    }
+
     @PostConstruct     
     public synchronized void init() {
         
@@ -787,7 +851,24 @@ public class RMTxStore implements RMStor
             dir.deleteOnExit();
         }
     }
-    
+     
+    /**
+     * Builds a CREATE TABLE statement from a table name, its column definitions
+     * and its primary key columns.
+     *
+     * @param name the table name (may also be a MessageFormat placeholder such as {0})
+     * @param cols the columns; each entry holds the column name at index 0 and its
+     *             column definition at index 1
+     * @param keys the names of the columns forming the primary key
+     * @return the statement in the form
+     *         CREATE TABLE name (col def, ..., PRIMARY KEY (key, ...))
+     */
+    private static String buildCreateTableStatement(String name, String[][] cols, String[] keys) {
+        // the buffer never leaves this method, so the unsynchronized builder is sufficient
+        StringBuilder buf = new StringBuilder();
+        buf.append("CREATE TABLE ").append(name).append(" (");
+        for (String[] col : cols) {
+            buf.append(col[0]).append(" ").append(col[1]).append(", ");
+        }
+        buf.append("PRIMARY KEY (");
+        for (int i = 0; i < keys.length; i++) {
+            if (i > 0) {
+                buf.append(", ");
+            }
+            buf.append(keys[i]);
+        }
+        buf.append("))");
+        return buf.toString();
+    }
+
     protected boolean isTableExistsError(SQLException ex) {
         // we could be deriving the state/code from the driver url to avoid explicit setting of them
         return (null != tableExistsState && tableExistsState.equals(ex.getSQLState()))

Added: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java?rev=1310028&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java (added)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java Thu Apr  5 19:33:18 2012
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.persistence.jdbc;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.MessageFormat;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the automatic table updating of RMTxStore that allows compatible changes 
+ * in the database tables. 
+ */
+public class RMTxStoreUpgradeTest extends Assert {
+    // old source sequences table definition, without the PROTOCOL_VERSION column
+    private static final String CREATE_OLD_SRC_SEQ_TABLE_STMT =
+        "CREATE TABLE CXF_RM_SRC_SEQUENCES "
+        + "(SEQ_ID VARCHAR(256) NOT NULL, "
+        + "CUR_MSG_NO DECIMAL(19, 0) DEFAULT 1 NOT NULL, "
+        + "LAST_MSG CHAR(1), "
+        + "EXPIRY DECIMAL(19, 0), "
+        + "OFFERING_SEQ_ID VARCHAR(256), "
+        + "ENDPOINT_ID VARCHAR(1024), "
+        + "PRIMARY KEY (SEQ_ID))";
+
+    // old destination sequences table definition, without the PROTOCOL_VERSION column
+    private static final String CREATE_OLD_DEST_SEQ_TABLE_STMT =
+        "CREATE TABLE CXF_RM_DEST_SEQUENCES "
+        + "(SEQ_ID VARCHAR(256) NOT NULL, "
+        + "ACKS_TO VARCHAR(1024) NOT NULL, "
+        + "LAST_MSG_NO DECIMAL(19, 0), "
+        + "ENDPOINT_ID VARCHAR(1024), "
+        + "ACKNOWLEDGED BLOB, "
+        + "PRIMARY KEY (SEQ_ID))";
+
+    // messages table definition; {0} is replaced with the concrete table name
+    private static final String CREATE_OLD_MSGS_TABLE_STMT =
+        "CREATE TABLE {0} "
+        + "(SEQ_ID VARCHAR(256) NOT NULL, "
+        + "MSG_NO DECIMAL(19, 0) NOT NULL, "
+        + "SEND_TO VARCHAR(256), "
+        + "CONTENT BLOB, "
+        + "PRIMARY KEY (SEQ_ID, MSG_NO))";
+
+    private static final String INBOUND_MSGS_TABLE_NAME = "CXF_RM_INBOUND_MESSAGES";
+    private static final String OUTBOUND_MSGS_TABLE_NAME = "CXF_RM_OUTBOUND_MESSAGES";
+
+    private static final String TEST_DB_NAME = "rmdb2";
+
+    @BeforeClass
+    public static void setUpOnce() {
+        RMTxStore.deleteDatabaseFiles(TEST_DB_NAME, true);
+    }
+
+    @AfterClass
+    public static void tearDownOnce() {
+        RMTxStore.deleteDatabaseFiles(TEST_DB_NAME, false);
+    }
+
+    /**
+     * Creates the tables with the old definitions, re-initializes the store with
+     * the current definitions and checks that the new columns have been added.
+     */
+    @Test
+    public void testUpgradeTables() throws Exception {
+        TestRMTxStore store = new TestRMTxStore();
+        store.setDriverClassName("org.apache.derby.jdbc.EmbeddedDriver");
+
+        // workaround for the db file deletion problem during the tests
+        store.setUrl(MessageFormat.format("jdbc:derby:{0};create=true", TEST_DB_NAME));
+
+        // use the old db definitions to create the tables
+        store.init();
+
+        // verify the absence of the new columns in the tables
+        verifyColumns(store, "CXF_RM_SRC_SEQUENCES", new String[]{"PROTOCOL_VERSION"}, true);
+        verifyColumns(store, "CXF_RM_DEST_SEQUENCES", new String[]{"PROTOCOL_VERSION"}, true);
+        verifyColumns(store, INBOUND_MSGS_TABLE_NAME, new String[]{}, true);
+        verifyColumns(store, OUTBOUND_MSGS_TABLE_NAME, new String[]{}, true);
+
+        // upgrade the tables and add new columns to the old tables
+        store.upgrade();
+        store.init();
+
+        // verify the presence of the new columns in the upgraded tables
+        verifyColumns(store, "CXF_RM_SRC_SEQUENCES", new String[]{"PROTOCOL_VERSION"}, false);
+        verifyColumns(store, "CXF_RM_DEST_SEQUENCES", new String[]{"PROTOCOL_VERSION"}, false);
+        verifyColumns(store, INBOUND_MSGS_TABLE_NAME, new String[]{}, false);
+        verifyColumns(store, OUTBOUND_MSGS_TABLE_NAME, new String[]{}, false);
+    }
+
+    /**
+     * Asserts that the given columns are either all absent from or all present in a table.
+     *
+     * @param store the store whose connection is used to read the table metadata
+     * @param tableName the table to inspect
+     * @param cols the column names to look for
+     * @param absent true to require that none of the columns exists,
+     *               false to require that all of them exist
+     */
+    private static void verifyColumns(RMTxStore store, String tableName,
+                                      String[] cols, boolean absent) throws Exception {
+        DatabaseMetaData metadata = store.getConnection().getMetaData();
+        Set<String> colNames = new HashSet<String>();
+        Collections.addAll(colNames, cols);
+        ResultSet rs = metadata.getColumns(null, null, tableName, "%");
+        try {
+            // whatever remains in colNames afterwards is missing from the table
+            while (rs.next()) {
+                colNames.remove(rs.getString(4));
+            }
+        } finally {
+            rs.close();
+        }
+
+        if (absent) {
+            assertEquals("Some new columns are already present", cols.length, colNames.size());
+        } else {
+            assertEquals("Some new columns are still absent", 0, colNames.size());
+        }
+    }
+
+
+    /**
+     * A store that creates the tables with the old definitions until
+     * {@link #upgrade()} has been called, and with the current ones afterwards.
+     */
+    static class TestRMTxStore extends RMTxStore {
+        private boolean upgraded;
+
+        /** Switches table creation over to the current definitions of RMTxStore. */
+        public void upgrade() {
+            upgraded = true;
+        }
+
+        @Override
+        protected void createTables() throws SQLException {
+            if (upgraded) {
+                super.createTables();
+                return;
+            }
+            // creating the old tables
+            createOldTable(CREATE_OLD_SRC_SEQ_TABLE_STMT);
+            createOldTable(CREATE_OLD_DEST_SEQ_TABLE_STMT);
+            for (String tableName : new String[] {INBOUND_MSGS_TABLE_NAME, OUTBOUND_MSGS_TABLE_NAME}) {
+                createOldTable(MessageFormat.format(CREATE_OLD_MSGS_TABLE_STMT, tableName));
+            }
+        }
+
+        /**
+         * Executes one CREATE TABLE statement, ignoring the error raised when the
+         * table already exists; the statement is closed in every case.
+         */
+        private void createOldTable(String createStmt) throws SQLException {
+            Statement stmt = getConnection().createStatement();
+            try {
+                stmt.executeUpdate(createStmt);
+            } catch (SQLException ex) {
+                if (!isTableExistsError(ex)) {
+                    throw ex;
+                }
+            } finally {
+                stmt.close();
+            }
+        }
+    }
+}