You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2009/01/09 09:00:37 UTC

svn commit: r732947 - in /servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main: DefaultJDBCLock.java Lock.java LockMonitor.java Main.java SimpleFileLock.java Statements.java

Author: gnodet
Date: Fri Jan  9 00:00:37 2009
New Revision: 732947

URL: http://svn.apache.org/viewvc?rev=732947&view=rev
Log:
SMX4KNL-163: Provide default jdbc lock implementation for master/slave deployments

Added:
    servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/DefaultJDBCLock.java
    servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/LockMonitor.java
    servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/Statements.java
Modified:
    servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/Lock.java
    servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/Main.java
    servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/SimpleFileLock.java

Added: servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/DefaultJDBCLock.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/DefaultJDBCLock.java?rev=732947&view=auto
==============================================================================
--- servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/DefaultJDBCLock.java (added)
+++ servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/DefaultJDBCLock.java Fri Jan  9 00:00:37 2009
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.servicemix.kernel.main;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Represents an exclusive lock on a database,
+ * used to avoid multiple SMX instances attempting
+ * to become master.
+ * <p>
+ * The lock is taken on a dedicated connection with auto-commit disabled by
+ * executing the statement returned by {@link Statements#testLockTableStatus()}.
+ * This class never commits that transaction itself; NOTE(review): presumably
+ * the open transaction is what keeps the database lock held - confirm against
+ * the target database.
+ * <p>
+ * Configuration is read from the <code>servicemix.lock.jdbc.*</code> properties.
+ * 
+ * @version $Revision: $
+ */
+public class DefaultJDBCLock implements Lock {
+
+    private static final String PROPERTY_LOCK_URL           = "servicemix.lock.jdbc.url";
+    private static final String PROPERTY_LOCK_JDBC_DRIVER   = "servicemix.lock.jdbc.driver";
+    private static final String PROPERTY_LOCK_JDBC_USER     = "servicemix.lock.jdbc.user";
+    private static final String PROPERTY_LOCK_JDBC_PASSWORD = "servicemix.lock.jdbc.password";
+    private static final String PROPERTY_LOCK_JDBC_TABLE    = "servicemix.lock.jdbc.table";
+    private static final String PROPERTY_LOCK_JDBC_CLUSTERNAME     = "servicemix.lock.jdbc.clustername";
+    private static final String PROPERTY_LOCK_JDBC_TIMEOUT  = "servicemix.lock.jdbc.timeout";
+    private final Statements statements;
+    private Connection lockConnection;
+    private final String url;
+    private final String driver;
+    private final String user;
+    private final String password;
+    private final String table;
+    private final String clusterName;
+    /** Delay between two attempts to obtain the lock, in milliseconds. */
+    private final int timeout;
+
+    /**
+     * Reads the JDBC lock configuration and then tries to obtain the lock.
+     * Missing table, cluster name, user and password fall back to
+     * "SERVICEMIX_LOCK", "smx4", "" and "" respectively; a missing timeout
+     * falls back to 10 seconds.
+     * <p>
+     * Note that {@link #obtainLock()} retries until it succeeds, so this
+     * constructor does not return while the database is unreachable.
+     *
+     * @param props the configuration properties.
+     * @throws NumberFormatException if the timeout property is not an integer.
+     */
+    public DefaultJDBCLock(Properties props) {
+        this.url = props.getProperty(PROPERTY_LOCK_URL);
+        this.driver = props.getProperty(PROPERTY_LOCK_JDBC_DRIVER);
+        this.user = props.getProperty(PROPERTY_LOCK_JDBC_USER, "");
+        this.password = props.getProperty(PROPERTY_LOCK_JDBC_PASSWORD, "");
+        this.table = props.getProperty(PROPERTY_LOCK_JDBC_TABLE, "SERVICEMIX_LOCK");
+        this.clusterName = props.getProperty(PROPERTY_LOCK_JDBC_CLUSTERNAME, "smx4");
+        this.statements = new Statements(table, clusterName);
+        this.lockConnection = null;
+        String time = props.getProperty(PROPERTY_LOCK_JDBC_TIMEOUT);
+        if (time != null) {
+            // The property is expressed in seconds.
+            this.timeout = Integer.parseInt(time) * 1000;
+        } else {
+            this.timeout = 10000; // 10 seconds
+        }
+        try {
+            obtainLock();
+        } catch (Exception e) {
+            System.err.println("Error occured while attempting to obtain connection: " + e.getMessage());
+        }
+    }
+
+    /**
+     * obtainLock - obtain the lock connection.
+     * <p>
+     * Opens a connection, makes sure the lock table exists and executes the
+     * lock test statement. A failed attempt is logged, its connection is
+     * discarded, and a new attempt is made after <code>timeout</code>
+     * milliseconds; the method only returns once an attempt has succeeded.
+     *
+     * @throws Exception if the thread is interrupted while waiting to retry.
+     */
+    private void obtainLock() throws Exception {
+        while (true) {
+            PreparedStatement statement = null;
+            try {
+                lockConnection = getConnection(driver, url, user, password);
+                lockConnection.setAutoCommit(false);
+                statements.init(lockConnection);
+                String sql = statements.testLockTableStatus();
+                statement = lockConnection.prepareStatement(sql);
+                statement.execute();
+                break;
+            } catch (Exception e) {
+                System.err.println("Could not obtain lock: " + e.getMessage());
+            } finally {
+                if (null != statement) {
+                    try {
+                        statement.close();
+                    } catch (SQLException e1) {
+                        System.err.println("Caught while closing statement: " + e1.getMessage());
+                    }
+                }
+            }
+            // Only reached after a failed attempt (success leaves via break).
+            // Drop the connection of that attempt so retries do not pile up
+            // open connections, then wait exactly one timeout period.
+            discardConnection();
+            Thread.sleep(this.timeout);
+        }
+        System.out.println("Connected to data source: " + url);
+    }
+
+    /**
+     * discardConnection - roll back and close the current lock connection,
+     * if any, and forget it. Failures are logged and otherwise ignored.
+     */
+    private void discardConnection() {
+        Connection stale = lockConnection;
+        lockConnection = null;
+        if (stale == null) {
+            return;
+        }
+        try {
+            if (!stale.isClosed()) {
+                stale.rollback();
+            }
+        } catch (SQLException e) {
+            System.err.println("Caught while rolling back connection: " + e.getMessage());
+        }
+        try {
+            stale.close();
+        } catch (SQLException e) {
+            System.err.println("Caught while closing connection: " + e.getMessage());
+        }
+    }
+
+    /**
+     * lock - a KeepAlive function to maintain lock. 
+     * <p>
+     * Re-obtains the connection when it is missing or closed, then writes the
+     * current time into the lock table.
+     *
+     * @return true if connection lock retained (exactly one row updated),
+     *         false otherwise.
+     */
+    public boolean lock() {
+        PreparedStatement statement = null;
+        boolean result = false;
+        try {
+            if (lockConnection == null || lockConnection.isClosed()) {
+                obtainLock();
+            }
+            long time = System.currentTimeMillis();
+            statement = lockConnection.prepareStatement(statements.getLockUpdateStatement(time));
+            int rows = statement.executeUpdate();
+            if (rows == 1) {
+                result = true;
+            }
+        } catch (Exception e) {
+            System.err.println("Failed to acquire database lock: " + e.getMessage());
+        } finally {
+            if (statement != null) {
+                try {
+                    statement.close();
+                } catch (SQLException e) {
+                    System.err.println("Failed to close statement" + e);
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * release - terminate the lock connection safely.
+     *
+     * @throws Exception if the rollback or the close fails.
+     */
+    public void release() throws Exception {
+        if (lockConnection != null && !lockConnection.isClosed()) {
+            lockConnection.rollback();
+            lockConnection.close();
+        }
+    }
+
+    /**
+     * isAlive - test if lock still exists.
+     *
+     * @return false if there is no lock connection or the lock test
+     *         statement fails on it, true otherwise.
+     */
+    public boolean isAlive() throws Exception {
+        if (lockConnection == null) { return false; }
+        PreparedStatement statement = null;
+        try { 
+            lockConnection.setAutoCommit(false);
+            statements.init(lockConnection);
+            String sql = statements.testLockTableStatus();
+            statement = lockConnection.prepareStatement(sql);
+            statement.execute();
+        } catch (Exception ex) {
+            return false;
+        } finally {
+            // This method is polled periodically, so the statement must not
+            // be left open on the long-lived lock connection.
+            if (statement != null) {
+                try {
+                    statement.close();
+                } catch (SQLException e) {
+                    System.err.println("Caught while closing statement: " + e.getMessage());
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * getConnection - Obtain connection to database via jdbc driver.
+     * <p>
+     * ";create=true" is always appended to the url.
+     * NOTE(review): this looks like a Derby connection attribute - confirm
+     * before using another database.
+     *
+     * @throws Exception if the driver class cannot be loaded or the
+     *         connection cannot be established.
+     * @param driver, the JDBC driver class.
+     * @param url, url to data source.
+     * @param username, user to access data source.
+     * @param password, password for specified user.
+     * @return connection to the data source.
+     */
+    private Connection getConnection(String driver, String url, 
+                                     String username, String password) throws Exception {
+        Class.forName(driver);
+        return DriverManager.getConnection(url + ";create=true", username, password);
+    }
+
+}

Modified: servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/Lock.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/Lock.java?rev=732947&r1=732946&r2=732947&view=diff
==============================================================================
--- servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/Lock.java (original)
+++ servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/Lock.java Fri Jan  9 00:00:37 2009
@@ -24,4 +24,6 @@
 
     void release() throws Exception;
 
+    /**
+     * Tests whether this lock is still held.
+     *
+     * @return true if the lock is still held, false otherwise.
+     * @throws Exception if an implementation fails while checking the lock.
+     */
+    boolean isAlive() throws Exception;
+
 }

Added: servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/LockMonitor.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/LockMonitor.java?rev=732947&view=auto
==============================================================================
--- servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/LockMonitor.java (added)
+++ servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/LockMonitor.java Fri Jan  9 00:00:37 2009
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.servicemix.kernel.main;
+
+import org.apache.felix.framework.Felix;
+
+/**
+ * Watchdog thread for the startup lock.
+ * It polls {@link Lock#isAlive()} at a fixed interval; as soon as the lock
+ * is reported lost it stops the Felix framework. Whether the loop ended
+ * normally or with an error, the JVM is then terminated with exit code 1.
+ *
+ * @version $Revision: $
+ */
+class LockMonitor extends Thread {
+
+    private final Lock lock;
+    private final Felix felix;
+    /** Pause between two polls of the lock, in milliseconds. */
+    private final int timeout;
+
+    /**
+     * @param lock the lock to watch.
+     * @param felix the framework to stop once the lock is lost.
+     * @param timeout the pause between two polls, in milliseconds.
+     */
+    LockMonitor (Lock lock, Felix felix, int timeout) {
+        this.lock = lock;
+        this.felix = felix;
+        this.timeout = timeout;
+    }
+
+    /**
+     * Polls the lock until it is lost, stops Felix and exits the JVM.
+     */
+    public void run() {
+        try {
+            waitUntilLockLost();
+            System.out.println("Lost the lock, stopping this instance ...");
+            felix.stop();
+        } catch (Exception failure) {
+            System.err.println("An error has occured while monitoring failover lock: " + failure.getMessage());
+        } finally {
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Returns once the lock reports that it is no longer alive, sleeping
+     * for the configured interval between two checks.
+     */
+    private void waitUntilLockLost() throws Exception {
+        while (lock.isAlive()) {
+            Thread.sleep(timeout);
+        }
+    }
+
+}

Modified: servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/Main.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/Main.java?rev=732947&r1=732946&r2=732947&view=diff
==============================================================================
--- servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/Main.java (original)
+++ servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/Main.java Fri Jan  9 00:00:37 2009
@@ -123,6 +123,20 @@
 
     public static final String PROPERTY_LOCK_CLASS_DEFAULT = SimpleFileLock.class.getName();
 
+    public static final String PROPERTY_LOCK_URL = "servicemix.lock.jdbc.url";
+ 
+    public static final String PROPERTY_LOCK_JDBC_DRIVER = "servicemix.lock.jdbc.driver";
+
+    public static final String PROPERTY_LOCK_JDBC_USER = "servicemix.lock.jdbc.user";
+
+    public static final String PROPERTY_LOCK_JDBC_PASSWORD = "servicemix.lock.jdbc.password";
+
+    public static final String PROPERTY_LOCK_JDBC_TABLE = "servicemix.lock.jdbc.table";
+
+    public static final String PROPERTY_LOCK_JDBC_CLUSTERNAME = "servicemix.lock.jdbc.clustername";
+
+    public static final String PROPERTY_LOCK_JDBC_TIMEOUT = "servicemix.lock.jdbc.timeout";
+
     private File servicemixHome;
     private File servicemixBase;
     private static Properties m_configProps = null;
@@ -187,6 +201,10 @@
             lock(m_configProps);
             // Start up the OSGI framework
             m_felix = new Felix(new StringMap(m_configProps, false), activations);
+            // Start lock monitor
+            int pollLock = 30000;
+            Thread lockMonitor = new LockMonitor(lock, m_felix, pollLock);
+            lockMonitor.start();
             m_felix.start();
         }
         catch (Exception ex) {

Modified: servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/SimpleFileLock.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/SimpleFileLock.java?rev=732947&r1=732946&r2=732947&view=diff
==============================================================================
--- servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/SimpleFileLock.java (original)
+++ servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/SimpleFileLock.java Fri Jan  9 00:00:37 2009
@@ -63,6 +63,10 @@
         }
         lock = null;
     }
+ 
+    /**
+     * isAlive - test if the file lock is still held.
+     *
+     * @return true while the lock field is set, false once it is null.
+     */
+    public boolean isAlive() throws Exception {
+        if (lock == null) {
+            return false;
+        }
+        return true;
+    }
 
     private static File getServiceMixLock(File lock,Properties props) {
         File rc = null;

Added: servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/Statements.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/Statements.java?rev=732947&view=auto
==============================================================================
--- servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/Statements.java (added)
+++ servicemix/smx4/kernel/trunk/main/src/main/java/org/apache/servicemix/kernel/main/Statements.java Fri Jan  9 00:00:37 2009
@@ -0,0 +1,93 @@
+package org.apache.servicemix.kernel.main;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+/**
+ * Builds the SQL statements used by the JDBC lock and creates the lock
+ * table on demand.
+ */
+public class Statements {
+
+    private static final String DEFAULT_LOCK_TABLE_NAME = "SERVICEMIX_LOCK";
+    private static final String DEFAULT_CLUSTER_NAME = "smx4";
+
+    private final String lockTableName;
+    private final String clusterName;
+    private final String lockCreateStatement;
+    private final String lockPopulateStatement;
+
+    /**
+     * @param tableName name of the lock table; "SERVICEMIX_LOCK" is used
+     *        when null.
+     * @param clusterName value stored in the CLUSTER column of the lock
+     *        row; "smx4" is used when null.
+     */
+    public Statements(String tableName, String clusterName) {
+        this.lockTableName = (tableName != null) ? tableName : DEFAULT_LOCK_TABLE_NAME;
+        this.clusterName = (clusterName != null) ? clusterName : DEFAULT_CLUSTER_NAME;
+        this.lockCreateStatement = "create table " + this.lockTableName
+                                   + " (TIME bigint, CLUSTER varchar(20))";
+        this.lockPopulateStatement = "insert into " + this.lockTableName
+                                     + " (TIME, CLUSTER) values (1, '" + this.clusterName + "')";
+    }
+
+    /**
+     * @return the statement used to test (and lock) the lock table:
+     *         a <code>SELECT * ... FOR UPDATE</code> on it.
+     */
+    public String testLockTableStatus() {
+        return "SELECT * FROM " + lockTableName + " FOR UPDATE";
+    }
+
+    /**
+     * @param timeStamp the value to write into the TIME column.
+     * @return the statement updating the TIME column of this cluster's row.
+     */
+    public String getLockUpdateStatement(long timeStamp) {
+        return "UPDATE " + lockTableName + 
+               " SET TIME=" + timeStamp + 
+               " WHERE CLUSTER = '" + clusterName + "'";
+    }
+
+    /**
+     * init - initialize db
+     * <p>
+     * Does nothing when the lock table is already reported by the database
+     * metadata. Otherwise creates the table, inserts the lock row and
+     * commits. Every failure is logged to stderr and swallowed; nothing is
+     * thrown to the caller.
+     *
+     * @param lockConnection the connection to create the table on.
+     */
+    public void init (Connection lockConnection) {
+        // Skip creation when the table is already there, so that no
+        // warnings are logged on every startup.
+        if (tableExists(lockConnection)) {
+            return;
+        }
+        Statement s = null;
+        try {
+            s = lockConnection.createStatement();
+            String[] createStatments = {lockCreateStatement, lockPopulateStatement};
+            for (int i = 0; i < createStatments.length; i++) {
+                // This can still fail if the metadata lookup missed an
+                // existing table; log and carry on with the next statement.
+                try {
+                    s.execute(createStatments[i]);
+                } catch (SQLException e) {
+                    System.err.println("Could not create JDBC tables; they could already exist."
+                                 + " Failure was: " + createStatments[i] + " Message: " + e.getMessage()
+                                 + " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode());
+                }
+            }
+            lockConnection.commit();
+        } catch (Exception ignore) {
+            System.err.println(ignore);
+        } finally {
+            if (s != null) {
+                try {
+                    s.close();
+                } catch (Throwable e) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+    /**
+     * Asks the database metadata whether the lock table exists. Any failure
+     * of the lookup is logged and treated as "does not exist".
+     */
+    private boolean tableExists(Connection lockConnection) {
+        ResultSet rs = null;
+        try {
+            rs = lockConnection.getMetaData().getTables(null, null, lockTableName, new String[] {"TABLE"});
+            return rs.next();
+        } catch (Throwable ignore) {
+            System.err.println(ignore);
+            return false;
+        } finally {
+            close(rs);
+        }
+    }
+
+    /**
+     * Closes the result set if there is one, ignoring any failure.
+     */
+    private static void close(ResultSet rs) {
+        if (rs == null) {
+            return;
+        }
+        try {
+            rs.close();
+        } catch (Throwable e) {
+            // ignore
+        }
+    }
+
+}