You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ay...@apache.org on 2012/04/05 21:33:19 UTC
svn commit: r1310028 - in /cxf/trunk/rt/ws/rm/src:
main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties
main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java
Author: ay
Date: Thu Apr 5 19:33:18 2012
New Revision: 1310028
URL: http://svn.apache.org/viewvc?rev=1310028&view=rev
Log:
[CXF-4229] Make upgrading the WS-RM's RMTxStore's tables easier
Added:
cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java
Modified:
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties
cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties?rev=1310028&r1=1310027&r2=1310028&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/Messages.properties Thu Apr 5 19:33:18 2012
@@ -21,4 +21,5 @@
CONNECT_EXC = Failed to connect to store.
ABORT_FAILED_MSG = Failed to abort transaction.
SELECT_DEST_SEQ_FAILED_MSG = Failed to retrieve destination sequences from persistent store.
-SELECT_SRC_SEQ_FAILED_MSG = Failed to retrieve source sequences from persistent store.
\ No newline at end of file
+SELECT_SRC_SEQ_FAILED_MSG = Failed to retrieve source sequences from persistent store.
+VERIFY_TABLE_FAILED_MSG = Failed to verify the table definition.
\ No newline at end of file
Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1310028&r1=1310027&r2=1310028&view=diff
==============================================================================
--- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original)
+++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Thu Apr 5 19:33:18 2012
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.sql.Blob;
import java.sql.Connection;
+import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -33,6 +34,9 @@ import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
@@ -58,36 +62,47 @@ import org.apache.cxf.ws.rm.v200702.Sequ
public class RMTxStore implements RMStore {
public static final String DEFAULT_DATABASE_NAME = "rmdb";
+ private static final String[][] DEST_SEQUENCES_TABLE_COLS // each entry is a {column name, column definition} pair
+ = {{"SEQ_ID", "VARCHAR(256) NOT NULL"},
+ {"ACKS_TO", "VARCHAR(1024) NOT NULL"},
+ {"LAST_MSG_NO", "DECIMAL(19, 0)"},
+ {"ENDPOINT_ID", "VARCHAR(1024)"},
+ {"ACKNOWLEDGED", "BLOB"},
+ {"PROTOCOL_VERSION", "VARCHAR(256)"}};
+ private static final String[] DEST_SEQUENCES_TABLE_KEYS = {"SEQ_ID"}; // columns listed in the PRIMARY KEY clause
+ private static final String[][] SRC_SEQUENCES_TABLE_COLS // each entry is a {column name, column definition} pair
+ = {{"SEQ_ID", "VARCHAR(256) NOT NULL"},
+ {"CUR_MSG_NO", "DECIMAL(19, 0) DEFAULT 1 NOT NULL"},
+ {"LAST_MSG", "CHAR(1)"},
+ {"EXPIRY", "DECIMAL(19, 0)"},
+ {"OFFERING_SEQ_ID", "VARCHAR(256)"},
+ {"ENDPOINT_ID", "VARCHAR(1024)"},
+ {"PROTOCOL_VERSION", "VARCHAR(256)"}};
+ private static final String[] SRC_SEQUENCES_TABLE_KEYS = {"SEQ_ID"}; // columns listed in the PRIMARY KEY clause
+ private static final String[][] MESSAGES_TABLE_COLS // column pairs for the message tables; the table name itself is supplied separately
+ = {{"SEQ_ID", "VARCHAR(256) NOT NULL"},
+ {"MSG_NO", "DECIMAL(19, 0) NOT NULL"},
+ {"SEND_TO", "VARCHAR(256)"},
+ {"CONTENT", "BLOB"}};
+ private static final String[] MESSAGES_TABLE_KEYS = {"SEQ_ID", "MSG_NO"}; // composite primary key
- private static final String CREATE_DEST_SEQUENCES_TABLE_STMT =
- "CREATE TABLE CXF_RM_DEST_SEQUENCES "
- + "(SEQ_ID VARCHAR(256) NOT NULL, "
- + "ACKS_TO VARCHAR(1024) NOT NULL, "
- + "LAST_MSG_NO DECIMAL(19, 0), "
- + "ENDPOINT_ID VARCHAR(1024), "
- + "ACKNOWLEDGED BLOB, "
- + "PROTOCOL_VERSION VARCHAR(256), "
- + "PRIMARY KEY (SEQ_ID))";
- private static final String CREATE_SRC_SEQUENCES_TABLE_STMT =
- "CREATE TABLE CXF_RM_SRC_SEQUENCES "
- + "(SEQ_ID VARCHAR(256) NOT NULL, "
- + "CUR_MSG_NO DECIMAL(19, 0) DEFAULT 1 NOT NULL, "
- + "LAST_MSG CHAR(1), "
- + "EXPIRY DECIMAL(19, 0), "
- + "OFFERING_SEQ_ID VARCHAR(256), "
- + "ENDPOINT_ID VARCHAR(1024), "
- + "PROTOCOL_VERSION VARCHAR(256), "
- + "PRIMARY KEY (SEQ_ID))";
- private static final String CREATE_MESSAGES_TABLE_STMT =
- "CREATE TABLE {0} "
- + "(SEQ_ID VARCHAR(256) NOT NULL, "
- + "MSG_NO DECIMAL(19, 0) NOT NULL, "
- + "SEND_TO VARCHAR(256), "
- + "CONTENT BLOB, "
- + "PRIMARY KEY (SEQ_ID, MSG_NO))";
+
+ private static final String DEST_SEQUENCES_TABLE_NAME = "CXF_RM_DEST_SEQUENCES"; // also passed to verifyTable when the table already exists
+ private static final String SRC_SEQUENCES_TABLE_NAME = "CXF_RM_SRC_SEQUENCES"; // also passed to verifyTable when the table already exists
private static final String INBOUND_MSGS_TABLE_NAME = "CXF_RM_INBOUND_MESSAGES";
private static final String OUTBOUND_MSGS_TABLE_NAME = "CXF_RM_OUTBOUND_MESSAGES";
+ private static final String CREATE_DEST_SEQUENCES_TABLE_STMT = // generated from the column/key arrays instead of a hand-written literal
+ buildCreateTableStatement(DEST_SEQUENCES_TABLE_NAME,
+ DEST_SEQUENCES_TABLE_COLS, DEST_SEQUENCES_TABLE_KEYS);
+
+ private static final String CREATE_SRC_SEQUENCES_TABLE_STMT = // generated from the column/key arrays instead of a hand-written literal
+ buildCreateTableStatement(SRC_SEQUENCES_TABLE_NAME,
+ SRC_SEQUENCES_TABLE_COLS, SRC_SEQUENCES_TABLE_KEYS);
+ private static final String CREATE_MESSAGES_TABLE_STMT =
+ buildCreateTableStatement("{0}", // "{0}" stays in the statement as the table-name placeholder; presumably filled in via MessageFormat by the caller
+ MESSAGES_TABLE_COLS, MESSAGES_TABLE_KEYS);
+
private static final String CREATE_DEST_SEQUENCE_STMT_STR
= "INSERT INTO CXF_RM_DEST_SEQUENCES (SEQ_ID, ACKS_TO, ENDPOINT_ID, PROTOCOL_VERSION) "
+ "VALUES(?, ?, ?, ?)";
@@ -119,7 +134,8 @@ public class RMTxStore implements RMStor
+ "FROM CXF_RM_SRC_SEQUENCES WHERE ENDPOINT_ID = ?";
private static final String SELECT_MESSAGES_STMT_STR =
"SELECT MSG_NO, SEND_TO, CONTENT FROM {0} WHERE SEQ_ID = ?";
-
+ private static final String ALTER_TABLE_STMT_STR = // MessageFormat pattern: {0} table name, {1} column name, {2} column definition
+ "ALTER TABLE {0} ADD {1} {2}";
private static final String DERBY_TABLE_EXISTS_STATE = "X0Y32";
private static final int ORACLE_TABLE_EXISTS_CODE = 955;
@@ -640,10 +656,11 @@ public class RMTxStore implements RMStor
throw ex;
} else {
LOG.fine("Table CXF_RM_SRC_SEQUENCES already exists.");
+ verifyTable(SRC_SEQUENCES_TABLE_NAME, SRC_SEQUENCES_TABLE_COLS);
}
}
stmt.close();
-
+
stmt = connection.createStatement();
try {
stmt.executeUpdate(CREATE_DEST_SEQUENCES_TABLE_STMT);
@@ -652,6 +669,7 @@ public class RMTxStore implements RMStor
throw ex;
} else {
LOG.fine("Table CXF_RM_DEST_SEQUENCES already exists.");
+ verifyTable(DEST_SEQUENCES_TABLE_NAME, DEST_SEQUENCES_TABLE_COLS);
}
}
stmt.close();
@@ -667,12 +685,58 @@ public class RMTxStore implements RMStor
if (LOG.isLoggable(Level.FINE)) {
LOG.fine("Table " + tableName + " already exists.");
}
+ verifyTable(tableName, MESSAGES_TABLE_COLS);
}
}
stmt.close();
}
}
+    /**
+     * Checks an existing table against its expected column list and adds every
+     * column that is missing, one ALTER TABLE statement per column.
+     * <p>
+     * This is a best-effort operation: an SQLException raised while reading the
+     * metadata or altering the table is logged at FINE level and not propagated.
+     *
+     * @param tableName the name of the table to inspect
+     * @param tableCols the expected columns as {column name, column definition} pairs
+     */
+    protected void verifyTable(String tableName, String[][] tableCols) {
+        try {
+            DatabaseMetaData metadata = connection.getMetaData();
+            Set<String> dbCols = new HashSet<String>();
+            ResultSet rs = metadata.getColumns(null, null, tableName, "%");
+            try {
+                while (rs.next()) {
+                    // column 4 of the getColumns() result is COLUMN_NAME
+                    dbCols.add(rs.getString(4));
+                }
+            } finally {
+                // release the metadata cursor before any ALTER TABLE is issued
+                rs.close();
+            }
+
+            List<String[]> newCols = new ArrayList<String[]>();
+            for (String[] col : tableCols) {
+                if (!dbCols.contains(col[0])) {
+                    newCols.add(col);
+                }
+            }
+
+            if (!newCols.isEmpty() && LOG.isLoggable(Level.FINE)) {
+                LOG.log(Level.FINE, "Table " + tableName + " needs additional columns");
+            }
+
+            for (String[] newCol : newCols) {
+                Statement st = null;
+                try {
+                    st = connection.createStatement();
+                    st.executeUpdate(MessageFormat.format(ALTER_TABLE_STMT_STR,
+                                                          tableName, newCol[0], newCol[1]));
+                    if (LOG.isLoggable(Level.FINE)) {
+                        // parameter order follows the message: {0} is the column, {1} the table
+                        LOG.log(Level.FINE, "Successfully added column {0} to table {1}",
+                                new Object[] {newCol[0], tableName});
+                    }
+                } finally {
+                    if (st != null) {
+                        st.close();
+                    }
+                }
+            }
+        } catch (SQLException ex) {
+            if (LOG.isLoggable(Level.FINE)) {
+                // keep the cause attached so a failed verification can be diagnosed
+                LOG.log(Level.FINE, "Table " + tableName + " cannot be verified.", ex);
+            }
+        }
+    }
+
@PostConstruct
public synchronized void init() {
@@ -787,7 +851,24 @@ public class RMTxStore implements RMStor
dir.deleteOnExit();
}
}
-
+
+    /**
+     * Builds a CREATE TABLE statement from a table name, its column definitions
+     * and its primary key columns.
+     *
+     * @param name the table name, or a placeholder the caller substitutes later
+     * @param cols the columns as {column name, column definition} pairs, emitted in order
+     * @param keys the primary key column names; when empty, no PRIMARY KEY clause is emitted
+     * @return the statement text
+     */
+    private static String buildCreateTableStatement(String name, String[][] cols, String[] keys) {
+        // the buffer never leaves this method, so the unsynchronized builder is sufficient
+        StringBuilder buf = new StringBuilder();
+        buf.append("CREATE TABLE ").append(name).append(" (");
+        String separator = "";
+        for (String[] col : cols) {
+            buf.append(separator).append(col[0]).append(" ").append(col[1]);
+            separator = ", ";
+        }
+        if (keys.length > 0) {
+            buf.append(separator).append("PRIMARY KEY (");
+            for (int i = 0; i < keys.length; i++) {
+                if (i > 0) {
+                    buf.append(", ");
+                }
+                buf.append(keys[i]);
+            }
+            buf.append(")");
+        }
+        buf.append(")");
+        return buf.toString();
+    }
+
protected boolean isTableExistsError(SQLException ex) {
// we could be deriving the state/code from the driver url to avoid explicit setting of them
return (null != tableExistsState && tableExistsState.equals(ex.getSQLState()))
Added: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java?rev=1310028&view=auto
==============================================================================
--- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java (added)
+++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStoreUpgradeTest.java Thu Apr 5 19:33:18 2012
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.ws.rm.persistence.jdbc;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.MessageFormat;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the automatic table updating of RMTxStore that allows compatible changes
+ * in the database tables.
+ */
+public class RMTxStoreUpgradeTest extends Assert {
+    // Old table definitions: the two sequence tables lack the PROTOCOL_VERSION column.
+    private static final String CREATE_OLD_SRC_SEQ_TABLE_STMT =
+        "CREATE TABLE CXF_RM_SRC_SEQUENCES "
+        + "(SEQ_ID VARCHAR(256) NOT NULL, "
+        + "CUR_MSG_NO DECIMAL(19, 0) DEFAULT 1 NOT NULL, "
+        + "LAST_MSG CHAR(1), "
+        + "EXPIRY DECIMAL(19, 0), "
+        + "OFFERING_SEQ_ID VARCHAR(256), "
+        + "ENDPOINT_ID VARCHAR(1024), "
+        + "PRIMARY KEY (SEQ_ID))";
+
+    private static final String CREATE_OLD_DEST_SEQ_TABLE_STMT =
+        "CREATE TABLE CXF_RM_DEST_SEQUENCES "
+        + "(SEQ_ID VARCHAR(256) NOT NULL, "
+        + "ACKS_TO VARCHAR(1024) NOT NULL, "
+        + "LAST_MSG_NO DECIMAL(19, 0), "
+        + "ENDPOINT_ID VARCHAR(1024), "
+        + "ACKNOWLEDGED BLOB, "
+        + "PRIMARY KEY (SEQ_ID))";
+
+    // {0} is replaced with the inbound or outbound message table name
+    private static final String CREATE_OLD_MSGS_TABLE_STMT =
+        "CREATE TABLE {0} "
+        + "(SEQ_ID VARCHAR(256) NOT NULL, "
+        + "MSG_NO DECIMAL(19, 0) NOT NULL, "
+        + "SEND_TO VARCHAR(256), "
+        + "CONTENT BLOB, "
+        + "PRIMARY KEY (SEQ_ID, MSG_NO))";
+
+    private static final String INBOUND_MSGS_TABLE_NAME = "CXF_RM_INBOUND_MESSAGES";
+    private static final String OUTBOUND_MSGS_TABLE_NAME = "CXF_RM_OUTBOUND_MESSAGES";
+
+    private static final String TEST_DB_NAME = "rmdb2";
+
+    @BeforeClass
+    public static void setUpOnce() {
+        RMTxStore.deleteDatabaseFiles(TEST_DB_NAME, true);
+    }
+
+    @AfterClass
+    public static void tearDownOnce() {
+        RMTxStore.deleteDatabaseFiles(TEST_DB_NAME, false);
+    }
+
+    /**
+     * Creates the tables with the old definitions, re-initialises the store with the
+     * current definitions and checks that the missing columns have been added.
+     */
+    @Test
+    public void testUpgradeTables() throws Exception {
+        TestRMTxStore store = new TestRMTxStore();
+        store.setDriverClassName("org.apache.derby.jdbc.EmbeddedDriver");
+
+        // workaround for the db file deletion problem during the tests
+        store.setUrl(MessageFormat.format("jdbc:derby:{0};create=true", TEST_DB_NAME));
+
+        // use the old db definitions to create the tables
+        store.init();
+
+        // verify the absence of the new columns in the tables
+        verifyColumns(store, "CXF_RM_SRC_SEQUENCES", new String[]{"PROTOCOL_VERSION"}, true);
+        verifyColumns(store, "CXF_RM_DEST_SEQUENCES", new String[]{"PROTOCOL_VERSION"}, true);
+        verifyColumns(store, INBOUND_MSGS_TABLE_NAME, new String[]{}, true);
+        verifyColumns(store, OUTBOUND_MSGS_TABLE_NAME, new String[]{}, true);
+
+        // upgrade the tables and add new columns to the old tables
+        store.upgrade();
+        store.init();
+
+        // verify the presence of the new columns in the upgraded tables
+        verifyColumns(store, "CXF_RM_SRC_SEQUENCES", new String[]{"PROTOCOL_VERSION"}, false);
+        verifyColumns(store, "CXF_RM_DEST_SEQUENCES", new String[]{"PROTOCOL_VERSION"}, false);
+        verifyColumns(store, INBOUND_MSGS_TABLE_NAME, new String[]{}, false);
+        verifyColumns(store, OUTBOUND_MSGS_TABLE_NAME, new String[]{}, false);
+    }
+
+    /**
+     * Asserts that the given columns are either all absent from or all present in a table.
+     *
+     * @param store the store whose connection is used to read the table metadata
+     * @param tableName the table to inspect
+     * @param cols the column names to look for
+     * @param absent true to require that none of the columns exists,
+     *               false to require that all of them exist
+     */
+    private static void verifyColumns(RMTxStore store, String tableName,
+                                      String[] cols, boolean absent) throws Exception {
+        DatabaseMetaData metadata = store.getConnection().getMetaData();
+        Set<String> colNames = new HashSet<String>();
+        Collections.addAll(colNames, cols);
+        ResultSet rs = metadata.getColumns(null, null, tableName, "%");
+        try {
+            // what remains in colNames afterwards are the columns the table does not have
+            while (rs.next()) {
+                colNames.remove(rs.getString(4));
+            }
+        } finally {
+            rs.close();
+        }
+
+        if (absent) {
+            assertEquals("Some new columns are already present", cols.length, colNames.size());
+        } else {
+            assertEquals("Some new columns are still absent", 0, colNames.size());
+        }
+    }
+
+
+    /**
+     * Store that creates the tables from the old definitions until
+     * {@link #upgrade()} is called, and from the current definitions afterwards.
+     */
+    static class TestRMTxStore extends RMTxStore {
+        private boolean upgraded;
+
+        public void upgrade() {
+            upgraded = true;
+        }
+
+        @Override
+        protected void createTables() throws SQLException {
+            if (upgraded) {
+                super.createTables();
+                return;
+            }
+            // creating the old tables
+            createOldTable(CREATE_OLD_SRC_SEQ_TABLE_STMT);
+            createOldTable(CREATE_OLD_DEST_SEQ_TABLE_STMT);
+            for (String tableName : new String[] {INBOUND_MSGS_TABLE_NAME, OUTBOUND_MSGS_TABLE_NAME}) {
+                createOldTable(MessageFormat.format(CREATE_OLD_MSGS_TABLE_STMT, tableName));
+            }
+        }
+
+        // Runs one CREATE TABLE statement, ignoring only the "table already exists" error.
+        private void createOldTable(String createStmt) throws SQLException {
+            Statement stmt = getConnection().createStatement();
+            try {
+                stmt.executeUpdate(createStmt);
+            } catch (SQLException ex) {
+                if (!isTableExistsError(ex)) {
+                    throw ex;
+                }
+            } finally {
+                // closed on the rethrow path as well
+                stmt.close();
+            }
+        }
+    }
+}