You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@karaf.apache.org by gn...@apache.org on 2013/12/18 10:08:03 UTC

[1/3] git commit: [KARAF-1572] JDBC Lock without using long running transactions

Updated Branches:
  refs/heads/karaf-2.x 86b531bf2 -> 411b9e420


[KARAF-1572] JDBC Lock without using long running transactions


Project: http://git-wip-us.apache.org/repos/asf/karaf/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf/commit/411b9e42
Tree: http://git-wip-us.apache.org/repos/asf/karaf/tree/411b9e42
Diff: http://git-wip-us.apache.org/repos/asf/karaf/diff/411b9e42

Branch: refs/heads/karaf-2.x
Commit: 411b9e42098529a30642f9fbcd1df5be179437ce
Parents: a32bab6
Author: Guillaume Nodet <gn...@gmail.com>
Authored: Wed Dec 18 10:06:06 2013 +0100
Committer: Guillaume Nodet <gn...@gmail.com>
Committed: Wed Dec 18 10:07:40 2013 +0100

----------------------------------------------------------------------
 .../org/apache/karaf/main/DefaultJDBCLock.java  | 109 +++-
 .../org/apache/karaf/main/GenericJDBCLock.java  | 585 +++++++++++++++++++
 .../apache/karaf/main/GenericStatements.java    | 243 ++++++++
 .../main/java/org/apache/karaf/main/Main.java   |   6 +-
 4 files changed, 916 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf/blob/411b9e42/main/src/main/java/org/apache/karaf/main/DefaultJDBCLock.java
----------------------------------------------------------------------
diff --git a/main/src/main/java/org/apache/karaf/main/DefaultJDBCLock.java b/main/src/main/java/org/apache/karaf/main/DefaultJDBCLock.java
index 452fbe6..8377605 100644
--- a/main/src/main/java/org/apache/karaf/main/DefaultJDBCLock.java
+++ b/main/src/main/java/org/apache/karaf/main/DefaultJDBCLock.java
@@ -25,6 +25,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Properties;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 /**
@@ -74,12 +75,17 @@ public class DefaultJDBCLock implements Lock {
         this.table = props.getProperty(PROPERTY_LOCK_JDBC_TABLE, DEFAULT_TABLE);
         this.clusterName = props.getProperty(PROPERTY_LOCK_JDBC_CLUSTERNAME, DEFAULT_CLUSTERNAME);
         this.timeout = Integer.parseInt(props.getProperty(PROPERTY_LOCK_JDBC_TIMEOUT, DEFAULT_TIMEOUT));
-        
+
         this.statements = createStatements();
         
         init();
     }
-    
+
+    /**
+     * This method is called to create an instance of the Statements instance.
+     *
+     * @return an instance of a Statements object
+     */
     Statements createStatements() {
         Statements statements = new Statements();
         statements.setTableName(table);
@@ -92,14 +98,17 @@ public class DefaultJDBCLock implements Lock {
             createDatabase();
             createSchema();
         } catch (Exception e) {
-            LOG.severe("Error occured while attempting to obtain connection: " + e);
+            LOG.log(Level.SEVERE, "Error occured while attempting to obtain connection", e);
         }
     }
-    
+
     void createDatabase() {
         // do nothing in the default implementation
     }
 
+    /**
+     * This method is called to check and create the required schemas that are used by this instance.
+     */
     void createSchema() {
         if (schemaExists()) {
             return;
@@ -107,35 +116,59 @@ public class DefaultJDBCLock implements Lock {
         
         String[] createStatments = this.statements.getLockCreateSchemaStatements(getCurrentTimeMillis());
         Statement statement = null;
-        
+        Connection connection = null;
+
         try {
-            statement = getConnection().createStatement();
-            
+            connection = getConnection();
+            statement = connection.createStatement();
+
             for (String stmt : createStatments) {
+                LOG.info("Executing statement: " + stmt);
                 statement.execute(stmt);
             }
             
             getConnection().commit();
         } catch (Exception e) {
-            LOG.severe("Could not create schema: " + e );
+            LOG.log(Level.SEVERE, "Could not create schema", e);
+            try {
+                // Rollback transaction if and only if there was a failure...
+                if (connection != null)
+                    connection.rollback();
+            } catch (Exception ie) {
+                // Do nothing....
+            }
         } finally {
             closeSafely(statement);
         }
     }
 
+    /**
+     * This method is called to determine if the required database schemas have already been created or not.
+     *
+     * @return true, if the schemas are available else false.
+     */
     boolean schemaExists() {
+        return schemaExist(statements.getFullLockTableName());
+    }
+
+    /**
+     * This method is called to determine if the required table is available or not.
+     *
+     * @param tableName  The name of the table to determine if it exists
+     *
+     * @return true, if the table exists else false
+     */
+    boolean schemaExist(String tableName) {
         ResultSet rs = null;
         boolean schemaExists = false;
-        
         try {
-            rs = getConnection().getMetaData().getTables(null, null, statements.getFullLockTableName(), new String[] {"TABLE"});
+            rs = getConnection().getMetaData().getTables(null, null, tableName, new String[] {"TABLE"});
             schemaExists = rs.next();
         } catch (Exception ignore) {
-            LOG.severe("Error testing for db table: " + ignore);
+            LOG.log(Level.SEVERE, "Error testing for db table", ignore);
         } finally {
             closeSafely(rs);
         }
-        
         return schemaExists;
     }
 
@@ -163,8 +196,9 @@ public class DefaultJDBCLock implements Lock {
             preparedStatement.setQueryTimeout(timeout);
             lockAquired = preparedStatement.execute();
         } catch (Exception e) {
-            LOG.warning("Failed to acquire database lock: " + e);
-        }finally {
+            // Do we want to display this message everytime???
+            LOG.log(Level.WARNING, "Failed to acquire database lock", e);
+        } finally {
             closeSafely(preparedStatement);
         }
         
@@ -182,8 +216,8 @@ public class DefaultJDBCLock implements Lock {
             int rows = preparedStatement.executeUpdate();
             lockUpdated = (rows == 1);
         } catch (Exception e) {
-            LOG.warning("Failed to update database lock: " + e);
-        }finally {
+            LOG.log(Level.WARNING, "Failed to update database lock", e);
+        } finally {
             closeSafely(preparedStatement);
         }
         
@@ -199,12 +233,12 @@ public class DefaultJDBCLock implements Lock {
             try {
                 getConnection().rollback();
             } catch (SQLException e) {
-                LOG.severe("Exception while rollbacking the connection on release: " + e);
+                LOG.log(Level.SEVERE, "Exception while rollbacking the connection on release", e);
             } finally {
                 try {
                     getConnection().close();
                 } catch (SQLException ignored) {
-                    LOG.fine("Exception while closing connection on release: " + ignored);
+                    LOG.log(Level.FINE, "Exception while closing connection on release", ignored);
                 }
             }
         }
@@ -224,31 +258,56 @@ public class DefaultJDBCLock implements Lock {
 
         return updateLock();
     }
-    
+
+    /**
+     * This method is called to determine if this instance jdbc connection is
+     * still connected.
+     *
+     * @return true, if the connection is still connected else false
+     *
+     * @throws SQLException
+     */
     boolean isConnected() throws SQLException {
         return lockConnection != null && !lockConnection.isClosed();
     }
-    
+
+    /**
+     * This method is called to safely close a Statement.
+     *
+     * @param preparedStatement The statement to be closed
+     */
     void closeSafely(Statement preparedStatement) {
         if (preparedStatement != null) {
             try {
                 preparedStatement.close();
             } catch (SQLException e) {
-                LOG.severe("Failed to close statement: " + e);
+                LOG.log(Level.SEVERE, "Failed to close statement", e);
             }
         }
     }
-    
+
+    /**
+     * This method is called to safely close a ResultSet instance.
+     *
+     * @param rs The result set to be closed
+     */
     void closeSafely(ResultSet rs) {
         if (rs != null) {
             try {
                 rs.close();
             } catch (SQLException e) {
-                LOG.severe("Error occured while releasing ResultSet: " + e);
+                LOG.log(Level.SEVERE, "Error occured while releasing ResultSet", e);
             }
         }
     }
-    
+
+    /**
+     * This method will return an active connection for this given jdbc driver.
+     *
+     * @return jdbc Connection instance
+     *
+     * @throws Exception
+     */
     Connection getConnection() throws Exception {
         if (!isConnected()) {
             lockConnection = createConnection(driver, url, user, password);
@@ -276,7 +335,7 @@ public class DefaultJDBCLock implements Lock {
         try {
             return doCreateConnection(driver, url, username, password);
         } catch (Exception e) {
-            LOG.severe("Error occured while setting up JDBC connection: " + e);
+            LOG.log(Level.SEVERE, "Error occured while setting up JDBC connection", e);
             throw e; 
         }
     }

http://git-wip-us.apache.org/repos/asf/karaf/blob/411b9e42/main/src/main/java/org/apache/karaf/main/GenericJDBCLock.java
----------------------------------------------------------------------
diff --git a/main/src/main/java/org/apache/karaf/main/GenericJDBCLock.java b/main/src/main/java/org/apache/karaf/main/GenericJDBCLock.java
new file mode 100644
index 0000000..ee9d47a
--- /dev/null
+++ b/main/src/main/java/org/apache/karaf/main/GenericJDBCLock.java
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.karaf.main;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This class is the base class used to provide a master/slave configuration for
+ * a given set of active karaf instances using JDBC. </p>
+ *
+ * This implementation uses two different tables: KARAF_NODE_ID and KARAF_LOCK.  The
+ * KARAF_NODE_ID table is used to generate a unique id for each instance in the cluster, while
+ * the KARAF_LOCK table is used to determine who is the master of these instances. </p>
+ *
+ * The tables configurations for the different tables are. </p>
+ *
+ * <pre>
+ *   CREATE TABLE KARAF_NODE_ID ( ID INTEGER DEFAULT 0 )
+ *   CREATE TABLE KARAF_LOCK ( ID INTEGER DEFAULT 0, STATE INTEGER DEFAULT 0, LOCK_DELAY INTEGER DEFAULT 0 )
+ * </pre>
+ *
+ * The two tables will include a single row each that is created by a single instance in the cluster. </p>
+ *
+ * The KARAF_NODE_ID table will be updated once for each active karaf instance with their unique id compared
+ * to the other instances within the cluster.  The single row will contain the next available unique id and
+ * will not include each clustered instance unique id since these instances can come and go throughout the
+ * system lifetime. </p>
+ *
+ * The KARAF_LOCK table will be used to determine which of the instances will become the master. The master
+ * will set the STATE to an initial value and the LOCK_DELAY to a time in milliseconds of when the
+ * table will be updated.  It is the responsibility of the master instance to update the STATE field by the
+ * allocated lock delay by incrementing the state value.  If the STATE value has not been updated by the 
+ * LOCK_DELAY time then a slave has permission to attempt to become the master. </p>
+ *
+ * The overview above does not describe exactly how this is implemented.  Here is a description of how it
+ * is done and what it provides as a fail-safe solution. </p>
+ *
+ * Each instance of this class provides an initialization step, a lock, isAlive and release interface. </p>
+ *
+ * INITIALIZE:</p>
+ *
+ * During the initialization step it will determine if the given tables exist within the database.  Both
+ * tables are checked and the schema is created when either of them is missing. We then
+ * add a row to each of the tables.  The added row to the KARAF_NODE_ID table will set the ID to zero since
+ * this is considered a non-existent karaf instance.  The added row to the KARAF_LOCK will set the ID to zero
+ * which allows a karaf instance to acquire the lock and become the master instance. </p>
+ *
+ *
+ * LOCK:</p>
+ *
+ * The current instance will try to acquire the master lock by using the following sql statement. </p>
+ *
+ * <pre>
+ *   UPDATE KARAF_LOCK SET ID = unique_id, STATE = state, LOCK_DELAY = lock_delay 
+ *       WHERE ID = 0 OR ID = curId
+ * </pre>
+ *
+ * Now you must be asking why are we using this update statement? The reason is that the statement will
+ * guarantee that only one instance will be able to update this row.  The curId is set to this instance
+ * unique id or to the prior master unique id if the row was not updated within that master lock_delay. </p>
+ *
+ * The current update command will set the curId to this instance unique id.  If this fails then it will
+ * determine if the current master has not updated the row within its lock_delay.  If it hasn't updated
+ * the row within the allocated time then this instance can try to become the master. </p>
+ *
+ * The current slave instance will then try to steal the lock from the master instance.  Why are we trying
+ * to steal the lock from the master?  The reason is that it is possible that the master instance had a
+ * hard failure and there is no mechanism to determine if that is the case. We then assume that it has
+ * crashed without releasing the lock gracefully.  The slave instance then uses the following update statement. </p>
+ *
+ * <pre>
+ *   UPDATE KARAF_LOCK SET ID = unique_id, STATE = state, LOCK_DELAY = lock_delay 
+ *       WHERE ( ID = 0 OR ID = curId ) AND STATE = curState
+ * </pre>
+ *
+ * Now why are we using the state value as part of the where clause?  The reason is that, even though the row was
+ * not updated by the allocated delay time, it is possible that the update statement was performed just after
+ * the current slave check.  This update will ensure that the row will be updated if and only if the state was
+ * also not updated.  It is possible that the master instance updated the row after the current slave check
+ * and we do not want the slave to update the row and make itself the master.  This will ensure that this will
+ * not be the case. </p>
+ *
+ * ISALIVE: </p>
+ *
+ * This just checks if the connection is active and then just updates the row's STATE by using the lock
+ * update call mentioned above. </p>
+ *
+ * RELEASE: </p>
+ *
+ * The release process just updates the KARAF_LOCK ID to zero so that other instances will have a chance
+ * to become the master. </p>
+ *
+ * There are two main scenarios that we need to worry about.  Soft and Hard failures.  The soft failure 
+ * basically allows the master instance to release the master lock and allow other instances to become
+ * the master.  As for a hard failure, the current karaf instance crashes and does not release the lock
+ * then the other karaf instances will notice that the KARAF_LOCK has not been updated for the current
+ * master id and then they can compete for the master lock. </p>
+ *
+ * @author Claudio Corsi
+ *
+ */
+public class GenericJDBCLock implements Lock {
+
+    // Per-instance logger named after the runtime class of this object.
+    final Logger LOG = Logger.getLogger(this.getClass().getName());
+
+    // Names of the configuration properties read by the constructor.
+    public static final String PROPERTY_LOCK_URL               = "karaf.lock.jdbc.url";
+    public static final String PROPERTY_LOCK_JDBC_DRIVER       = "karaf.lock.jdbc.driver";
+    public static final String PROPERTY_LOCK_JDBC_USER         = "karaf.lock.jdbc.user";
+    public static final String PROPERTY_LOCK_JDBC_PASSWORD     = "karaf.lock.jdbc.password";
+    public static final String PROPERTY_LOCK_JDBC_TABLE        = "karaf.lock.jdbc.table";
+    public static final String PROPERTY_LOCK_JDBC_TABLE_ID     = "karaf.lock.jdbc.table_id";
+    public static final String PROPERTY_LOCK_JDBC_CLUSTERNAME  = "karaf.lock.jdbc.clustername";
+
+    // Fallback values used by the constructor when the matching property is not set.
+    public static final String DEFAULT_PASSWORD = "";
+    public static final String DEFAULT_USER = "";
+    public static final String DEFAULT_TABLE = "KARAF_LOCK";
+    public static final String DEFAULT_TABLE_ID = "KARAF_NODE_ID";
+    public static final String DEFAULT_CLUSTERNAME = "karaf";
+
+    // SQL statement helper, created once in the constructor through createStatements().
+    final GenericStatements statements;
+    // Connection shared by all lock operations; (re)created on demand by getConnection().
+    Connection lockConnection;
+    // Settings populated from the properties handed to the constructor.
+    String url;
+    String driver;
+    String user;
+    String password;
+    String table;
+    String clusterName;
+    String table_id;
+    int lock_delay;
+
+    // My lock settings
+    private int uniqueId = 0;
+    private int state = 0;
+
+    // Current master instance lock settings
+    private int currentId = 0;
+    private int currentState = 0;
+
+    // The last clock time that the master instance state was updated as noticed by this instance.
+    private long currentStateTime;
+    // The master lock delay time in milliseconds that the master is expected to update the karaf_lock
+    // table state
+    private int currentLockDelay;
+
+    /**
+     * Creates the lock from the given configuration and then runs {@link #init()}.
+     *
+     * The url and driver properties are read without a default, so the matching fields are
+     * {@code null} when the properties are absent.  The lock delay is parsed with
+     * {@code Integer.parseInt}, which throws a NumberFormatException for a non-numeric value.
+     *
+     * @param props the properties holding the karaf.lock.jdbc.* settings and the lock delay
+     *              (Main.PROPERTY_LOCK_DELAY)
+     */
+    public GenericJDBCLock(Properties props) {
+        try {
+            LOG.addHandler(BootstrapLogManager.getDefaultHandler());
+        } catch (Exception e) {
+            // A failure to attach the log handler is only printed; construction continues.
+            e.printStackTrace();
+        }
+
+        this.url = props.getProperty(PROPERTY_LOCK_URL);
+        this.driver = props.getProperty(PROPERTY_LOCK_JDBC_DRIVER);
+        this.user = props.getProperty(PROPERTY_LOCK_JDBC_USER, DEFAULT_USER);
+        this.password = props.getProperty(PROPERTY_LOCK_JDBC_PASSWORD, DEFAULT_PASSWORD);
+        this.table = props.getProperty(PROPERTY_LOCK_JDBC_TABLE, DEFAULT_TABLE);
+        this.clusterName = props.getProperty(PROPERTY_LOCK_JDBC_CLUSTERNAME, DEFAULT_CLUSTERNAME);
+        this.table_id = props.getProperty(PROPERTY_LOCK_JDBC_TABLE_ID, DEFAULT_TABLE_ID);
+        this.lock_delay = Integer.parseInt(props.getProperty(Main.PROPERTY_LOCK_DELAY, Main.DEFAULT_LOCK_DELAY));
+
+        // Must come after table, table_id and clusterName are assigned: createStatements() reads them.
+        this.statements = createStatements();
+
+        // NOTE(review): init() is not final, so an overriding subclass would see it run before
+        // its own constructor body has executed.
+        init();
+    }
+    
+    /**
+     * Builds the SQL statement helper used by this lock.
+     *
+     * @return a new GenericStatements configured with the lock table name, the id table name
+     *         and the cluster name of this instance
+     */
+    GenericStatements createStatements() {
+        return new GenericStatements(table, table_id, clusterName);
+    }
+    
+    /**
+     * Prepares the database for locking by running createDatabase(), createSchema() and
+     * generateUniqueId() in that order.  Any exception is logged at SEVERE level and
+     * swallowed, so the caller is not notified when initialization fails.
+     */
+    void init() {
+        try {
+            createDatabase();
+            createSchema();
+            generateUniqueId();
+        } catch (Exception e) {
+            LOG.log(Level.SEVERE, "Error occured while attempting to obtain connection", e);
+        }
+    }
+    
+    /**
+     * Hook called first by init(), before the schema is created.  This implementation is
+     * intentionally empty.
+     */
+    void createDatabase() {
+        // do nothing in the default implementation
+    }
+
+    /**
+     * Creates the lock tables when {@link #schemaExists()} reports that they are missing.
+     *
+     * All create statements run inside a single transaction: auto commit is switched off,
+     * the statements are executed and committed, and on any failure the transaction is
+     * rolled back.  Failures are logged and not propagated.  Auto commit is switched back
+     * on before the method returns.
+     */
+    void createSchema() {
+        if (schemaExists()) {
+            return;
+        }
+
+        String[] createStatments = this.statements.getLockCreateSchemaStatements(System.currentTimeMillis());
+        Statement statement = null;
+        Connection connection = null;
+
+        try {
+            connection = getConnection();
+            statement = connection.createStatement();
+            connection.setAutoCommit(false);
+
+            for (String stmt : createStatments) {
+                LOG.info("Executing statement: " + stmt);
+                statement.execute(stmt);
+            }
+
+            connection.commit();
+        } catch (Exception e) {
+            LOG.log(Level.SEVERE, "Could not create schema", e );
+            // Rollback transaction if and only if there was a failure...
+            if (connection != null) {
+                try {
+                    connection.rollback();
+                } catch (Exception ie) {
+                    // Best effort: the original failure has already been reported above.
+                    LOG.log(Level.FINE, "Exception while rolling back the schema creation", ie);
+                }
+            }
+        } finally {
+            closeSafely(statement);
+            // connection is still null when getConnection() itself failed; calling
+            // setAutoCommit on it would throw a NullPointerException out of this finally block.
+            if (connection != null) {
+                try {
+                    // Reset the auto commit to true
+                    connection.setAutoCommit(true);
+                } catch (SQLException ignored) {
+                    LOG.log(Level.FINE, "Exception while setting the connection auto commit", ignored);
+                }
+            }
+        }
+    }
+
+    /**
+     * Tells whether both tables used by this lock are already present in the database.
+     *
+     * @return true only when the lock table and the lock id table both exist
+     */
+    boolean schemaExists() {
+        // The id table is only looked up when the lock table was found.
+        if (!schemaExist(this.statements.getLockTableName())) {
+            return false;
+        }
+        return schemaExist(this.statements.getLockIdTableName());
+    }
+
+    /**
+     * Looks the given table up in the database metadata.
+     *
+     * @param tableName the name of the table to look for
+     *
+     * @return true when the metadata lists a table with that name; false when it does not
+     *         or when the lookup failed (the failure is logged at SEVERE level)
+     */
+    private boolean schemaExist(String tableName) {
+        ResultSet tables = null;
+        try {
+            tables = getConnection().getMetaData().getTables(null, null, tableName, new String[] {"TABLE"});
+            // At least one row means the table is there.
+            return tables.next();
+        } catch (Exception e) {
+            LOG.log(Level.SEVERE, "Error testing for db table", e);
+            return false;
+        } finally {
+            closeSafely(tables);
+        }
+    }
+
+    /**
+     * Generates the unique id of this instance within the set of active instances.
+     *
+     * The id table holds a single row with the last id handed out.  The row is read and then
+     * updated from the value read to that value plus one; the update statement is built from
+     * both values, so it only takes effect while the row still holds the value read.  When
+     * another instance changed the row in between, no row is updated and the read/update
+     * cycle is repeated.
+     *
+     * Failures are logged and not propagated.  A failure to obtain the connection or to
+     * prepare the select statement ends the method with the unique id left unchanged.
+     */
+    void generateUniqueId() {
+        boolean uniqueIdSet = false;
+        String selectString = this.statements.getLockIdSelectStatement();
+        PreparedStatement selectStatement = null;
+        try {
+            selectStatement = getConnection().prepareStatement(selectString);
+
+            // This loop can only be performed for so long and the chances that this will be
+            // looping for more than a few times is unlikely since there will always be at
+            // least one instance that is successful.
+            // NOTE(review): the loop also repeats, without any pause, while no row is found
+            // or while the query keeps failing with an SQLException - confirm this is intended.
+            while (!uniqueIdSet) {
+
+                ResultSet rs = null;
+                PreparedStatement updateStatement = null;
+
+                try {
+                    // Get the current ID from the karaf ids table
+                    rs = selectStatement.executeQuery();
+
+                    // Check if we were able to retrieve the result...
+                    if (rs.next()) {
+                        // Update the row with the next available id
+                        int lastId = this.statements.getIdFromLockIdSelectStatement(rs);
+
+                        String updateString = this.statements.getLockIdUpdateIdStatement(lastId + 1, lastId);
+
+                        updateStatement = getConnection().prepareStatement(updateString);
+
+                        int count = updateStatement.executeUpdate();
+
+                        // Set the uniqueId if and only if a row was updated
+                        uniqueIdSet = count > 0;
+                        uniqueId = uniqueIdSet ? lastId + 1 : 0;
+
+                        if (count > 1) {
+                            LOG.severe("OOPS there are more than one row within the table ids...");
+                        }
+                    } else {
+                        LOG.severe("No rows were found....");
+                    }
+                } catch (SQLException e) {
+                    LOG.log(Level.SEVERE, "Received an SQL exception while processing result set", e);
+                } finally {
+                    this.closeSafely(rs);
+                    // A new update statement is prepared on every pass, so it has to be
+                    // released on every pass as well.
+                    closeSafely(updateStatement);
+                }
+            }
+        } catch (SQLException e) {
+            LOG.log(Level.SEVERE, "Received an SQL exception while generating a prepate statement", e);
+        } catch (Exception e) {
+            LOG.log(Level.SEVERE, "Received an exception while trying to get a reference to a connection", e);
+        } finally {
+            closeSafely(selectStatement);
+        }
+        LOG.info("INSTANCE unique id: " + uniqueId);
+    }
+    
+    /**
+     * Tells whether this instance currently holds an open jdbc connection.
+     *
+     * @return true when a connection has been created and is not closed, else false
+     *
+     * @throws SQLException if the connection cannot report whether it is closed
+     */
+    boolean isConnected() throws SQLException {
+        if (lockConnection == null) {
+            return false;
+        }
+        return !lockConnection.isClosed();
+    }
+
+    /**
+     * Closes the given Statement without letting an SQLException escape; a failure to close
+     * is logged at SEVERE level instead.
+     *
+     * @param preparedStatement the statement to close, may be null
+     */
+    void closeSafely(Statement preparedStatement) {
+        if (preparedStatement == null) {
+            return;
+        }
+        try {
+            preparedStatement.close();
+        } catch (SQLException closeFailure) {
+            LOG.log(Level.SEVERE, "Failed to close statement", closeFailure);
+        }
+    }
+
+    /**
+     * Closes the given ResultSet without letting an SQLException escape; a failure to close
+     * is logged at SEVERE level instead.
+     *
+     * @param rs the result set to close, may be null
+     */
+    void closeSafely(ResultSet rs) {
+        if (rs == null) {
+            return;
+        }
+        try {
+            rs.close();
+        } catch (SQLException closeFailure) {
+            LOG.log(Level.SEVERE, "Error occured while releasing ResultSet", closeFailure);
+        }
+    }
+
+    /**
+     * Returns the connection used for the lock operations, creating a new one when there is
+     * none yet or when the previous one has been closed.
+     *
+     * @return the jdbc Connection instance of this lock
+     *
+     * @throws Exception if the connection state cannot be checked or a new connection
+     *                   cannot be created
+     */
+    protected Connection getConnection() throws Exception {
+        if (isConnected()) {
+            return lockConnection;
+        }
+
+        lockConnection = createConnection(driver, url, user, password);
+        return lockConnection;
+    }
+
+    /**
+     * Creates a new jdbc connection.  For a derby url that does not already contain
+     * {@code create=true}, {@code ;create=true} is appended before connecting.
+     *
+     * @param driver The fully qualified driver class name
+     * @param url  The database connection url
+     * @param username The username for the database
+     * @param password  The password for the database
+     * @return a new jdbc connection
+     * @throws Exception the failure raised while setting up the connection, after it has
+     *                   been logged at SEVERE level
+     */
+    protected Connection createConnection(String driver, String url, String username, String password) throws Exception {
+        String lowerCaseUrl = url.toLowerCase();
+        if (lowerCaseUrl.startsWith("jdbc:derby") && !lowerCaseUrl.contains("create=true")) {
+            url = url + ";create=true";
+        }
+
+        try {
+            return doCreateConnection(driver, url, username, password);
+        } catch (Exception setupFailure) {
+            LOG.log(Level.SEVERE, "Error occured while setting up JDBC connection", setupFailure);
+            throw setupFailure;
+        }
+    }
+
+    /**
+     * Loads the driver class and opens a connection through the DriverManager.
+     * This method could be used to inject a mock jdbc connection for testing purposes.
+     *
+     * @param driver The fully qualified driver class name to load
+     * @param url The database connection url
+     * @param username The username for the database
+     * @param password The password for the database
+     * @return the connection returned by the DriverManager
+     * @throws ClassNotFoundException if the driver class cannot be found
+     * @throws SQLException if the DriverManager cannot open the connection
+     */
+    protected Connection doCreateConnection(String driver, String url, String username, String password) throws ClassNotFoundException, SQLException {
+        Class.forName(driver);
+        return DriverManager.getConnection(url, username, password);
+    }
+
+    /**
+     * This method is called whenever we want to acquire/steal or update the lock.
+     *
+     * It first tries the plain update, which succeeds when the lock is free or already owned
+     * by this instance.  When that fails, the lock row is read and compared with what this
+     * instance saw on earlier calls:
+     * <ul>
+     *   <li>a different master id: the id, state, lock delay and observation time are recorded;</li>
+     *   <li>the same master id with a changed state: the new state and observation time are recorded;</li>
+     *   <li>the same master id with an unchanged state for more than twice the master lock
+     *       delay: an attempt is made to steal the lock.</li>
+     * </ul>
+     * Failures while reading the lock row are logged and reported as "lock not acquired".
+     *
+     * @return true, if we are the master instance else false
+     *
+     * @see org.apache.karaf.main.Lock#lock()
+     */
+    public boolean lock() throws Exception {
+
+        // Try to acquire/update the lock state
+        boolean lockAquired = acquireLock(statements.getLockUpdateIdStatement(uniqueId, ++state, lock_delay, uniqueId));
+
+        if (!lockAquired) {
+
+            String lockSelectStatement = statements.getLockSelectStatement();
+            PreparedStatement statement = null;
+            ResultSet rs = null;
+
+            try {
+                statement = getConnection().prepareStatement(lockSelectStatement);
+                // Get the current master id and compare with information that we have locally....
+                rs = statement.executeQuery();
+
+                if (rs.next()) {
+                    int masterId = statements.getIdFromLockSelectStatement(rs); // The current master unique id or 0
+                    int masterState = statements.getStateFromLockSelectStatement(rs); // The current master state or whatever
+
+                    if (this.currentId == masterId) {
+                        // It is the same instance that locked the table
+                        if (this.currentState == masterState) {
+                            // Its state has not been updated....
+                            if ( (this.currentStateTime + this.currentLockDelay + this.currentLockDelay) < System.currentTimeMillis() ) {
+                                // The state was not been updated for more than twice the lock_delay value of the current master...
+                                // Try to steal the lock....
+                                lockAquired = acquireLock(statements.getLockUpdateIdStatementToStealLock(uniqueId, state, lock_delay, masterId, masterState));
+                            }
+                        } else {
+                            // Set the current time to be used to determine if we can
+                            // try to steal the lock later...
+                            this.currentStateTime = System.currentTimeMillis();
+                            this.currentState = masterState;
+                        }
+                    } else {
+                        // This is a different currentId that is being used...
+                        // at this time, it does not matter if the new master id is zero we can try to acquire it
+                        // during the next lock call...
+                        this.currentId = masterId;
+                        this.currentState = masterState;
+                        // Update the current state time since this is a new lock service...
+                        this.currentStateTime = System.currentTimeMillis();
+                        // Get the lock delay value which is specific to the current master...
+                        this.currentLockDelay = statements.getLockDelayFromLockSelectStatement(rs);
+                    }
+                }
+            } catch( Exception e ) {
+                LOG.log(Level.SEVERE, "Unable to determine if the lock was obtain", e);
+            } finally {
+                // Release the result set before the statement that produced it.
+                closeSafely(rs);
+                closeSafely(statement);
+            }
+        }
+
+        return lockAquired;
+    }
+
+    /**
+     * This method is called to try and acquire the lock and/or update the state for when this instance
+     * is already the master instance.  It will try to update the row given the passed data and will
+     * succeed if and only if the generated where clause was valid else it would not update the row.
+     *
+     * @param lockUpdateIdStatement The sql statement used to execute the update
+     *
+     * @return true, if the row was updated else false
+     */
+    private boolean acquireLock(String lockUpdateIdStatement) {
+        PreparedStatement update = null;
+
+        try {
+            update = getConnection().prepareStatement(lockUpdateIdStatement);
+            // The where clause of the statement decides whether the row is touched;
+            // at least one updated row means the update took effect.
+            int updatedRows = update.executeUpdate();
+            return updatedRows > 0;
+        } catch (Exception e) {
+            // Any failure is reported as "not acquired" instead of being propagated.
+            // Do we want to display this message everytime???
+            LOG.log(Level.WARNING, "Failed to acquire database lock", e);
+            return false;
+        } finally {
+            // Runs on both the success and the failure path.
+            closeSafely(update);
+        }
+    }
+
+    /**
+     * This method will release the lock that the current master has by setting the karaf_lock table
+     * id to 0.  This tells the others that the master has relinquished the lock and someone else can
+     * try to acquire that lock and become a master.
+     *
+     * @see org.apache.karaf.main.Lock#release()
+     */
+    public void release() throws Exception {
+        if (isConnected()) {
+            // Statement that resets the lock row's ID for this instance's unique id.
+            String lockResetIdStatement = statements.getLockResetIdStatement(uniqueId);
+            PreparedStatement preparedStatement = null;
+            
+            try {
+                preparedStatement = getConnection().prepareStatement(lockResetIdStatement);
+                // This statement will set the ID to 0 and allow others to steal the lock...
+                preparedStatement.executeUpdate();
+            } catch (SQLException e) {
+                // The failing operation is the lock-id reset (no rollback happens here),
+                // so the message names it accordingly.
+                LOG.log(Level.SEVERE, "Exception while resetting the lock id on release", e);
+            } finally {
+                closeSafely(preparedStatement);
+                // Closing the connection is best effort: a failure is only logged at FINE.
+                try {
+                    getConnection().close();
+                } catch (SQLException ignored) {
+                    LOG.log(Level.FINE, "Exception while closing connection on release", ignored);
+                }
+            }
+        }
+        
+        // Drop the reference whether or not a connection was open.
+        lockConnection = null;
+    }
+
+    /**
+     * This method will check if the jdbc connection is still active and if we were able to 
+     * acquire or update the karaf_table information.
+     *
+     * @return true, if the connection is still active and we still have the lock
+     *
+     * @see org.apache.karaf.main.Lock#isAlive()
+     *
+     */
+    public boolean isAlive() throws Exception {
+        // While connected, liveness is whatever a fresh lock() attempt reports.
+        if (isConnected()) {
+            return lock();
+        }
+
+        // No connection: report the loss and signal that the lock is gone.
+        LOG.severe("Lost lock!");
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/karaf/blob/411b9e42/main/src/main/java/org/apache/karaf/main/GenericStatements.java
----------------------------------------------------------------------
diff --git a/main/src/main/java/org/apache/karaf/main/GenericStatements.java b/main/src/main/java/org/apache/karaf/main/GenericStatements.java
new file mode 100644
index 0000000..1dd2f08
--- /dev/null
+++ b/main/src/main/java/org/apache/karaf/main/GenericStatements.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.main;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * This class is used to create the sql statements for the karaf lock tables that are used
+ * for clustering of karaf instances.
+ * 
+ * It will generate sql statement to create two separate tables, a lock table and a lock id table
+ * 
+ *   CREATE TABLE LOCK ( ID INTEGER DEFAULT 0, STATE INTEGER DEFAULT 0, LOCK_DELAY INTEGER DEFAULT 0 )
+ *   
+ *   CREATE TABLE LOCK_ID ( ID INTEGER DEFAULT 0 )
+ *   
+ * @author Claudio Corsi
+ * 
+ */
+public class GenericStatements {
+
+	private final String lockTableName;
+	private final String lockIdTableName;
+	private final String clusterName;
+
+	/**
+	 * This constructor is used to determine the name of the karaf lock table, the karaf lock id
+	 * table and the name of the clustered instances.
+	 *
+	 * @param lockTableName The name of the karaf lock table
+	 * @param lockIdTableName The name of the karaf lock id table
+	 * @param clusterName the name of the cluster being used
+	 */
+	public GenericStatements(String lockTableName, String lockIdTableName, String clusterName) {
+		this.lockTableName   = lockTableName;
+		this.lockIdTableName = lockIdTableName;
+		this.clusterName     = clusterName;
+	}
+
+	/**
+	 * This method will return the cluster name that was passed to the constructor.
+	 *
+	 * @return cluster node name
+	 */
+	public final String getNodeName() {
+		return this.clusterName;
+	}
+
+	/**
+	 * This method will return the name of the karaf lock table.
+	 *
+	 * @return name of the karaf lock table
+	 */
+	public final String getLockTableName() {
+		return lockTableName;
+	}
+
+	/**
+	 * This method will return the insert statement used to create a row in the Lock table and will
+	 * generate the following sql statement.
+	 *
+	 * INSERT INTO KARAF_LOCK(ID, STATE, LOCK_DELAY) VALUES (0, 0, 0)
+	 * 
+	 * @return sql insert statement
+	 */
+	private String getLockTableInitialInsertStatement() {
+		return "INSERT INTO " + this.getLockTableName() + "(ID, STATE, LOCK_DELAY) VALUES (0, 0, 0)";
+	}
+
+	/**
+	 * This will be called when trying to acquire the lock and will generate the following sql statement,
+	 * with the given values written directly into the statement text.
+	 *
+	 *  UPDATE KARAF_LOCK SET ID = id, STATE = state, LOCK_DELAY = lock_delay WHERE ID = 0 OR ID = curId
+	 * 
+	 * The statement is complete as returned; there are no parameters left to bind.  The numbers are
+	 * appended by string concatenation rather than String.format so that they are always rendered
+	 * with ASCII digits, whatever the default locale of the JVM is.
+	 *
+	 * @param id the value to store in the ID column
+	 * @param state the value to store in the STATE column
+	 * @param lock_delay the value to store in the LOCK_DELAY column
+	 * @param curId the ID, besides 0, that the row may currently hold for the update to apply
+	 *
+	 * @return sql update statement
+	 */
+	public String getLockUpdateIdStatement(int id, int state, int lock_delay, int curId) {
+		return "UPDATE " + this.getLockTableName()
+			   + " SET ID = " + id
+			   + ", STATE = " + state
+			   + ", LOCK_DELAY = " + lock_delay
+			   + " WHERE ID = 0 OR ID = " + curId;
+	}
+
+	/**
+	 * This will be called when trying to steal the lock and will generate the following sql statement,
+	 * with the given values written directly into the statement text.
+	 *
+	 *  UPDATE KARAF_LOCK SET ID = id, STATE = state, LOCK_DELAY = lock_delay WHERE ( ID = 0 OR ID = curId ) AND STATE = curState
+	 *
+	 * The statement is complete as returned; there are no parameters left to bind.  The numbers are
+	 * appended by string concatenation rather than String.format so that they are always rendered
+	 * with ASCII digits, whatever the default locale of the JVM is.
+	 * 
+	 * @param id the value to store in the ID column
+	 * @param state the value to store in the STATE column
+	 * @param lock_delay the value to store in the LOCK_DELAY column
+	 * @param curId the ID, besides 0, that the row may currently hold for the update to apply
+	 * @param curState the STATE that the row must currently hold for the update to apply
+	 *
+	 * @return sql update statement
+	 */
+	public String getLockUpdateIdStatementToStealLock(int id, int state, int lock_delay, int curId, int curState) {
+		return "UPDATE " + this.getLockTableName()
+			   + " SET ID = " + id
+			   + ", STATE = " + state
+			   + ", LOCK_DELAY = " + lock_delay
+			   + " WHERE ( ID = 0 OR ID = " + curId
+			   + " ) AND STATE = " + curState;
+	}
+
+	/**
+	 * This method is called only when we are releasing the lock and will generate the following sql
+	 * statement, with the given id written directly into the statement text.
+	 *
+	 *  UPDATE KARAF_LOCK SET ID = 0 WHERE ID = id
+	 * 
+	 * @param id the ID that the row must currently hold for the reset to apply
+	 *
+	 * @return sql update statement
+	 */
+	public String getLockResetIdStatement(int id) {
+		return "UPDATE " + this.getLockTableName() + " SET ID = 0 WHERE ID = " + id;
+	}
+
+	/**
+	 * This will be called to determine the current master instance for the lock table and will 
+	 * generate the following sql statement.
+	 *
+	 * SELECT ID, STATE, LOCK_DELAY FROM KARAF_LOCK
+	 *
+	 * @return sql select statement
+	 */
+	public String getLockSelectStatement() {
+		return "SELECT ID, STATE, LOCK_DELAY FROM " + this.getLockTableName();
+	}
+
+	/**
+	 * This method reads the ID column (column 1 of the lock select statement) from the current row.
+	 *
+	 * @param rs a result set of the lock select statement positioned on a row
+	 *
+	 * @return value of the first column
+	 */
+	public int getIdFromLockSelectStatement(ResultSet rs) throws SQLException {
+		return rs.getInt(1);
+	}
+
+	/**
+	 * This method reads the STATE column (column 2 of the lock select statement) from the current row.
+	 *
+	 * @param rs a result set of the lock select statement positioned on a row
+	 *
+	 * @return value of the second column
+	 */
+	public int getStateFromLockSelectStatement(ResultSet rs) throws SQLException {
+		return rs.getInt(2);
+	}
+
+	/**
+	 * This method reads the LOCK_DELAY column (column 3 of the lock select statement) from the current row.
+	 *
+	 * @param rs a result set of the lock select statement positioned on a row
+	 *
+	 * @return value of the third column
+	 */
+	public int getLockDelayFromLockSelectStatement(ResultSet rs) throws SQLException {
+		return rs.getInt(3);
+	}
+
+	/**
+	 * This method should only be called during the creation of the KARAF_LOCK table and will
+	 * generate the following sql statement.
+	 *
+	 * CREATE TABLE KARAF_LOCK ( ID INTEGER DEFAULT 0, STATE INTEGER DEFAULT 0 , LOCK_DELAY INTEGER DEFAULT 0 )
+	 * 
+	 * @return sql create table statement
+	 */
+	private String getLockTableCreateStatement() {
+		return "CREATE TABLE " + this.getLockTableName() 
+			   + " ( ID INTEGER DEFAULT 0, STATE INTEGER DEFAULT 0 , LOCK_DELAY INTEGER DEFAULT 0 )";
+	}
+
+	
+	//  ==================  LOCK ID TABLE ========================
+
+	/**
+	 * This method will generate the create table sql statement to create the karaf id table and will
+	 * generate the following sql statement.
+	 *
+	 * CREATE TABLE KARAF_ID ( ID INTEGER DEFAULT 0 )
+	 *
+	 * @return sql create table statement
+	 */
+	private String getLockIdTableCreateStatement() {
+		return "CREATE TABLE " + this.getLockIdTableName() 
+			    + " ( ID INTEGER DEFAULT 0 )";
+	}
+	
+	/**
+	 * This method will return the sql statement to retrieve the id of the lock id table and will
+	 * generate the following sql statement.
+	 *
+	 * SELECT ID FROM KARAF_ID
+	 *
+	 * @return sql select statement
+	 */
+	public String getLockIdSelectStatement() {
+		return "SELECT ID FROM " + this.getLockIdTableName();
+	}
+
+	/**
+	 * This method reads the ID column (column 1 of the lock id select statement) from the current row.
+	 *
+	 * @param rs a result set of the lock id select statement positioned on a row
+	 *
+	 * @return value of the first column
+	 */
+	public int getIdFromLockIdSelectStatement(ResultSet rs) throws SQLException {
+		return rs.getInt(1);
+	}
+
+	/**
+	 * This method will return the update statement for the lock id table and will generate the
+	 * following sql statement, with the given values written directly into the statement text.
+	 *
+	 * UPDATE KARAF_ID SET ID = id WHERE ID = curId
+	 *
+	 * The numbers are appended by string concatenation rather than String.format so that they are
+	 * always rendered with ASCII digits, whatever the default locale of the JVM is.
+	 *
+	 * @param id the value to store in the ID column
+	 * @param curId the ID that the row must currently hold for the update to apply
+	 *
+	 * @return sql update statement
+	 */
+	public String getLockIdUpdateIdStatement(int id, int curId) {
+		return "UPDATE " + this.getLockIdTableName() + " SET ID = " + id + " WHERE ID = " + curId;
+	}
+	
+	/**
+	 * This method will return the name of the karaf lock id table.
+	 *
+	 * @return name of the karaf lock id table
+	 */
+	public final String getLockIdTableName() {
+		return lockIdTableName;
+	}
+
+	/**
+	 * This method will return the required sql statements to initialize the lock database: the two
+	 * create table statements followed by the two initial insert statements.
+	 *
+	 * @param moment not used by this implementation
+	 *
+	 * @return array of sql statements
+	 */
+	public String[] getLockCreateSchemaStatements(long moment) {
+		return new String[] {
+			getLockTableCreateStatement(),
+			getLockIdTableCreateStatement(),
+			getLockTableInitialInsertStatement(),
+			getLockIdTableInitialInsertStatement(),
+		};
+	}
+
+	/**
+	 * This method will return the insert statement to insert a row in the lock id table and will
+	 * generate the following sql statement.
+	 *
+	 * INSERT INTO KARAF_ID(ID) VALUES (0)
+	 *
+	 * @return sql insert statement
+	 */
+	private String getLockIdTableInitialInsertStatement() {
+		return "INSERT INTO " + this.getLockIdTableName() + "(ID) VALUES (0)";
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/karaf/blob/411b9e42/main/src/main/java/org/apache/karaf/main/Main.java
----------------------------------------------------------------------
diff --git a/main/src/main/java/org/apache/karaf/main/Main.java b/main/src/main/java/org/apache/karaf/main/Main.java
index 2f1bff1..16a7d22 100644
--- a/main/src/main/java/org/apache/karaf/main/Main.java
+++ b/main/src/main/java/org/apache/karaf/main/Main.java
@@ -227,6 +227,8 @@ public class Main {
 
 	public static final String OVERRIDE_PREFIX = "karaf.override.";
 
+    public static final String DEFAULT_LOCK_DELAY = "1000";
+
     Logger LOG = Logger.getLogger(this.getClass().getName());
 
     private File karafHome;
@@ -241,7 +243,7 @@ public class Main {
     private Lock lock;
     private int defaultStartLevel = 100;
     private int lockStartLevel = 1;
-    private int lockDelay = 1000;
+    private int lockDelay = Integer.parseInt( DEFAULT_LOCK_DELAY );
     private int shutdownTimeout = 5 * 60 * 1000;
     private boolean exiting = false;
     private ShutdownCallback shutdownCallback;
@@ -333,7 +335,7 @@ public class Main {
         defaultStartLevel = Integer.parseInt(configProps.getProperty(Constants.FRAMEWORK_BEGINNING_STARTLEVEL));
         System.setProperty(Constants.FRAMEWORK_BEGINNING_STARTLEVEL, Integer.toString(this.defaultStartLevel));
         lockStartLevel = Integer.parseInt(configProps.getProperty(PROPERTY_LOCK_LEVEL, Integer.toString(lockStartLevel)));
-        lockDelay = Integer.parseInt(configProps.getProperty(PROPERTY_LOCK_DELAY, Integer.toString(lockDelay)));
+        lockDelay = Integer.parseInt(configProps.getProperty(PROPERTY_LOCK_DELAY, DEFAULT_LOCK_DELAY));
         configProps.setProperty(Constants.FRAMEWORK_BEGINNING_STARTLEVEL, Integer.toString(lockStartLevel));
         shutdownTimeout = Integer.parseInt(configProps.getProperty(KARAF_SHUTDOWN_TIMEOUT, Integer.toString(shutdownTimeout)));
         // Start up the OSGI framework


[2/3] git commit: Make sure the console thread for the non blocking input stream is correctly shut down

Posted by gn...@apache.org.
Make sure the console thread for the non blocking input stream is correctly shut down


Project: http://git-wip-us.apache.org/repos/asf/karaf/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf/commit/a32bab6f
Tree: http://git-wip-us.apache.org/repos/asf/karaf/tree/a32bab6f
Diff: http://git-wip-us.apache.org/repos/asf/karaf/diff/a32bab6f

Branch: refs/heads/karaf-2.x
Commit: a32bab6f51e047c00e98c5b3c506b0fcdb5080a8
Parents: 2978d50
Author: Guillaume Nodet <gn...@gmail.com>
Authored: Thu Nov 29 09:36:27 2012 +0100
Committer: Guillaume Nodet <gn...@gmail.com>
Committed: Wed Dec 18 10:07:40 2013 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/karaf/shell/console/jline/Console.java    | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf/blob/a32bab6f/shell/console/src/main/java/org/apache/karaf/shell/console/jline/Console.java
----------------------------------------------------------------------
diff --git a/shell/console/src/main/java/org/apache/karaf/shell/console/jline/Console.java b/shell/console/src/main/java/org/apache/karaf/shell/console/jline/Console.java
index 4930d99..94f6d6d 100644
--- a/shell/console/src/main/java/org/apache/karaf/shell/console/jline/Console.java
+++ b/shell/console/src/main/java/org/apache/karaf/shell/console/jline/Console.java
@@ -161,6 +161,8 @@ public class Console implements Runnable
         running = false;
         CommandSessionHolder.unset();
         pipe.interrupt();
+        thread.interrupt();
+        reader.shutdown();
         if (closedByUser && closeCallback != null) {
             closeCallback.run();
         }


[3/3] git commit: provide better error messages in a couple cases

Posted by gn...@apache.org.
provide better error messages in a couple cases

git-svn-id: https://svn.apache.org/repos/asf/karaf/trunk@1102131 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/karaf/repo
Commit: http://git-wip-us.apache.org/repos/asf/karaf/commit/2978d509
Tree: http://git-wip-us.apache.org/repos/asf/karaf/tree/2978d509
Diff: http://git-wip-us.apache.org/repos/asf/karaf/diff/2978d509

Branch: refs/heads/karaf-2.x
Commit: 2978d509498e68d094065bb99d3b42b89402b833
Parents: 86b531b
Author: David Jencks <dj...@apache.org>
Authored: Wed May 11 23:09:23 2011 +0000
Committer: Guillaume Nodet <gn...@gmail.com>
Committed: Wed Dec 18 10:07:40 2013 +0100

----------------------------------------------------------------------
 .../karaf/features/internal/FeatureValidationUtil.java       | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/karaf/blob/2978d509/features/core/src/main/java/org/apache/karaf/features/internal/FeatureValidationUtil.java
----------------------------------------------------------------------
diff --git a/features/core/src/main/java/org/apache/karaf/features/internal/FeatureValidationUtil.java b/features/core/src/main/java/org/apache/karaf/features/internal/FeatureValidationUtil.java
index 7e21690..dda1a87 100644
--- a/features/core/src/main/java/org/apache/karaf/features/internal/FeatureValidationUtil.java
+++ b/features/core/src/main/java/org/apache/karaf/features/internal/FeatureValidationUtil.java
@@ -15,6 +15,7 @@
  */
 package org.apache.karaf.features.internal;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.net.URLConnection;
@@ -41,7 +42,12 @@ public class FeatureValidationUtil {
      * @throws Exception When validation fails.
      */
     public static void validate(URI uri) throws Exception {
-        URLConnection conn = uri.toURL().openConnection();
+        URLConnection conn = null;
+        try {
+            conn = uri.toURL().openConnection();
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("invalid URI: " + uri, e);
+        }
         conn.setDefaultUseCaches(false);
 
         InputStream stream = conn.getInputStream();