You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by da...@apache.org on 2013/10/23 21:43:46 UTC

[29/47] New Transaction API

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/db/src/com/cloud/utils/db/TransactionLegacy.java
----------------------------------------------------------------------
diff --git a/framework/db/src/com/cloud/utils/db/TransactionLegacy.java b/framework/db/src/com/cloud/utils/db/TransactionLegacy.java
new file mode 100755
index 0000000..b191491
--- /dev/null
+++ b/framework/db/src/com/cloud/utils/db/TransactionLegacy.java
@@ -0,0 +1,1174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.utils.db;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbcp.ConnectionFactory;
+import org.apache.commons.dbcp.DriverManagerConnectionFactory;
+import org.apache.commons.dbcp.PoolableConnectionFactory;
+import org.apache.commons.dbcp.PoolingDataSource;
+import org.apache.commons.pool.KeyedObjectPoolFactory;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.commons.pool.impl.StackKeyedObjectPoolFactory;
+import org.apache.log4j.Logger;
+import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
+import org.jasypt.properties.EncryptableProperties;
+
+import com.cloud.utils.Pair;
+import com.cloud.utils.PropertiesUtil;
+import com.cloud.utils.crypt.EncryptionSecretKeyChecker;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.utils.mgmt.JmxUtil;
+
+/**
+ * Transaction abstracts away the Connection object in JDBC.  It allows the
+ * following things that the Connection object does not.
+ * 
+ *   1. Transaction can be started at an entry point and whether the DB
+ *      actions should be auto-commit or not determined at that point.
+ *   2. DB Connection is allocated only when it is needed.
+ *   3. Code does not need to know if a transaction has been started or not.
+ *      It just starts/ends a transaction and we resolve it correctly with
+ *      the previous actions.
+ *
+ * Note that this class is not synchronous but it doesn't need to be because
+ * it is stored with TLS and is one per thread.  Use appropriately.
+ */
+public class TransactionLegacy {
+    private static final Logger s_logger = Logger.getLogger(Transaction.class.getName() + "." + "Transaction");
+    private static final Logger s_stmtLogger = Logger.getLogger(Transaction.class.getName() + "." + "Statement");
+    private static final Logger s_lockLogger = Logger.getLogger(Transaction.class.getName() + "." + "Lock");
+    private static final Logger s_connLogger = Logger.getLogger(Transaction.class.getName() + "." + "Connection");
+
+    private static final ThreadLocal<TransactionLegacy> tls = new ThreadLocal<TransactionLegacy>();
+    private static final String START_TXN = "start_txn";
+    private static final String CURRENT_TXN = "current_txn";
+    private static final String CREATE_TXN = "create_txn";
+    private static final String CREATE_CONN = "create_conn";
+    private static final String STATEMENT = "statement";
+    private static final String ATTACHMENT = "attachment";
+
+    public static final short CLOUD_DB = 0;
+    public static final short USAGE_DB = 1;
+    public static final short AWSAPI_DB = 2;
+    public static final short SIMULATOR_DB = 3;
+
+    public static final short CONNECTED_DB = -1;
+
+    private static AtomicLong s_id = new AtomicLong();
+    private static final TransactionMBeanImpl s_mbean = new TransactionMBeanImpl();
+    static {
+        try {
+            JmxUtil.registerMBean("Transaction", "Transaction", s_mbean);
+        } catch (Exception e) {
+            s_logger.error("Unable to register mbean for transaction", e);
+        }
+        
+        /* FIXME: We need a better solution for this
+         * Initialize encryption if we need it for db.properties
+         */ 
+        EncryptionSecretKeyChecker enc = new EncryptionSecretKeyChecker();
+        enc.check();  
+    }
+
+    private final LinkedList<StackElement> _stack;
+    private long _id;
+
+    private final LinkedList<Pair<String, Long>> _lockTimes = new LinkedList<Pair<String, Long>>();
+
+    private String _name;
+    private Connection _conn;
+    private boolean _txn;
+    private short _dbId;
+    private long _txnTime;
+    private Statement _stmt;
+    private String _creator;
+
+    private TransactionLegacy _prev = null;
+
+    public static TransactionLegacy currentTxn() {
+        TransactionLegacy txn = tls.get();
+        assert txn != null : "No Transaction on stack.  Did you mark the method with @DB?";
+
+        assert checkAnnotation(3, txn) : "Did you even read the guide to use Transaction...IOW...other people's code? Try method can't be private.  What about @DB? hmmm... could that be it? " + txn;
+        return txn;
+    }
+
+    public static TransactionLegacy open(final short databaseId) {
+        String name = buildName();
+        if (name == null) {
+            name = CURRENT_TXN;
+        }
+        return open(name, databaseId, true);
+    }
+
+    //
+    // Usage of this transaction setup should be limited, it will always open a new transaction context regardless of whether or not there is other
+    // transaction context in the stack. It is used in special use cases that we want to control DB connection explicitly and in the mean time utilize
+    // the existing DAO features
+    //
+    public void transitToUserManagedConnection(Connection conn) {
+        assert(_conn == null /*&& _stack.size() <= 1*/) : "Can't change to a user managed connection unless the stack is empty and the db connection is null, you may have forgotten to invoke transitToAutoManagedConnection to close out the DB connection: " + toString();
+        _conn = conn;
+        _dbId = CONNECTED_DB;
+    }
+
+    public void transitToAutoManagedConnection(short dbId) {
+        // assert(_stack.size() <= 1) : "Can't change to auto managed connection unless your stack is empty";
+        _dbId = dbId;
+        _conn = null;
+    }
+
+    public static TransactionLegacy open(final String name) {
+        return open(name, TransactionLegacy.CLOUD_DB, false);
+    }
+
+    public static TransactionLegacy open(final String name, final short databaseId, final boolean forceDbChange) {
+        TransactionLegacy txn = tls.get();
+        boolean isNew = false;
+        if (txn == null) {
+            if (s_logger.isTraceEnabled()) {
+                s_logger.trace("Creating the transaction: " + name);
+            }
+            txn = new TransactionLegacy(name, false, databaseId);
+            tls.set(txn);
+            isNew = true;
+        } else if (forceDbChange) {
+            final short currentDbId = txn.getDatabaseId();
+            if (currentDbId != databaseId) {
+                // we need to end the current transaction and switch databases
+                txn.close(txn.getName());
+
+                txn = new TransactionLegacy(name, false, databaseId);
+                tls.set(txn);
+                isNew = true;
+            }
+        }
+
+        txn.takeOver(name, false);
+        if (isNew) {
+            s_mbean.addTransaction(txn);
+        }
+        return txn;
+    }
+
+    protected StackElement peekInStack(Object obj) {
+        final Iterator<StackElement> it = _stack.iterator();
+        while (it.hasNext()) {
+            StackElement next = it.next();
+            if (next.type == obj) {
+                return next;
+            }
+        }
+        return null;
+    }
+
+    public void registerLock(String sql) {
+        if (_txn && s_lockLogger.isDebugEnabled()) {
+            Pair<String, Long> time = new Pair<String, Long>(sql, System.currentTimeMillis());
+            _lockTimes.add(time);
+        }
+    }
+
+    public boolean dbTxnStarted() {
+        return _txn;
+    }
+
+    public static Connection getStandaloneConnectionWithException() throws SQLException {
+        Connection conn = s_ds.getConnection();
+        if (s_connLogger.isTraceEnabled()) {
+            s_connLogger.trace("Retrieving a standalone connection: dbconn" + System.identityHashCode(conn));
+        }
+        return conn;
+    }
+
+    public static Connection getStandaloneConnection() {
+        try {
+            return getStandaloneConnectionWithException();
+        } catch (SQLException e) {
+            s_logger.error("Unexpected exception: ", e);
+            return null;
+        }
+    }
+
+    public static Connection getStandaloneUsageConnection() {
+        try {
+            Connection conn = s_usageDS.getConnection();
+            if (s_connLogger.isTraceEnabled()) {
+                s_connLogger.trace("Retrieving a standalone connection for usage: dbconn" + System.identityHashCode(conn));
+            }
+            return conn;
+        } catch (SQLException e) {
+            s_logger.warn("Unexpected exception: ", e);
+            return null;
+        }
+    }
+
+    public static Connection getStandaloneAwsapiConnection() {
+        try {
+            Connection conn = s_awsapiDS.getConnection();
+            if (s_connLogger.isTraceEnabled()) {
+                s_connLogger.trace("Retrieving a standalone connection for usage: dbconn" + System.identityHashCode(conn));
+            }
+            return conn;
+        } catch (SQLException e) {
+            s_logger.warn("Unexpected exception: ", e);
+            return null;
+        }
+    }
+    
+    public static Connection getStandaloneSimulatorConnection() {
+    	try {
+    		Connection conn = s_simulatorDS.getConnection();
+    		if (s_connLogger.isTraceEnabled()) {
+                s_connLogger.trace("Retrieving a standalone connection for simulator: dbconn" + System.identityHashCode(conn));
+            }
+            return conn;
+        } catch (SQLException e) {
+            s_logger.warn("Unexpected exception: ", e);
+            return null;
+        }
+    }
+
+    protected void attach(TransactionAttachment value) {
+        _stack.push(new StackElement(ATTACHMENT, value));
+    }
+
+    protected TransactionAttachment detach(String name) {
+        Iterator<StackElement> it = _stack.descendingIterator();
+        while (it.hasNext()) {
+            StackElement element = it.next();
+            if (element.type == ATTACHMENT) {
+                TransactionAttachment att = (TransactionAttachment)element.ref;
+                if (name.equals(att.getName())) {
+                    it.remove();
+                    return att;
+                }
+            }
+        }
+        assert false : "Are you sure you attached this: " + name;
+        return null;
+    }
+
+    public static void attachToTxn(TransactionAttachment value) {
+        TransactionLegacy txn = tls.get();
+        assert txn != null && txn.peekInStack(CURRENT_TXN) != null: "Come on....how can we attach something to the transaction if you haven't started it?";
+
+        txn.attach(value);
+    }
+
+    public static TransactionAttachment detachFromTxn(String name) {
+        TransactionLegacy txn = tls.get();
+        assert txn != null : "No Transaction in TLS";
+        return txn.detach(name);
+    }
+
+    protected static boolean checkAnnotation(int stack, TransactionLegacy txn) {
+        final StackTraceElement[] stacks = Thread.currentThread().getStackTrace();
+        StackElement se = txn.peekInStack(CURRENT_TXN);
+        if (se == null) {
+            return false;
+        }
+        
+        StringBuffer sb = new StringBuffer();
+        for (; stack < stacks.length; stack++) {
+            String methodName = stacks[stack].getMethodName();
+            sb.append(" ").append(methodName);
+            if (methodName.equals(se.ref)){
+                return true;
+            }
+        }
+        
+        // relax stack structure for several places that @DB required injection is not in place
+        s_logger.warn("Non-standard stack context that Transaction context is manaully placed into the calling chain. Stack chain: " + sb);
+        return true;
+    }
+
+    protected static String buildName() {
+        if (s_logger.isDebugEnabled()) {
+            final StackTraceElement[] stacks = Thread.currentThread().getStackTrace();
+            final StringBuilder str = new StringBuilder();
+            int i = 3, j = 3;
+            while (j < 15 && i < stacks.length) {
+                StackTraceElement element = stacks[i];
+                String filename = element.getFileName();
+                String method = element.getMethodName();
+                if ((filename != null && filename.equals("<generated>")) || (method != null && method.equals("invokeSuper"))) {
+                    i++;
+                    continue;
+                }
+
+                str.append("-").append(stacks[i].getClassName().substring(stacks[i].getClassName().lastIndexOf(".") + 1)).append(".").append(stacks[i].getMethodName()).append(":").append(stacks[i].getLineNumber());
+                j++;
+                i++;
+            }
+            return str.toString();
+        }
+
+        return "";
+    }
+
+    public TransactionLegacy(final String name, final boolean forLocking, final short databaseId) {
+        _name = name;
+        _conn = null;
+        _stack = new LinkedList<StackElement>();
+        _txn = false;
+        _dbId = databaseId;
+        _id = s_id.incrementAndGet();
+        _creator = Thread.currentThread().getName();
+    }
+
+    public String getCreator() {
+        return _creator;
+    }
+
+    public long getId() {
+        return _id;
+    }
+
+    public String getName() {
+        return _name;
+    }
+
+    public Short getDatabaseId() {
+        return _dbId;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder str = new StringBuilder((_name != null ? _name : ""));
+        str.append(" : ");
+        for (final StackElement se : _stack) {
+            if (se.type == CURRENT_TXN) {
+                str.append(se.ref).append(", ");
+            }
+        }
+
+        return str.toString();
+    }
+
+    protected void mark(final String name) {
+        _stack.push(new StackElement(CURRENT_TXN, name));
+    }
+
+    public boolean lock(final String name, final int timeoutSeconds) {
+        Merovingian2 lockMaster = Merovingian2.getLockMaster();
+        if (lockMaster == null) {
+            throw new CloudRuntimeException("There's no support for locking yet");
+        }
+        return lockMaster.acquire(name, timeoutSeconds);
+    }
+
+    public boolean release(final String name) {
+        Merovingian2 lockMaster = Merovingian2.getLockMaster();
+        if (lockMaster == null) {
+            throw new CloudRuntimeException("There's no support for locking yet");
+        }
+        return lockMaster.release(name);
+    }
+
+    public void start() {
+        if (s_logger.isTraceEnabled()) {
+            s_logger.trace("txn: start requested by: " + buildName());
+        }
+
+        _stack.push(new StackElement(START_TXN, null));
+
+        if (_txn) {
+            s_logger.trace("txn: has already been started.");
+            return;
+        }
+
+        _txn = true;
+
+        _txnTime = System.currentTimeMillis();
+        if (_conn != null) {
+            try {
+                s_logger.trace("txn: set auto commit to false");
+                _conn.setAutoCommit(false);
+            } catch (final SQLException e) {
+                s_logger.warn("Unable to set auto commit: ", e);
+                throw new CloudRuntimeException("Unable to set auto commit: ", e);
+            }
+        }
+    }
+
+    protected void closePreviousStatement() {
+        if (_stmt != null) {
+            try {
+                if (s_stmtLogger.isTraceEnabled()) {
+                    s_stmtLogger.trace("Closing: " + _stmt.toString());
+                }
+                try {
+                    ResultSet rs = _stmt.getResultSet();
+                    if (rs != null && _stmt.getResultSetHoldability() != ResultSet.HOLD_CURSORS_OVER_COMMIT) {
+                        rs.close();
+                    }
+                } catch(SQLException e) {
+                    s_stmtLogger.trace("Unable to close resultset");
+                }
+                _stmt.close();
+            } catch (final SQLException e) {
+                s_stmtLogger.trace("Unable to close statement: " + _stmt.toString());
+            } finally {
+                _stmt = null;
+            }
+        }
+    }
+
+    /**
+     * Prepares an auto close statement.  The statement is closed automatically if it is
+     * retrieved with this method.
+     * 
+     * @param sql sql String
+     * @return PreparedStatement
+     * @throws SQLException if problem with JDBC layer.
+     * 
+     * @see java.sql.Connection
+     */
+    public PreparedStatement prepareAutoCloseStatement(final String sql) throws SQLException {
+        PreparedStatement stmt = prepareStatement(sql);
+        closePreviousStatement();
+        _stmt = stmt;
+        return stmt;
+    }
+
+    public PreparedStatement prepareStatement(final String sql) throws SQLException {
+        final Connection conn = getConnection();
+        final PreparedStatement pstmt = conn.prepareStatement(sql);
+        if (s_stmtLogger.isTraceEnabled()) {
+            s_stmtLogger.trace("Preparing: " + sql);
+        }
+        return pstmt;
+    }
+
+    /**
+     * Prepares an auto close statement.  The statement is closed automatically if it is
+     * retrieved with this method.
+     * 
+     * @param sql sql String
+     * @param autoGeneratedKeys keys that are generated
+     * @return PreparedStatement
+     * @throws SQLException if problem with JDBC layer.
+     * 
+     * @see java.sql.Connection
+     */
+    public PreparedStatement prepareAutoCloseStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
+        final Connection conn = getConnection();
+        final PreparedStatement pstmt = conn.prepareStatement(sql, autoGeneratedKeys);
+        if (s_stmtLogger.isTraceEnabled()) {
+            s_stmtLogger.trace("Preparing: " + sql);
+        }
+        closePreviousStatement();
+        _stmt = pstmt;
+        return pstmt;
+    }
+
+    /**
+     * Prepares an auto close statement.  The statement is closed automatically if it is
+     * retrieved with this method.
+     * 
+     * @param sql sql String
+     * @param columnNames names of the columns
+     * @return PreparedStatement
+     * @throws SQLException if problem with JDBC layer.
+     * 
+     * @see java.sql.Connection
+     */
+    public PreparedStatement prepareAutoCloseStatement(final String sql, final String[] columnNames) throws SQLException {
+        final Connection conn = getConnection();
+        final PreparedStatement pstmt = conn.prepareStatement(sql, columnNames);
+        if (s_stmtLogger.isTraceEnabled()) {
+            s_stmtLogger.trace("Preparing: " + sql);
+        }
+        closePreviousStatement();
+        _stmt = pstmt;
+        return pstmt;
+    }
+
+    /**
+     * Prepares an auto close statement.  The statement is closed automatically if it is
+     * retrieved with this method.
+     * 
+     * @param sql sql String
+     * @return PreparedStatement
+     * @throws SQLException if problem with JDBC layer.
+     * 
+     * @see java.sql.Connection
+     */
+    public PreparedStatement prepareAutoCloseStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+        final Connection conn = getConnection();
+        final PreparedStatement pstmt = conn.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+        if (s_stmtLogger.isTraceEnabled()) {
+            s_stmtLogger.trace("Preparing: " + sql);
+        }
+        closePreviousStatement();
+        _stmt = pstmt;
+        return pstmt;
+    }
+
+    /**
+     * Returns the db connection.
+     * 
+     * Note: that you can call getConnection() but beaware that
+     * all prepare statements from the Connection are not garbage
+     * collected!
+     * 
+     * @return DB Connection but make sure you understand that
+     *         you are responsible for closing the PreparedStatement.
+     * @throws SQLException
+     */
+    public Connection getConnection() throws SQLException {
+        if (_conn == null) {
+            switch (_dbId) {
+            case CLOUD_DB:
+                if(s_ds != null) {
+                    _conn = s_ds.getConnection();
+                } else {
+                    s_logger.warn("A static-initialized variable becomes null, process is dying?");
+                    throw new CloudRuntimeException("Database is not initialized, process is dying?");
+                }
+                break;
+            case USAGE_DB:
+                if(s_usageDS != null) {
+                    _conn = s_usageDS.getConnection();
+                } else {
+                    s_logger.warn("A static-initialized variable becomes null, process is dying?");
+                    throw new CloudRuntimeException("Database is not initialized, process is dying?");
+                }
+                break;
+            case AWSAPI_DB:
+        	if(s_awsapiDS != null) {
+        	    _conn = s_awsapiDS.getConnection();
+        	} else {
+        	    s_logger.warn("A static-initialized variable becomes null, process is dying?");
+                throw new CloudRuntimeException("Database is not initialized, process is dying?");
+        	}
+                break;
+
+            case SIMULATOR_DB:
+                if(s_simulatorDS != null) {
+                    _conn = s_simulatorDS.getConnection();
+                } else {
+                    s_logger.warn("A static-initialized variable becomes null, process is dying?");
+                    throw new CloudRuntimeException("Database is not initialized, process is dying?");
+                }
+                break;
+            default:
+
+        	throw new CloudRuntimeException("No database selected for the transaction");
+            }
+            _conn.setAutoCommit(!_txn);
+
+            //
+            // MySQL default transaction isolation level is REPEATABLE READ,
+            // to reduce chances of DB deadlock, we will use READ COMMITED isolation level instead
+            // see http://dev.mysql.com/doc/refman/5.0/en/innodb-deadlocks.html
+            //
+            _stack.push(new StackElement(CREATE_CONN, null));
+            if (s_connLogger.isTraceEnabled()) {
+                s_connLogger.trace("Creating a DB connection with " + (_txn ? " txn: " : " no txn: ") + " for " + _dbId + ": dbconn" + System.identityHashCode(_conn) + ". Stack: " + buildName());
+            }
+        } else {
+            s_logger.trace("conn: Using existing DB connection");
+        }
+
+        return _conn;
+    }
+
+    protected boolean takeOver(final String name, final boolean create) {
+        if (_stack.size() != 0) {
+            if (!create) {
+                // If it is not a create transaction, then let's just use the current one.
+                if (s_logger.isTraceEnabled()) {
+                    s_logger.trace("Using current transaction: " + toString());
+                }
+                mark(name);
+                return false;
+            }
+
+            final StackElement se = _stack.getFirst();
+            if (se.type == CREATE_TXN) {
+                // This create is called inside of another create.  Which is ok?
+                // We will let that create be responsible for cleaning up.
+                if (s_logger.isTraceEnabled()) {
+                    s_logger.trace("Create using current transaction: " + toString());
+                }
+                mark(name);
+                return false;
+            }
+
+            s_logger.warn("Encountered a transaction that has leaked.  Cleaning up. " + toString());
+            cleanup();
+        }
+
+        if (s_logger.isTraceEnabled()) {
+            s_logger.trace("Took over the transaction: " + name);
+        }
+        _stack.push(new StackElement(create ? CREATE_TXN : CURRENT_TXN, name));
+        _name = name;
+        return true;
+    }
+
+    public void cleanup() {
+        closePreviousStatement();
+
+        removeUpTo(null, null);
+        if (_txn) {
+            rollbackTransaction();
+        }
+        _txn = false;
+        _name = null;
+
+        closeConnection();
+
+        _stack.clear();
+        Merovingian2 lockMaster = Merovingian2.getLockMaster();
+        if (lockMaster != null) {
+            lockMaster.cleanupThread();
+        }
+    }
+
+    public void close() {
+        removeUpTo(CURRENT_TXN, null);
+
+        if (_stack.size() == 0) {
+            s_logger.trace("Transaction is done");
+            cleanup();
+        }
+    }
+
+    /**
+     * close() is used by endTxn to close the connection.  This method only
+     * closes the connection if the name is the same as what's stored.
+     * 
+     * @param name
+     * @return true if this close actually closes the connection.  false if not.
+     */
+    public boolean close(final String name) {
+        if (_name == null) {    // Already cleaned up.
+            if (s_logger.isTraceEnabled()) {
+                s_logger.trace("Already cleaned up." + buildName());
+            }
+            return true;
+        }
+
+        if (!_name.equals(name)) {
+            close();
+            return false;
+        }
+
+        if (s_logger.isDebugEnabled() && _stack.size() > 2) {
+            s_logger.debug("Transaction is not closed properly: " + toString() + ".  Called by " + buildName());
+        }
+
+        cleanup();
+
+        s_logger.trace("All done");
+        return true;
+    }
+
+    protected boolean hasTxnInStack() {
+        return peekInStack(START_TXN) != null;
+    }
+
+    protected void clearLockTimes() {
+        if (s_lockLogger.isDebugEnabled()) {
+            for (Pair<String, Long> time : _lockTimes) {
+                s_lockLogger.trace("SQL " + time.first() + " took " + (System.currentTimeMillis() - time.second()));
+            }
+            _lockTimes.clear();
+        }
+    }
+
+    public boolean commit() {
+        if (!_txn) {
+            s_logger.warn("txn: Commit called when it is not a transaction: " + buildName());
+            return false;
+        }
+
+        Iterator<StackElement> it = _stack.iterator();
+        while (it.hasNext()) {
+            StackElement st = it.next();
+            if (st.type == START_TXN) {
+                it.remove();
+                break;
+            }
+        }
+
+        if (hasTxnInStack()) {
+            if (s_logger.isTraceEnabled()) {
+                s_logger.trace("txn: Not committing because transaction started elsewhere: " + buildName() + " / " + toString());
+            }
+            return false;
+        }
+
+        _txn = false;
+        try {
+            if (_conn != null) {
+                _conn.commit();
+                s_logger.trace("txn: DB Changes committed. Time = " + (System.currentTimeMillis() - _txnTime));
+                clearLockTimes();
+                closeConnection();
+            }
+            return true;
+        } catch (final SQLException e) {
+            rollbackTransaction();
+            throw new CloudRuntimeException("Unable to commit or close the connection. ", e);
+        }
+    }
+
+    protected void closeConnection() {
+        closePreviousStatement();
+
+        if (_conn == null) {
+            return;
+        }
+
+        if (_txn) {
+            s_connLogger.trace("txn: Not closing DB connection because we're still in a transaction.");
+            return;
+        }
+
+        try {
+            // we should only close db connection when it is not user managed
+            if (this._dbId != CONNECTED_DB) {
+                if (s_connLogger.isTraceEnabled()) {
+                    s_connLogger.trace("Closing DB connection: dbconn" + System.identityHashCode(_conn));
+                }                                
+                _conn.close();
+                _conn = null;  
+            }
+
+        } catch (final SQLException e) {
+            s_logger.warn("Unable to close connection", e);
+        }
+    }
+
+    protected void removeUpTo(String type, Object ref) {
+        boolean rollback = false;
+        Iterator<StackElement> it = _stack.iterator();
+        while (it.hasNext()) {
+            StackElement item = it.next();
+
+            it.remove();
+
+            try {
+                if (item.type == type && (ref == null || item.ref == ref)) {
+                    break;
+                }
+
+                if (item.type == CURRENT_TXN) {
+                    if (s_logger.isTraceEnabled()) {
+                        s_logger.trace("Releasing the current txn: " + (item.ref != null ? item.ref : ""));
+                    }
+                } else if (item.type == CREATE_CONN) {
+                    closeConnection();
+                } else if (item.type == START_TXN) {
+                    if (item.ref == null) {
+                        rollback = true;
+                    } else {
+                        try {
+                            _conn.rollback((Savepoint)ref);
+                            rollback = false;
+                        } catch (final SQLException e) {
+                            s_logger.warn("Unable to rollback Txn.", e);
+                        }
+                    }
+                } else if (item.type == STATEMENT) {
+                    try {
+                        if (s_stmtLogger.isTraceEnabled()) {
+                            s_stmtLogger.trace("Closing: " + ref.toString());
+                        }
+                        Statement stmt = (Statement)ref;
+                        try {
+                            ResultSet rs = stmt.getResultSet();
+                            if (rs != null) {
+                                rs.close();
+                            }
+                        } catch(SQLException e) {
+                            s_stmtLogger.trace("Unable to close resultset");
+                        }
+                        stmt.close();
+                    } catch (final SQLException e) {
+                        s_stmtLogger.trace("Unable to close statement: " + item);
+                    }
+                } else if (item.type == ATTACHMENT) {
+                    TransactionAttachment att = (TransactionAttachment)item.ref;
+                    if (s_logger.isTraceEnabled()) {
+                        s_logger.trace("Cleaning up " + att.getName());
+                    }
+                    att.cleanup();
+                }
+            } catch(Exception e) {
+                s_logger.error("Unable to clean up " + item, e);
+            }
+        }
+
+        if (rollback) {
+            rollback();
+        }
+    }
+
+    protected void rollbackTransaction() {
+        closePreviousStatement();
+        if (!_txn) {
+            if (s_logger.isTraceEnabled()) {
+                s_logger.trace("Rollback called for " + _name + " when there's no transaction: " + buildName());
+            }
+            return;
+        }
+        assert (!hasTxnInStack()) : "Who's rolling back transaction when there's still txn in stack?";
+        _txn = false;
+        try {
+            if (_conn != null) {
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("Rolling back the transaction: Time = " + (System.currentTimeMillis() - _txnTime) + " Name =  " + _name + "; called by " + buildName());
+                }
+                _conn.rollback();
+            }
+            clearLockTimes();
+            closeConnection();
+        } catch(final SQLException e) {
+            s_logger.warn("Unable to rollback", e);
+        }
+    }
+
+    protected void rollbackSavepoint(Savepoint sp) {
+        try {
+            if (_conn != null) {
+                _conn.rollback(sp);
+            }
+        } catch (SQLException e) {
+            s_logger.warn("Unable to rollback to savepoint " + sp);
+        }
+
+        if (!hasTxnInStack()) {
+            _txn = false;
+            closeConnection();
+        }
+    }
+
+    public void rollback() {
+        Iterator<StackElement> it = _stack.iterator();
+        while (it.hasNext()) {
+            StackElement st = it.next();
+            if (st.type == START_TXN) {
+                if (st.ref == null) {
+                    it.remove();
+                } else  {
+                    rollback((Savepoint)st.ref);
+                    return;
+                }
+            }
+        }
+
+        rollbackTransaction();
+    }
+
+    public Savepoint setSavepoint() throws SQLException {
+        _txn = true;
+        StackElement st = new StackElement(START_TXN, null);
+        _stack.push(st);
+        final Connection conn = getConnection();
+        final Savepoint sp = conn.setSavepoint();
+        st.ref = sp;
+
+        return sp;
+    }
+
+    public Savepoint setSavepoint(final String name) throws SQLException {
+        _txn = true;
+        StackElement st = new StackElement(START_TXN, null);
+        _stack.push(st);
+        final Connection conn = getConnection();
+        final Savepoint sp = conn.setSavepoint(name);
+        st.ref = sp;
+
+        return sp;
+    }
+
+    public void releaseSavepoint(final Savepoint sp) throws SQLException {
+        removeTxn(sp);
+        if (_conn != null) {
+            _conn.releaseSavepoint(sp);
+        }
+
+        if (!hasTxnInStack()) {
+            _txn = false;
+            closeConnection();
+        }
+    }
+
+    protected boolean hasSavepointInStack(Savepoint sp) {
+        Iterator<StackElement> it = _stack.iterator();
+        while (it.hasNext()) {
+            StackElement se = it.next();
+            if (se.type == START_TXN && se.ref == sp) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    protected void removeTxn(Savepoint sp) {
+        assert hasSavepointInStack(sp) : "Removing a save point that's not in the stack";
+
+        if (!hasSavepointInStack(sp)) {
+            return;
+        }
+
+        Iterator<StackElement> it = _stack.iterator();
+        while (it.hasNext()) {
+            StackElement se = it.next();
+            if (se.type == START_TXN) {
+                it.remove();
+                if (se.ref == sp) {
+                    return;
+                }
+            }
+        }
+    }
+
+    public void rollback(final Savepoint sp) {
+        removeTxn(sp);
+
+        rollbackSavepoint(sp);
+    }
+
+    public Connection getCurrentConnection() {
+        return _conn;
+    }
+
+    public List<StackElement> getStack() {
+        return _stack;
+    }
+
+    protected TransactionLegacy() {
+        _name = null;
+        _conn = null;
+        _stack = null;
+        _txn = false;
+        _dbId = -1;
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        if (!(_conn == null && (_stack == null || _stack.size() == 0))) {
+            assert (false) : "Oh Alex oh alex...something is wrong with how we're doing this";
+            s_logger.error("Something went wrong that a transaction is orphaned before db connection is closed");
+            cleanup();
+        }
+    }
+
+    protected class StackElement {
+        public String type;
+        public Object ref;
+
+        public StackElement (String type, Object ref) {
+            this.type = type;
+            this.ref = ref;
+        }
+
+        @Override
+        public String toString() {
+            return type + "-" + ref;
+        }
+    }
+
+    private static DataSource s_ds;
+    private static DataSource s_usageDS;
+    private static DataSource s_awsapiDS;
+    private static DataSource s_simulatorDS;
+
+    static {
+        // Initialize with assumed db.properties file
+        initDataSource("db.properties");
+    }
+
+    public static void initDataSource(String propsFileName) {
+        try {
+            File dbPropsFile = PropertiesUtil.findConfigFile(propsFileName);
+            final Properties dbProps;
+            if (EncryptionSecretKeyChecker.useEncryption()) {
+                StandardPBEStringEncryptor encryptor = EncryptionSecretKeyChecker.getEncryptor();
+                dbProps = new EncryptableProperties(encryptor);
+            } else {
+                dbProps = new Properties();
+            }
+            try {
+                dbProps.load(new FileInputStream(dbPropsFile));
+            } catch (IOException e) {
+                s_logger.fatal("Unable to load db properties file, pl. check the classpath and file path configuration", e);
+                return;
+            } catch (NullPointerException e) {
+                s_logger.fatal("Unable to locate db properties file within classpath or absolute path: " + propsFileName);
+                return;
+            }
+
+            // FIXME:  If params are missing...default them????
+            final int cloudMaxActive = Integer.parseInt(dbProps.getProperty("db.cloud.maxActive"));
+            final int cloudMaxIdle = Integer.parseInt(dbProps.getProperty("db.cloud.maxIdle"));
+            final long cloudMaxWait = Long.parseLong(dbProps.getProperty("db.cloud.maxWait"));
+            final String cloudUsername = dbProps.getProperty("db.cloud.username");
+            final String cloudPassword = dbProps.getProperty("db.cloud.password");
+            final String cloudHost = dbProps.getProperty("db.cloud.host");
+            final int cloudPort = Integer.parseInt(dbProps.getProperty("db.cloud.port"));
+            final String cloudDbName = dbProps.getProperty("db.cloud.name");
+            final boolean cloudAutoReconnect = Boolean.parseBoolean(dbProps.getProperty("db.cloud.autoReconnect"));
+            final String cloudValidationQuery = dbProps.getProperty("db.cloud.validationQuery");
+            final String cloudIsolationLevel = dbProps.getProperty("db.cloud.isolation.level");
+
+            int isolationLevel = Connection.TRANSACTION_READ_COMMITTED;
+            if (cloudIsolationLevel == null) {
+                isolationLevel = Connection.TRANSACTION_READ_COMMITTED;
+            } else if (cloudIsolationLevel.equalsIgnoreCase("readcommitted")) {
+                isolationLevel = Connection.TRANSACTION_READ_COMMITTED;
+            } else if (cloudIsolationLevel.equalsIgnoreCase("repeatableread")) {
+                isolationLevel = Connection.TRANSACTION_REPEATABLE_READ;
+            } else if (cloudIsolationLevel.equalsIgnoreCase("serializable")) {
+                isolationLevel = Connection.TRANSACTION_SERIALIZABLE;
+            } else if (cloudIsolationLevel.equalsIgnoreCase("readuncommitted")) {
+                isolationLevel = Connection.TRANSACTION_READ_UNCOMMITTED;
+            } else {
+                s_logger.warn("Unknown isolation level " + cloudIsolationLevel + ".  Using read uncommitted");
+            }
+
+            final boolean cloudTestOnBorrow = Boolean.parseBoolean(dbProps.getProperty("db.cloud.testOnBorrow"));
+            final boolean cloudTestWhileIdle = Boolean.parseBoolean(dbProps.getProperty("db.cloud.testWhileIdle"));
+            final long cloudTimeBtwEvictionRunsMillis = Long.parseLong(dbProps.getProperty("db.cloud.timeBetweenEvictionRunsMillis"));
+            final long cloudMinEvcitableIdleTimeMillis = Long.parseLong(dbProps.getProperty("db.cloud.minEvictableIdleTimeMillis"));
+            final boolean cloudPoolPreparedStatements = Boolean.parseBoolean(dbProps.getProperty("db.cloud.poolPreparedStatements"));
+            final String url = dbProps.getProperty("db.cloud.url.params");
+
+            final boolean useSSL = Boolean.parseBoolean(dbProps.getProperty("db.cloud.useSSL"));
+            if (useSSL) {
+                System.setProperty("javax.net.ssl.keyStore", dbProps.getProperty("db.cloud.keyStore"));
+                System.setProperty("javax.net.ssl.keyStorePassword", dbProps.getProperty("db.cloud.keyStorePassword"));
+                System.setProperty("javax.net.ssl.trustStore", dbProps.getProperty("db.cloud.trustStore"));
+                System.setProperty("javax.net.ssl.trustStorePassword", dbProps.getProperty("db.cloud.trustStorePassword"));
+            }
+
+            final GenericObjectPool cloudConnectionPool = new GenericObjectPool(null, cloudMaxActive, GenericObjectPool.DEFAULT_WHEN_EXHAUSTED_ACTION,
+                    cloudMaxWait, cloudMaxIdle, cloudTestOnBorrow, false, cloudTimeBtwEvictionRunsMillis, 1, cloudMinEvcitableIdleTimeMillis, cloudTestWhileIdle);
+
+            final ConnectionFactory cloudConnectionFactory = new DriverManagerConnectionFactory("jdbc:mysql://" + cloudHost + ":" + cloudPort + "/" + cloudDbName +
+                    "?autoReconnect=" + cloudAutoReconnect + (url != null ? "&" + url : "") + (useSSL ? "&useSSL=true" : ""), cloudUsername, cloudPassword);
+
+            final KeyedObjectPoolFactory poolableObjFactory = (cloudPoolPreparedStatements ? new StackKeyedObjectPoolFactory() : null);
+
+            final PoolableConnectionFactory cloudPoolableConnectionFactory = new PoolableConnectionFactory(cloudConnectionFactory, cloudConnectionPool, poolableObjFactory,
+                    cloudValidationQuery, false, false, isolationLevel);
+
+            // Default Data Source for CloudStack
+            s_ds = new PoolingDataSource(cloudPoolableConnectionFactory.getPool());
+
+            // Configure the usage db
+            final int usageMaxActive = Integer.parseInt(dbProps.getProperty("db.usage.maxActive"));
+            final int usageMaxIdle = Integer.parseInt(dbProps.getProperty("db.usage.maxIdle"));
+            final long usageMaxWait = Long.parseLong(dbProps.getProperty("db.usage.maxWait"));
+            final String usageUsername = dbProps.getProperty("db.usage.username");
+            final String usagePassword = dbProps.getProperty("db.usage.password");
+            final String usageHost = dbProps.getProperty("db.usage.host");
+            final int usagePort = Integer.parseInt(dbProps.getProperty("db.usage.port"));
+            final String usageDbName = dbProps.getProperty("db.usage.name");
+            final boolean usageAutoReconnect = Boolean.parseBoolean(dbProps.getProperty("db.usage.autoReconnect"));
+            final String usageUrl = dbProps.getProperty("db.usage.url.params");
+
+            final GenericObjectPool usageConnectionPool = new GenericObjectPool(null, usageMaxActive, GenericObjectPool.DEFAULT_WHEN_EXHAUSTED_ACTION,
+                    usageMaxWait, usageMaxIdle);
+
+            final ConnectionFactory usageConnectionFactory = new DriverManagerConnectionFactory("jdbc:mysql://" + usageHost + ":" + usagePort + "/" + usageDbName +
+                    "?autoReconnect=" + usageAutoReconnect + (usageUrl != null ? "&" + usageUrl : ""), usageUsername, usagePassword);
+
+            final PoolableConnectionFactory usagePoolableConnectionFactory = new PoolableConnectionFactory(usageConnectionFactory, usageConnectionPool,
+                    new StackKeyedObjectPoolFactory(), null, false, false);
+
+            // Data Source for usage server
+            s_usageDS = new PoolingDataSource(usagePoolableConnectionFactory.getPool());
+
+            // Configure awsapi db
+            final String awsapiDbName = dbProps.getProperty("db.awsapi.name");
+            final GenericObjectPool awsapiConnectionPool = new GenericObjectPool(null, usageMaxActive, GenericObjectPool.DEFAULT_WHEN_EXHAUSTED_ACTION,
+                    usageMaxWait, usageMaxIdle);
+            final ConnectionFactory awsapiConnectionFactory = new DriverManagerConnectionFactory("jdbc:mysql://" + cloudHost + ":" + cloudPort + "/" + awsapiDbName +
+                    "?autoReconnect=" + usageAutoReconnect, cloudUsername, cloudPassword);
+            final PoolableConnectionFactory awsapiPoolableConnectionFactory = new PoolableConnectionFactory(awsapiConnectionFactory, awsapiConnectionPool,
+                    new StackKeyedObjectPoolFactory(), null, false, false);
+
+            // Data Source for awsapi
+            s_awsapiDS = new PoolingDataSource(awsapiPoolableConnectionFactory.getPool());
+
+            try {
+                // Configure the simulator db
+                final int simulatorMaxActive = Integer.parseInt(dbProps.getProperty("db.simulator.maxActive"));
+                final int simulatorMaxIdle = Integer.parseInt(dbProps.getProperty("db.simulator.maxIdle"));
+                final long simulatorMaxWait = Long.parseLong(dbProps.getProperty("db.simulator.maxWait"));
+                final String simulatorUsername = dbProps.getProperty("db.simulator.username");
+                final String simulatorPassword = dbProps.getProperty("db.simulator.password");
+                final String simulatorHost = dbProps.getProperty("db.simulator.host");
+                final int simulatorPort = Integer.parseInt(dbProps.getProperty("db.simulator.port"));
+                final String simulatorDbName = dbProps.getProperty("db.simulator.name");
+                final boolean simulatorAutoReconnect = Boolean.parseBoolean(dbProps.getProperty("db.simulator.autoReconnect"));
+
+                final GenericObjectPool simulatorConnectionPool = new GenericObjectPool(null, simulatorMaxActive, GenericObjectPool.DEFAULT_WHEN_EXHAUSTED_ACTION,
+                        simulatorMaxWait, simulatorMaxIdle);
+
+                final ConnectionFactory simulatorConnectionFactory = new DriverManagerConnectionFactory("jdbc:mysql://" + simulatorHost + ":" + simulatorPort + "/" + simulatorDbName +
+                        "?autoReconnect=" + simulatorAutoReconnect, simulatorUsername, simulatorPassword);
+
+                final PoolableConnectionFactory simulatorPoolableConnectionFactory = new PoolableConnectionFactory(simulatorConnectionFactory, simulatorConnectionPool,
+                        new StackKeyedObjectPoolFactory(), null, false, false);
+                s_simulatorDS = new PoolingDataSource(simulatorPoolableConnectionFactory.getPool());
+            } catch (Exception e) {
+                s_logger.debug("Simulator DB properties are not available. Not initializing simulator DS");
+            }
+        } catch (final Exception e) {
+            s_ds = getDefaultDataSource("cloud");
+            s_usageDS = getDefaultDataSource("cloud_usage");
+            s_simulatorDS = getDefaultDataSource("cloud_simulator");
+            s_logger.warn("Unable to load db configuration, using defaults with 5 connections. Falling back on assumed datasource on localhost:3306 using username:password=cloud:cloud. Please check your configuration", e);
+        }
+    }
+
+    private static DataSource getDefaultDataSource(final String database) {
+        final GenericObjectPool connectionPool = new GenericObjectPool(null, 5);
+        final ConnectionFactory connectionFactory = new DriverManagerConnectionFactory(
+           "jdbc:mysql://localhost:3306/" + database, "cloud", "cloud");
+        final PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory(
+           connectionFactory, connectionPool, null, null, false, true);
+        return new PoolingDataSource(
+           /* connectionPool */poolableConnectionFactory.getPool());
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/db/src/com/cloud/utils/db/TransactionMBeanImpl.java
----------------------------------------------------------------------
diff --git a/framework/db/src/com/cloud/utils/db/TransactionMBeanImpl.java b/framework/db/src/com/cloud/utils/db/TransactionMBeanImpl.java
index d51a9bd..73511b1 100644
--- a/framework/db/src/com/cloud/utils/db/TransactionMBeanImpl.java
+++ b/framework/db/src/com/cloud/utils/db/TransactionMBeanImpl.java
@@ -25,21 +25,21 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import javax.management.StandardMBean;
 
-import com.cloud.utils.db.Transaction.StackElement;
+import com.cloud.utils.db.TransactionLegacy.StackElement;
 
 public class TransactionMBeanImpl extends StandardMBean implements TransactionMBean {
     
-    Map<Long, Transaction> _txns = new ConcurrentHashMap<Long, Transaction>();
+    Map<Long, TransactionLegacy> _txns = new ConcurrentHashMap<Long, TransactionLegacy>();
     
     public TransactionMBeanImpl() {
         super(TransactionMBean.class, false);
     }
     
-    public void addTransaction(Transaction txn) {
+    public void addTransaction(TransactionLegacy txn) {
         _txns.put(txn.getId(), txn);
     }
     
-    public void removeTransaction(Transaction txn) {
+    public void removeTransaction(TransactionLegacy txn) {
         _txns.remove(txn.getId());
     }
     
@@ -53,7 +53,7 @@ public class TransactionMBeanImpl extends StandardMBean implements TransactionMB
         int[] count = new int[2];
         count[0] = 0;
         count[1] = 0;
-        for (Transaction txn : _txns.values()) {
+        for (TransactionLegacy txn : _txns.values()) {
             if (txn.getStack().size() > 0) {
                 count[0]++;
             }
@@ -67,7 +67,7 @@ public class TransactionMBeanImpl extends StandardMBean implements TransactionMB
     @Override
     public List<Map<String, String>> getTransactions() {
         ArrayList<Map<String, String>> txns = new ArrayList<Map<String, String>>();
-        for (Transaction info : _txns.values()) {
+        for (TransactionLegacy info : _txns.values()) {
             txns.add(toMap(info));
         }
         return txns;
@@ -76,7 +76,7 @@ public class TransactionMBeanImpl extends StandardMBean implements TransactionMB
     @Override
     public List<Map<String, String>> getActiveTransactions() {
         ArrayList<Map<String, String>> txns = new ArrayList<Map<String, String>>();
-        for (Transaction txn : _txns.values()) {
+        for (TransactionLegacy txn : _txns.values()) {
             if (txn.getStack().size() > 0 || txn.getCurrentConnection() != null) {
                 txns.add(toMap(txn));
             }
@@ -84,7 +84,7 @@ public class TransactionMBeanImpl extends StandardMBean implements TransactionMB
         return txns;
     }
     
-    protected Map<String, String> toMap(Transaction txn) {
+    protected Map<String, String> toMap(TransactionLegacy txn) {
         Map<String, String> map = new HashMap<String, String>();
         map.put("name", txn.getName());
         map.put("id", Long.toString(txn.getId()));
@@ -103,7 +103,7 @@ public class TransactionMBeanImpl extends StandardMBean implements TransactionMB
     @Override
     public List<Map<String, String>> getTransactionsWithDatabaseConnection() {
         ArrayList<Map<String, String>> txns = new ArrayList<Map<String, String>>();
-        for (Transaction txn : _txns.values()) {
+        for (TransactionLegacy txn : _txns.values()) {
             if (txn.getCurrentConnection() != null) {
                 txns.add(toMap(txn));
             }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/db/src/com/cloud/utils/db/TransactionStatus.java
----------------------------------------------------------------------
diff --git a/framework/db/src/com/cloud/utils/db/TransactionStatus.java b/framework/db/src/com/cloud/utils/db/TransactionStatus.java
new file mode 100644
index 0000000..a167797
--- /dev/null
+++ b/framework/db/src/com/cloud/utils/db/TransactionStatus.java
@@ -0,0 +1,7 @@
+package com.cloud.utils.db;
+
+/**
+ * Placeholder for possible future features
+ */
+public interface TransactionStatus {
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/db/src/com/cloud/utils/db/TransactionWrappedExeception.java
----------------------------------------------------------------------
diff --git a/framework/db/src/com/cloud/utils/db/TransactionWrappedExeception.java b/framework/db/src/com/cloud/utils/db/TransactionWrappedExeception.java
new file mode 100644
index 0000000..70d4c16
--- /dev/null
+++ b/framework/db/src/com/cloud/utils/db/TransactionWrappedExeception.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.cloud.utils.db;
+
+import com.cloud.utils.exception.CloudRuntimeException;
+
+public class TransactionWrappedExeception extends CloudRuntimeException {
+
+    private static final long serialVersionUID = -3254037624055143300L;
+
+    Exception e;
+
+    public TransactionWrappedExeception(Exception e) {
+        this.e = e;
+    }
+
+    public Exception getWrapped() {
+        return e;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/db/test/com/cloud/utils/db/DbTestDao.java
----------------------------------------------------------------------
diff --git a/framework/db/test/com/cloud/utils/db/DbTestDao.java b/framework/db/test/com/cloud/utils/db/DbTestDao.java
index 9530b3b..7db5ba8 100644
--- a/framework/db/test/com/cloud/utils/db/DbTestDao.java
+++ b/framework/db/test/com/cloud/utils/db/DbTestDao.java
@@ -29,7 +29,7 @@ public class DbTestDao extends GenericDaoBase<DbTestVO, Long> implements Generic
 
     @DB
     public void create(int fldInt, long fldLong, String fldString) {
-        Transaction txn = Transaction.currentTxn();
+        TransactionLegacy txn = TransactionLegacy.currentTxn();
         PreparedStatement pstmt = null;
         try {
             txn.start();
@@ -48,7 +48,7 @@ public class DbTestDao extends GenericDaoBase<DbTestVO, Long> implements Generic
 
     @DB
     public void update(int fldInt, long fldLong, String fldString) {
-        Transaction txn = Transaction.currentTxn();
+        TransactionLegacy txn = TransactionLegacy.currentTxn();
         PreparedStatement pstmt = null;
         try {
             txn.start();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/db/test/com/cloud/utils/db/DbTestUtils.java
----------------------------------------------------------------------
diff --git a/framework/db/test/com/cloud/utils/db/DbTestUtils.java b/framework/db/test/com/cloud/utils/db/DbTestUtils.java
index 11ae1aa..2458b8c 100644
--- a/framework/db/test/com/cloud/utils/db/DbTestUtils.java
+++ b/framework/db/test/com/cloud/utils/db/DbTestUtils.java
@@ -33,7 +33,7 @@ public class DbTestUtils {
             throw new RuntimeException("Unable to clean the database because I can't find " + file);
         }
         
-        Connection conn = Transaction.getStandaloneConnection();
+        Connection conn = TransactionLegacy.getStandaloneConnection();
         
         ScriptRunner runner = new ScriptRunner(conn, autoCommit, stopOnError);
         FileReader reader;
@@ -63,7 +63,7 @@ public class DbTestUtils {
             throw new RuntimeException("Unable to clean the database because I can't find " + file);
         }
         
-        Connection conn = Transaction.getStandaloneUsageConnection();
+        Connection conn = TransactionLegacy.getStandaloneUsageConnection();
         
         ScriptRunner runner = new ScriptRunner(conn, autoCommit, stopOnError);
         FileReader reader;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/db/test/com/cloud/utils/db/TransactionTest.java
----------------------------------------------------------------------
diff --git a/framework/db/test/com/cloud/utils/db/TransactionTest.java b/framework/db/test/com/cloud/utils/db/TransactionTest.java
index 101a533..92b2f36 100644
--- a/framework/db/test/com/cloud/utils/db/TransactionTest.java
+++ b/framework/db/test/com/cloud/utils/db/TransactionTest.java
@@ -41,7 +41,7 @@ public class TransactionTest {
         Connection conn = null;
         PreparedStatement pstmt = null;
         try {
-            conn = Transaction.getStandaloneConnection();
+            conn = TransactionLegacy.getStandaloneConnection();
 
             pstmt = conn.prepareStatement("CREATE TABLE `cloud`.`test` ("
                     + "`id` bigint unsigned NOT NULL UNIQUE AUTO_INCREMENT," + "`fld_int` int unsigned,"
@@ -75,27 +75,27 @@ public class TransactionTest {
      */
     public void testUserManagedConnection() {
         DbTestDao testDao = ComponentContext.inject(DbTestDao.class);
-        Transaction txn = Transaction.open("SingleConnectionThread");
+        TransactionLegacy txn = TransactionLegacy.open("SingleConnectionThread");
         Connection conn = null;
         try {
-            conn = Transaction.getStandaloneConnectionWithException();
+            conn = TransactionLegacy.getStandaloneConnectionWithException();
             txn.transitToUserManagedConnection(conn);
             // try two SQLs to make sure that they are using the same connection
             // acquired above.
             testDao.create(1, 1, "Record 1");
-            Connection checkConn = Transaction.currentTxn().getConnection();
+            Connection checkConn = TransactionLegacy.currentTxn().getConnection();
             if (checkConn != conn) {
                 Assert.fail("A new db connection is acquired instead of using old one after create sql");
             }
             testDao.update(2, 2, "Record 1");
-            Connection checkConn2 = Transaction.currentTxn().getConnection();
+            Connection checkConn2 = TransactionLegacy.currentTxn().getConnection();
             if (checkConn2 != conn) {
                 Assert.fail("A new db connection is acquired instead of using old one after update sql");
             }
         } catch (SQLException e) {
             Assert.fail(e.getMessage());
         } finally {
-            txn.transitToAutoManagedConnection(Transaction.CLOUD_DB);
+            txn.transitToAutoManagedConnection(TransactionLegacy.CLOUD_DB);
             txn.close();
 
             if (conn != null) {
@@ -117,28 +117,28 @@ public class TransactionTest {
         // acquire a db connection and keep it
         Connection conn = null;
         try {
-            conn = Transaction.getStandaloneConnectionWithException();
+            conn = TransactionLegacy.getStandaloneConnectionWithException();
         } catch (SQLException ex) {
             throw new CloudRuntimeException("Problem with getting db connection", ex);
         }
 
         // start heartbeat loop, make sure that each loop still use the same
         // connection
-        Transaction txn = null;
+        TransactionLegacy txn = null;
         for (int i = 0; i < 3; i++) {
-            txn = Transaction.open("HeartbeatSimulator");
+            txn = TransactionLegacy.open("HeartbeatSimulator");
             try {
 
                 txn.transitToUserManagedConnection(conn);
                 testDao.create(i, i, "Record " + i);
-                Connection checkConn = Transaction.currentTxn().getConnection();
+                Connection checkConn = TransactionLegacy.currentTxn().getConnection();
                 if (checkConn != conn) {
                     Assert.fail("A new db connection is acquired instead of using old one in loop " + i);
                 }
             } catch (SQLException e) {
                 Assert.fail(e.getMessage());
             } finally {
-                txn.transitToAutoManagedConnection(Transaction.CLOUD_DB);
+                txn.transitToAutoManagedConnection(TransactionLegacy.CLOUD_DB);
                 txn.close();
             }
         }
@@ -161,7 +161,7 @@ public class TransactionTest {
         Connection conn = null;
         PreparedStatement pstmt = null;
         try {
-            conn = Transaction.getStandaloneConnection();
+            conn = TransactionLegacy.getStandaloneConnection();
 
             pstmt = conn.prepareStatement("truncate table `cloud`.`test`");
             pstmt.execute();
@@ -189,7 +189,7 @@ public class TransactionTest {
         Connection conn = null;
         PreparedStatement pstmt = null;
         try {
-            conn = Transaction.getStandaloneConnection();
+            conn = TransactionLegacy.getStandaloneConnection();
 
             pstmt = conn.prepareStatement("DROP TABLE IF EXISTS `cloud`.`test`");
             pstmt.execute();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
index fb3845c..ed161e7 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
@@ -32,7 +32,7 @@ import com.cloud.utils.db.GenericDaoBase;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.SearchCriteria.Op;
-import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionLegacy;
 
 public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements AsyncJobDao {
     private static final Logger s_logger = Logger.getLogger(AsyncJobDaoImpl.class.getName());
@@ -182,7 +182,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
                 + ", job_result='" + jobResultMessage + "' where job_status=" + JobInfo.Status.IN_PROGRESS.ordinal()
                 + " AND (job_executing_msid=? OR (job_executing_msid IS NULL AND job_init_msid=?))";
 		
-        Transaction txn = Transaction.currentTxn();
+        TransactionLegacy txn = TransactionLegacy.currentTxn();
         PreparedStatement pstmt = null;
         try {
             pstmt = txn.prepareAutoCloseStatement(sql);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
index 20d8ba6..d4ca0d7 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
@@ -34,7 +34,7 @@ import com.cloud.utils.db.GenericDaoBase;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.SearchCriteria.Op;
-import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionLegacy;
 import com.cloud.utils.db.UpdateBuilder;
 import com.cloud.utils.exception.CloudRuntimeException;
 
@@ -157,7 +157,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
 //
 //		Date cutDate = DateUtil.currentGMTTime();
 //
-//		Transaction txn = Transaction.currentTxn();
+//		TransactionLegacy txn = TransactionLegacy.currentTxn();
 //        PreparedStatement pstmt = null;
 //        try {
 //			txn.start();
@@ -213,7 +213,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
     public List<Long> findJobsToWake(long joinedJobId) {
         // TODO: We should fix this.  We shouldn't be crossing daos in a dao code.
         List<Long> standaloneList = new ArrayList<Long>();
-        Transaction txn = Transaction.currentTxn();
+        TransactionLegacy txn = TransactionLegacy.currentTxn();
         String sql = "SELECT job_id FROM async_job_join_map WHERE join_job_id = ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
         try {
             PreparedStatement pstmt = txn.prepareStatement(sql);
@@ -231,7 +231,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
     @Override
     public List<Long> findJobsToWakeBetween(Date cutDate) {
         List<Long> standaloneList = new ArrayList<Long>();
-        Transaction txn = Transaction.currentTxn();
+        TransactionLegacy txn = TransactionLegacy.currentTxn();
         try {
             String sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
             PreparedStatement pstmt = txn.prepareStatement(sql);
@@ -260,7 +260,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
 //    public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
 //        List<Long> standaloneList = new ArrayList<Long>();
 //
-//        Transaction txn = Transaction.currentTxn();
+//        TransactionLegacy txn = TransactionLegacy.currentTxn();
 //        PreparedStatement pstmt = null;
 //        try {
 //            txn.start();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
index f7d9d72..01efc4e 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
@@ -30,7 +30,7 @@ import com.cloud.utils.DateUtil;
 import com.cloud.utils.db.GenericDaoBase;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
-import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionLegacy;
 
 public class SyncQueueDaoImpl extends GenericDaoBase<SyncQueueVO, Long> implements SyncQueueDao {
     private static final Logger s_logger = Logger.getLogger(SyncQueueDaoImpl.class.getName());
@@ -51,7 +51,7 @@ public class SyncQueueDaoImpl extends GenericDaoBase<SyncQueueVO, Long> implemen
         String sql = "INSERT IGNORE INTO sync_queue(sync_objtype, sync_objid, created, last_updated)" +
                 " values(?, ?, ?, ?)";
 		
-        Transaction txn = Transaction.currentTxn();
+        TransactionLegacy txn = TransactionLegacy.currentTxn();
         PreparedStatement pstmt = null;
         try {
             pstmt = txn.prepareAutoCloseStatement(sql);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
index 0cd231f..2f04a7c 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
@@ -37,7 +37,7 @@ import com.cloud.utils.db.GenericSearchBuilder;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.SearchCriteria.Op;
-import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionLegacy;
 
 @DB
 public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> implements SyncQueueItemDao {
@@ -83,7 +83,7 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
 					 " ORDER BY i.id " +
 					 " LIMIT 0, ?";
 
-        Transaction txn = Transaction.currentTxn();
+        TransactionLegacy txn = TransactionLegacy.currentTxn();
         PreparedStatement pstmt = null;
         try {
             pstmt = txn.prepareAutoCloseStatement(sql);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 93d50c1..ffc7b3a 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -70,8 +70,11 @@ import com.cloud.utils.db.GenericSearchBuilder;
 import com.cloud.utils.db.GlobalLock;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.TransactionCallback;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
 import com.cloud.utils.db.SearchCriteria.Op;
 import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionStatus;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.exception.ExceptionUtil;
 import com.cloud.utils.mgmt.JmxUtil;
@@ -177,19 +180,22 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     @SuppressWarnings("unchecked")
     @Override
     @DB
-    public long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId) {
-        Transaction txt = Transaction.currentTxn();
+    public long submitAsyncJob(final AsyncJob job, final String syncObjType, final long syncObjId) {
         try {
             @SuppressWarnings("rawtypes")
-            GenericDao dao = GenericDaoBase.getDao(job.getClass());
+            final GenericDao dao = GenericDaoBase.getDao(job.getClass());
 
-            txt.start();
-            job.setInitMsid(getMsid());
-            dao.persist(job);
+            return Transaction.execute(new TransactionCallback<Long>() {
+                @Override
+                public Long doInTransaction(TransactionStatus status) {
+                    job.setInitMsid(getMsid());
+                    dao.persist(job);
+
+                    syncAsyncJobExecution(job, syncObjType, syncObjId, 1);
 
-            syncAsyncJobExecution(job, syncObjType, syncObjId, 1);
-            txt.commit();
-            return job.getId();
+                    return job.getId();
+                }
+            });
         } catch (Exception e) {
             String errMsg = "Unable to schedule async job for command " + job.getCmd() + ", unexpected exception.";
             s_logger.warn(errMsg, e);
@@ -199,123 +205,110 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
 
     @Override
     @DB
-    public void completeAsyncJob(long jobId, Status jobStatus, int resultCode, String resultObject) {
+    public void completeAsyncJob(final long jobId, final Status jobStatus, final int resultCode, final String resultObject) {
         if (s_logger.isDebugEnabled()) {
             s_logger.debug("Complete async job-" + jobId + ", jobStatus: " + jobStatus + ", resultCode: " + resultCode + ", result: " + resultObject);
         }
 
-        Transaction txn = Transaction.currentTxn();
-        try {
-            txn.start();
-            AsyncJobVO job = _jobDao.findById(jobId);
-            if (job == null) {
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug("job-" + jobId + " no longer exists, we just log completion info here. " + jobStatus + ", resultCode: " + resultCode + ", result: " +
-                                   resultObject);
-                }
-
-                txn.rollback();
-                return;
+        final AsyncJobVO job = _jobDao.findById(jobId);
+        if (job == null) {
+            if (s_logger.isDebugEnabled()) {
+                s_logger.debug("job-" + jobId + " no longer exists, we just log completion info here. " + jobStatus + ", resultCode: " + resultCode + ", result: " +
+                               resultObject);
             }
 
-            if (job.getStatus() != JobInfo.Status.IN_PROGRESS) {
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug("job-" + jobId + " is already completed.");
-                }
+            return;
+        }
 
-                txn.rollback();
-                return;
+        if (job.getStatus() != JobInfo.Status.IN_PROGRESS) {
+            if (s_logger.isDebugEnabled()) {
+                s_logger.debug("job-" + jobId + " is already completed.");
             }
 
-            job.setCompleteMsid(getMsid());
-            job.setStatus(jobStatus);
-            job.setResultCode(resultCode);
+            return;
+        }
 
-            // reset attached object
-            job.setInstanceType(null);
-            job.setInstanceId(null);
+        List<Long> wakeupList = Transaction.execute(new TransactionCallback<List<Long>>() {
+            @Override
+            public List<Long> doInTransaction(TransactionStatus status) {
+                job.setCompleteMsid(getMsid());
+                job.setStatus(jobStatus);
+                job.setResultCode(resultCode);
 
-            if (resultObject != null) {
-                job.setResult(resultObject);
-            }
+                // reset attached object
+                job.setInstanceType(null);
+                job.setInstanceId(null);
 
-            job.setLastUpdated(DateUtil.currentGMTTime());
-            _jobDao.update(jobId, job);
+                if (resultObject != null) {
+                    job.setResult(resultObject);
+                }
 
-            List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId);
-            _joinMapDao.disjoinAllJobs(jobId);
+                job.setLastUpdated(DateUtil.currentGMTTime());
+                _jobDao.update(jobId, job);
 
-            txn.commit();
+                List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId);
+                _joinMapDao.disjoinAllJobs(jobId);
 
-            for (Long id : wakeupList) {
-                // TODO, we assume that all jobs in this category is API job only
-                AsyncJobVO jobToWakeup = _jobDao.findById(id);
-                if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
-                    scheduleExecution(jobToWakeup, false);
+                return wakeupList;
             }
+        });
 
-            _messageBus.publish(null, AsyncJob.Topics.JOB_STATE, PublishScope.GLOBAL, jobId);
-        } catch (Exception e) {
-            s_logger.error("Unexpected exception while completing async job-" + jobId, e);
-            txn.rollback();
+        for (Long id : wakeupList) {
+            // TODO, we assume that all jobs in this category is API job only
+            AsyncJobVO jobToWakeup = _jobDao.findById(id);
+            if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
+                scheduleExecution(jobToWakeup, false);
         }
+
+        _messageBus.publish(null, AsyncJob.Topics.JOB_STATE, PublishScope.GLOBAL, jobId);
     }
 
     @Override
     @DB
-    public void updateAsyncJobStatus(long jobId, int processStatus, String resultObject) {
+    public void updateAsyncJobStatus(final long jobId, final int processStatus, final String resultObject) {
         if (s_logger.isDebugEnabled()) {
             s_logger.debug("Update async-job progress, job-" + jobId + ", processStatus: " + processStatus + ", result: " + resultObject);
         }
 
-        Transaction txt = Transaction.currentTxn();
-        try {
-            txt.start();
-            AsyncJobVO job = _jobDao.findById(jobId);
-            if (job == null) {
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug("job-" + jobId + " no longer exists, we just log progress info here. progress status: " + processStatus);
-                }
-
-                txt.rollback();
-                return;
+        final AsyncJobVO job = _jobDao.findById(jobId);
+        if (job == null) {
+            if (s_logger.isDebugEnabled()) {
+                s_logger.debug("job-" + jobId + " no longer exists, we just log progress info here. progress status: " + processStatus);
             }
 
-            job.setProcessStatus(processStatus);
-            if (resultObject != null) {
-                job.setResult(resultObject);
-            }
-            job.setLastUpdated(DateUtil.currentGMTTime());
-            _jobDao.update(jobId, job);
-            txt.commit();
-        } catch (Exception e) {
-            s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e);
-            txt.rollback();
+            return;
         }
+
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                job.setProcessStatus(processStatus);
+                if (resultObject != null) {
+                    job.setResult(resultObject);
+                }
+                job.setLastUpdated(DateUtil.currentGMTTime());
+                _jobDao.update(jobId, job);
+            }
+        });
     }
 
     @Override
     @DB
-    public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId) {
+    public void updateAsyncJobAttachment(final long jobId, final String instanceType, final Long instanceId) {
         if (s_logger.isDebugEnabled()) {
             s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType + ", instanceId: " + instanceId);
         }
 
-        Transaction txt = Transaction.currentTxn();
-        try {
-            txt.start();
-
-            AsyncJobVO job = _jobDao.createForUpdate();
-            job.setInstanceType(instanceType);
-            job.setInstanceId(instanceId);
-            job.setLastUpdated(DateUtil.currentGMTTime());
-            _jobDao.update(jobId, job);
-
-            txt.commit();
-        } catch (Exception e) {
-            s_logger.error("Unexpected exception while updating async job-" + jobId + " attachment: ", e);
-            txt.rollback();
-        }
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                AsyncJobVO job = _jobDao.createForUpdate();
+                job.setInstanceType(instanceType);
+                job.setInstanceId(instanceId);
+                job.setLastUpdated(DateUtil.currentGMTTime());
+                _jobDao.update(jobId, job);
+            }
+        });
     }
 
     @Override
@@ -493,15 +486,12 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
         return new ManagedContextRunnable() {
             @Override
             protected void runInContext() {
-                Transaction txn = null;
                 long runNumber = getJobRunNumber();
 
                 try {
                     //
                     // setup execution environment
                     //
-                    txn = Transaction.open(Transaction.CLOUD_DB);
-
                     try {
                         JmxUtil.registerMBean("AsyncJobManager", "Active Job " + job.getId(), new AsyncJobMBeanImpl(job));
                     } catch (Exception e) {
@@ -564,9 +554,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                                 s_logger.trace("Unable to unregister job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e));
                         }
 
-                        if (txn != null)
-                            txn.close();
-
                         //
                         // clean execution environment
                         //
@@ -690,7 +677,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
         return new ManagedContextRunnable() {
             @Override
             protected void runInContext() {
-                Transaction txn = Transaction.open("AsyncJobManagerImpl.getHeartbeatTask");
                 try {
                     List<SyncQueueItemVO> l = _queueMgr.dequeueFromAny(getMsid(), MAX_ONETIME_SCHEDULE_SIZE);
                     if (l != null && l.size() > 0) {
@@ -711,12 +697,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                     }
                 } catch (Throwable e) {
                     s_logger.error("Unexpected exception when trying to execute queue item, ", e);
-                } finally {
-                    try {
-                        txn.close();
-                    } catch (Throwable e) {
-                        s_logger.error("Unexpected exception", e);
-                    }
                 }
             }
         };
@@ -785,13 +765,15 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     }
 
     @DB
-    protected void expungeAsyncJob(AsyncJobVO job) {
-        Transaction txn = Transaction.currentTxn();
-        txn.start();
-        _jobDao.expunge(job.getId());
-        //purge corresponding sync queue item
-        _queueMgr.purgeAsyncJobQueueItemId(job.getId());
-        txn.commit();
+    protected void expungeAsyncJob(final AsyncJobVO job) {
+        Transaction.execute(new TransactionCallbackNoReturn() {
+            @Override
+            public void doInTransactionWithoutResult(TransactionStatus status) {
+                _jobDao.expunge(job.getId());
+                //purge corresponding sync queue item
+                _queueMgr.purgeAsyncJobQueueItemId(job.getId());
+            }
+        });
     }
 
     private long getMsid() {
@@ -825,58 +807,60 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
             Collections.sort(result);
             Long[] ids = result.toArray(new Long[result.size()]);
 
-            SearchCriteria<AsyncJobVO> jobsSC = JobIdsSearch.create("ids", ids);
-            SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids);
+            final SearchCriteria<AsyncJobVO> jobsSC = JobIdsSearch.create("ids", ids);
+            final SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids);
 
-            Transaction txn = Transaction.currentTxn();
-            txn.start();
-            AsyncJobVO job = _jobDao.createForUpdate();
-            job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
-            _jobDao.update(job, jobsSC);
+            Transaction.execute(new TransactionCallbackNoReturn() {
+                @Override
+                public void doInTransactionWithoutResult(TransactionStatus status) {
+                    AsyncJobVO job = _jobDao.createForUpdate();
+                    job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
+                    _jobDao.update(job, jobsSC);
 
-            SyncQueueItemVO item = _queueItemDao.createForUpdate();
-            item.setLastProcessNumber(null);
-            item.setLastProcessMsid(null);
-            _queueItemDao.update(item, queueItemsSC);
-            txn.commit();
+                    SyncQueueItemVO item = _queueItemDao.createForUpdate();
+                    item.setLastProcessNumber(null);
+                    item.setLastProcessMsid(null);
+                    _queueItemDao.update(item, queueItemsSC);
+                }
+            });
         }
         return _joinMapDao.findJobsToWake(joinedJobId);
     }
 
     @DB
     protected List<Long> wakeupScan() {
-        Date cutDate = DateUtil.currentGMTTime();
-        Transaction txn = Transaction.currentTxn();
+        final Date cutDate = DateUtil.currentGMTTime();
 
         SearchCriteria<Long> sc = JoinJobTimeSearch.create();
         sc.setParameters("beginTime", cutDate);
         sc.setParameters("endTime", cutDate);
 
-        List<Long> result = _joinMapDao.customSearch(sc, null);
-
-        txn.start();
-        if (result.size() > 0) {
-            Collections.sort(result);
-            Long[] ids = result.toArray(new Long[result.size()]);
+        final List<Long> result = _joinMapDao.customSearch(sc, null);
 
-            AsyncJobVO job = _jobDao.createForUpdate();
-            job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
+        return Transaction.execute(new TransactionCallback<List<Long>>() {
+            @Override
+            public List<Long> doInTransaction(TransactionStatus status) {
+                if (result.size() > 0) {
+                    Collections.sort(result);
+                    Long[] ids = result.toArray(new Long[result.size()]);
 
-            SearchCriteria<AsyncJobVO> sc2 = JobIdsSearch.create("ids", ids);
-            SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids);
+                    AsyncJobVO job = _jobDao.createForUpdate();
+                    job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
 
-            _jobDao.update(job, sc2);
+                    SearchCriteria<AsyncJobVO> sc2 = JobIdsSearch.create("ids", ids);
+                    SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids);
 
-            SyncQueueItemVO item = _queueItemDao.createForUpdate();
-            item.setLastProcessNumber(null);
-            item.setLastProcessMsid(null);
-            _queueItemDao.update(item, queueItemsSC);
-        }
+                    _jobDao.update(job, sc2);
 
-        List<Long> wakupIds = _joinMapDao.findJobsToWakeBetween(cutDate);
-        txn.commit();
+                    SyncQueueItemVO item = _queueItemDao.createForUpdate();
+                    item.setLastProcessNumber(null);
+                    item.setLastProcessMsid(null);
+                    _queueItemDao.update(item, queueItemsSC);
+                }
 
-        return wakupIds;
+                return _joinMapDao.findJobsToWakeBetween(cutDate);
+            }
+        });
     }
 
     @Override
@@ -933,18 +917,18 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
 
     @Override
     public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
-        for (ManagementServerHost msHost : nodeList) {
-            Transaction txn = Transaction.open(Transaction.CLOUD_DB);
+        for (final ManagementServerHost msHost : nodeList) {
             try {
-                txn.start();
-                List<SyncQueueItemVO> items = _queueMgr.getActiveQueueItems(msHost.getId(), true);
-                cleanupPendingJobs(items);
-                _jobDao.resetJobProcess(msHost.getId(), ApiErrorCode.INTERNAL_ERROR.getHttpCode(), "job cancelled because of management server restart");
-                txn.commit();
+                Transaction.execute(new TransactionCallbackNoReturn() {
+                    @Override
+                    public void doInTransactionWithoutResult(TransactionStatus status) {
+                        List<SyncQueueItemVO> items = _queueMgr.getActiveQueueItems(msHost.getId(), true);
+                        cleanupPendingJobs(items);
+                        _jobDao.resetJobProcess(msHost.getId(), ApiErrorCode.INTERNAL_ERROR.getHttpCode(), "job cancelled because of management server restart");
+                    }
+                });
             } catch (Throwable e) {
                 s_logger.warn("Unexpected exception ", e);
-            } finally {
-                txn.close();
             }
         }
     }