You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by da...@apache.org on 2013/10/16 18:50:25 UTC
[14/20] New Transaction API
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/db/src/com/cloud/utils/db/TransactionLegacy.java
----------------------------------------------------------------------
diff --git a/framework/db/src/com/cloud/utils/db/TransactionLegacy.java b/framework/db/src/com/cloud/utils/db/TransactionLegacy.java
new file mode 100755
index 0000000..b191491
--- /dev/null
+++ b/framework/db/src/com/cloud/utils/db/TransactionLegacy.java
@@ -0,0 +1,1174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.utils.db;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbcp.ConnectionFactory;
+import org.apache.commons.dbcp.DriverManagerConnectionFactory;
+import org.apache.commons.dbcp.PoolableConnectionFactory;
+import org.apache.commons.dbcp.PoolingDataSource;
+import org.apache.commons.pool.KeyedObjectPoolFactory;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.commons.pool.impl.StackKeyedObjectPoolFactory;
+import org.apache.log4j.Logger;
+import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;
+import org.jasypt.properties.EncryptableProperties;
+
+import com.cloud.utils.Pair;
+import com.cloud.utils.PropertiesUtil;
+import com.cloud.utils.crypt.EncryptionSecretKeyChecker;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.utils.mgmt.JmxUtil;
+
+/**
+ * Transaction abstracts away the Connection object in JDBC. It allows the
+ * following things that the Connection object does not.
+ *
+ * 1. Transaction can be started at an entry point and whether the DB
+ * actions should be auto-commit or not determined at that point.
+ * 2. DB Connection is allocated only when it is needed.
+ * 3. Code does not need to know if a transaction has been started or not.
+ * It just starts/ends a transaction and we resolve it correctly with
+ * the previous actions.
+ *
+ * Note that this class is not synchronized, but it doesn't need to be because
+ * it is stored in thread-local storage (TLS) and is one per thread. Use appropriately.
+ */
+public class TransactionLegacy {
+ private static final Logger s_logger = Logger.getLogger(Transaction.class.getName() + "." + "Transaction"); // general transaction messages; named after Transaction, not TransactionLegacy
+ private static final Logger s_stmtLogger = Logger.getLogger(Transaction.class.getName() + "." + "Statement"); // statement prepare/close tracing
+ private static final Logger s_lockLogger = Logger.getLogger(Transaction.class.getName() + "." + "Lock"); // lock timing, see registerLock()/clearLockTimes()
+ private static final Logger s_connLogger = Logger.getLogger(Transaction.class.getName() + "." + "Connection"); // connection open/close tracing
+
+ private static final ThreadLocal<TransactionLegacy> tls = new ThreadLocal<TransactionLegacy>(); // the transaction bound to the current thread, set in open()
+ private static final String START_TXN = "start_txn"; // stack element type pushed by start() and setSavepoint()
+ private static final String CURRENT_TXN = "current_txn"; // stack element type pushed by mark() and takeOver()
+ private static final String CREATE_TXN = "create_txn"; // stack element type pushed by takeOver() when create is true
+ private static final String CREATE_CONN = "create_conn"; // stack element type pushed by getConnection() when it obtains a connection
+ private static final String STATEMENT = "statement"; // stack element type handled in removeUpTo()
+ private static final String ATTACHMENT = "attachment"; // stack element type pushed by attach()
+
+ public static final short CLOUD_DB = 0; // getConnection() uses s_ds
+ public static final short USAGE_DB = 1; // getConnection() uses s_usageDS
+ public static final short AWSAPI_DB = 2; // getConnection() uses s_awsapiDS
+ public static final short SIMULATOR_DB = 3; // getConnection() uses s_simulatorDS
+
+ public static final short CONNECTED_DB = -1; // set by transitToUserManagedConnection(); closeConnection() never closes such a connection
+
+ private static AtomicLong s_id = new AtomicLong(); // source of the per-instance _id assigned in the constructor
+ private static final TransactionMBeanImpl s_mbean = new TransactionMBeanImpl(); // registered with JMX in the static initializer; open() adds new transactions to it
+ static {
+     // Register the transaction MBean; a failure is logged and otherwise ignored.
+     try {
+         JmxUtil.registerMBean("Transaction", "Transaction", s_mbean);
+     } catch (Exception e) {
+         s_logger.error("Unable to register mbean for transaction", e);
+     }
+
+     /* FIXME: We need a better solution for this
+      * Initialize encryption if we need it for db.properties
+      */
+     new EncryptionSecretKeyChecker().check();
+ }
+
+ private final LinkedList<StackElement> _stack; // bookkeeping stack; push() puts the newest element at the head
+ private long _id; // assigned from s_id in the public constructor
+
+ private final LinkedList<Pair<String, Long>> _lockTimes = new LinkedList<Pair<String, Long>>(); // (sql, start millis) pairs recorded by registerLock()
+
+ private String _name; // name given at creation / take-over; null once cleanup() has run
+ private Connection _conn; // obtained lazily by getConnection(), or supplied via transitToUserManagedConnection()
+ private boolean _txn; // true while a DB transaction is in progress
+ private short _dbId; // one of CLOUD_DB, USAGE_DB, AWSAPI_DB, SIMULATOR_DB or CONNECTED_DB
+ private long _txnTime; // System.currentTimeMillis() captured in start()
+ private Statement _stmt; // last statement handed out by prepareAutoCloseStatement(); closed by closePreviousStatement()
+ private String _creator; // name of the thread that constructed this object
+
+ private TransactionLegacy _prev = null; // not referenced anywhere in the visible part of this file
+
+ /**
+  * Returns the transaction bound to the calling thread.
+  *
+  * With assertions enabled this also asserts that a transaction exists and that
+  * {@link #checkAnnotation(int, TransactionLegacy)} accepts the current call stack.
+  *
+  * @return the thread-local transaction; may be null when assertions are disabled
+  */
+ public static TransactionLegacy currentTxn() {
+     final TransactionLegacy current = tls.get();
+     assert current != null : "No Transaction on stack. Did you mark the method with @DB?";
+
+     assert checkAnnotation(3, current) : "Did you even read the guide to use Transaction...IOW...other people's code? Try method can't be private. What about @DB? hmmm... could that be it? " + current;
+     return current;
+ }
+
+ /**
+  * Opens a transaction context against the given database, naming it after the
+  * current call stack (see {@link #buildName()}) and forcing a database switch
+  * when the thread's current transaction targets a different database.
+  *
+  * @param databaseId one of the *_DB constants
+  * @return the transaction now bound to this thread
+  */
+ public static TransactionLegacy open(final short databaseId) {
+     final String stackName = buildName();
+     return open(stackName != null ? stackName : CURRENT_TXN, databaseId, true);
+ }
+
+ /**
+  * Installs a caller-supplied connection and marks this transaction as
+  * {@link #CONNECTED_DB}, so that closeConnection() will not close it.
+  *
+  * Usage of this transaction setup should be limited. It is meant for special
+  * cases that want to control the DB connection explicitly while still using
+  * the existing DAO features. Call transitToAutoManagedConnection afterwards.
+  *
+  * @param conn the connection the caller owns and manages
+  */
+ public void transitToUserManagedConnection(Connection conn) {
+     assert _conn == null : "Can't change to a user managed connection unless the stack is empty and the db connection is null, you may have forgotten to invoke transitToAutoManagedConnection to close out the DB connection: " + toString();
+     this._conn = conn;
+     this._dbId = CONNECTED_DB;
+ }
+
+ /**
+  * Drops the reference to the current connection (without closing it) and
+  * switches this transaction to the given database id.
+  *
+  * @param dbId database id used by the next getConnection() call
+  */
+ public void transitToAutoManagedConnection(short dbId) {
+     this._conn = null;
+     this._dbId = dbId;
+ }
+
+ /**
+  * Opens (or joins) a transaction context on {@link #CLOUD_DB} without forcing
+  * a database switch.
+  *
+  * @param name name recorded for this transaction marker
+  * @return the transaction bound to this thread
+  */
+ public static TransactionLegacy open(final String name) {
+     return open(name, CLOUD_DB, false);
+ }
+
+ /**
+  * Opens a transaction context for the calling thread.
+  *
+  * A new TransactionLegacy is created when the thread has none, or when
+  * forceDbChange is set and the existing one targets a different database (the
+  * existing one is closed first). Otherwise the existing one is reused. Either
+  * way the name is recorded through takeOver(name, false).
+  *
+  * @param name          name recorded for this transaction marker
+  * @param databaseId    one of the *_DB constants
+  * @param forceDbChange whether a database mismatch replaces the current transaction
+  * @return the transaction bound to this thread
+  */
+ public static TransactionLegacy open(final String name, final short databaseId, final boolean forceDbChange) {
+     TransactionLegacy txn = tls.get();
+     final boolean replace;
+     if (txn == null) {
+         if (s_logger.isTraceEnabled()) {
+             s_logger.trace("Creating the transaction: " + name);
+         }
+         replace = true;
+     } else if (forceDbChange && txn.getDatabaseId() != databaseId) {
+         // we need to end the current transaction and switch databases
+         txn.close(txn.getName());
+         replace = true;
+     } else {
+         replace = false;
+     }
+
+     if (replace) {
+         txn = new TransactionLegacy(name, false, databaseId);
+         tls.set(txn);
+     }
+
+     txn.takeOver(name, false);
+     if (replace) {
+         s_mbean.addTransaction(txn);
+     }
+     return txn;
+ }
+
+ /**
+  * Finds the most recently pushed stack element of the given type.
+  *
+  * The type is matched by identity first (the type constants of this class are
+  * compile-time literals) and by equals() as a fallback, so an equal but
+  * non-identical String still matches.
+  *
+  * @param obj the element type to look for, normally one of the type constants
+  * @return the first matching element from the head of the stack, or null if none
+  */
+ protected StackElement peekInStack(Object obj) {
+     for (final StackElement element : _stack) {
+         if (element.type == obj || (obj != null && obj.equals(element.type))) {
+             return element;
+         }
+     }
+     return null;
+ }
+
+ /**
+  * Records the start time of a locking SQL statement. Nothing is recorded
+  * unless a transaction is in progress and the lock logger has debug enabled.
+  *
+  * @param sql the SQL text to associate with the timestamp
+  */
+ public void registerLock(String sql) {
+     if (!_txn || !s_lockLogger.isDebugEnabled()) {
+         return;
+     }
+     _lockTimes.add(new Pair<String, Long>(sql, System.currentTimeMillis()));
+ }
+
+ /** @return true while a DB transaction is in progress on this object */
+ public boolean dbTxnStarted() {
+     return this._txn;
+ }
+
+ /**
+  * Obtains a connection from the cloud data source, outside of any
+  * TransactionLegacy bookkeeping.
+  *
+  * @return a connection from s_ds
+  * @throws SQLException if the data source cannot supply a connection
+  */
+ public static Connection getStandaloneConnectionWithException() throws SQLException {
+     final Connection standalone = s_ds.getConnection();
+     if (s_connLogger.isTraceEnabled()) {
+         s_connLogger.trace("Retrieving a standalone connection: dbconn" + System.identityHashCode(standalone));
+     }
+     return standalone;
+ }
+
+ public static Connection getStandaloneConnection() {
+ try {
+ return getStandaloneConnectionWithException();
+ } catch (SQLException e) {
+ s_logger.error("Unexpected exception: ", e);
+ return null;
+ }
+ }
+
+ /**
+  * Obtains a connection from the usage data source, outside of any
+  * TransactionLegacy bookkeeping.
+  *
+  * @return a connection from s_usageDS, or null if an SQLException occurred (it is logged)
+  */
+ public static Connection getStandaloneUsageConnection() {
+     Connection standalone = null;
+     try {
+         standalone = s_usageDS.getConnection();
+         if (s_connLogger.isTraceEnabled()) {
+             s_connLogger.trace("Retrieving a standalone connection for usage: dbconn" + System.identityHashCode(standalone));
+         }
+     } catch (SQLException e) {
+         s_logger.warn("Unexpected exception: ", e);
+         return null;
+     }
+     return standalone;
+ }
+
+ /**
+  * Obtains a connection from the AWS API data source, outside of any
+  * TransactionLegacy bookkeeping.
+  *
+  * @return a connection from s_awsapiDS, or null if an SQLException occurred (it is logged)
+  */
+ public static Connection getStandaloneAwsapiConnection() {
+     try {
+         Connection conn = s_awsapiDS.getConnection();
+         if (s_connLogger.isTraceEnabled()) {
+             // The message used to say "for usage", copied from the usage variant.
+             s_connLogger.trace("Retrieving a standalone connection for awsapi: dbconn" + System.identityHashCode(conn));
+         }
+         return conn;
+     } catch (SQLException e) {
+         s_logger.warn("Unexpected exception: ", e);
+         return null;
+     }
+ }
+
+ /**
+  * Obtains a connection from the simulator data source, outside of any
+  * TransactionLegacy bookkeeping.
+  *
+  * @return a connection from s_simulatorDS, or null if an SQLException occurred (it is logged)
+  */
+ public static Connection getStandaloneSimulatorConnection() {
+     Connection standalone = null;
+     try {
+         standalone = s_simulatorDS.getConnection();
+         if (s_connLogger.isTraceEnabled()) {
+             s_connLogger.trace("Retrieving a standalone connection for simulator: dbconn" + System.identityHashCode(standalone));
+         }
+     } catch (SQLException e) {
+         s_logger.warn("Unexpected exception: ", e);
+         return null;
+     }
+     return standalone;
+ }
+
+ protected void attach(TransactionAttachment value) {
+ _stack.push(new StackElement(ATTACHMENT, value));
+ }
+
+ protected TransactionAttachment detach(String name) {
+ Iterator<StackElement> it = _stack.descendingIterator();
+ while (it.hasNext()) {
+ StackElement element = it.next();
+ if (element.type == ATTACHMENT) {
+ TransactionAttachment att = (TransactionAttachment)element.ref;
+ if (name.equals(att.getName())) {
+ it.remove();
+ return att;
+ }
+ }
+ }
+ assert false : "Are you sure you attached this: " + name;
+ return null;
+ }
+
+ /**
+  * Attaches a value to the calling thread's transaction. With assertions
+  * enabled, asserts that a transaction with a CURRENT_TXN marker exists.
+  *
+  * @param value the attachment to track
+  */
+ public static void attachToTxn(TransactionAttachment value) {
+     final TransactionLegacy current = tls.get();
+     assert current != null && current.peekInStack(CURRENT_TXN) != null: "Come on....how can we attach something to the transaction if you haven't started it?";
+
+     current.attach(value);
+ }
+
+ /**
+  * Detaches the named attachment from the calling thread's transaction.
+  *
+  * @param name attachment name
+  * @return the result of {@link #detach(String)} on the thread-local transaction
+  */
+ public static TransactionAttachment detachFromTxn(String name) {
+     final TransactionLegacy current = tls.get();
+     assert current != null : "No Transaction in TLS";
+     return current.detach(name);
+ }
+
+ /**
+  * Checks that the method name recorded in the newest CURRENT_TXN marker appears
+  * in the current call stack, starting at the given frame index.
+  *
+  * A mismatch is tolerated: it is logged as a warning and the method still
+  * returns true. Only a missing CURRENT_TXN marker yields false.
+  *
+  * @param stack index of the first stack frame to inspect
+  * @param txn   the transaction whose marker is checked
+  * @return false if txn has no CURRENT_TXN marker, true otherwise
+  */
+ protected static boolean checkAnnotation(int stack, TransactionLegacy txn) {
+     final StackTraceElement[] stacks = Thread.currentThread().getStackTrace();
+     final StackElement se = txn.peekInStack(CURRENT_TXN);
+     if (se == null) {
+         return false;
+     }
+
+     // The buffer is local to this call, so an unsynchronized StringBuilder suffices.
+     final StringBuilder sb = new StringBuilder();
+     for (int i = stack; i < stacks.length; i++) {
+         final String methodName = stacks[i].getMethodName();
+         sb.append(" ").append(methodName);
+         if (methodName.equals(se.ref)) {
+             return true;
+         }
+     }
+
+     // relax stack structure for several places that @DB required injection is not in place
+     s_logger.warn("Non-standard stack context that Transaction context is manually placed into the calling chain. Stack chain: " + sb);
+     return true;
+ }
+
+ /**
+  * Builds a "-Class.method:line" chain from the current call stack, used as a
+  * transaction name and in log messages.
+  *
+  * Inspection starts at stack index 3, skips frames whose file name is
+  * "&lt;generated&gt;" or whose method is "invokeSuper", and stops after 12
+  * frames have been appended.
+  *
+  * @return the chain, or an empty string when debug logging is disabled
+  */
+ protected static String buildName() {
+     if (!s_logger.isDebugEnabled()) {
+         return "";
+     }
+
+     final StackTraceElement[] frames = Thread.currentThread().getStackTrace();
+     final StringBuilder chain = new StringBuilder();
+     int appended = 0;
+     for (int idx = 3; appended < 12 && idx < frames.length; idx++) {
+         final StackTraceElement frame = frames[idx];
+         if ("<generated>".equals(frame.getFileName()) || "invokeSuper".equals(frame.getMethodName())) {
+             continue;
+         }
+
+         final String className = frame.getClassName();
+         chain.append("-").append(className.substring(className.lastIndexOf(".") + 1));
+         chain.append(".").append(frame.getMethodName());
+         chain.append(":").append(frame.getLineNumber());
+         appended++;
+     }
+     return chain.toString();
+ }
+
+ /**
+  * Creates a transaction with an empty stack, no connection and no DB
+  * transaction in progress.
+  *
+  * @param name       initial transaction name
+  * @param forLocking not used by this constructor
+  * @param databaseId one of the *_DB constants
+  */
+ public TransactionLegacy(final String name, final boolean forLocking, final short databaseId) {
+     this._stack = new LinkedList<StackElement>();
+     this._name = name;
+     this._dbId = databaseId;
+     this._conn = null;
+     this._txn = false;
+     this._id = s_id.incrementAndGet();
+     this._creator = Thread.currentThread().getName();
+ }
+
+ /** @return the name of the thread that constructed this transaction */
+ public String getCreator() {
+     return this._creator;
+ }
+
+ /** @return the id assigned to this transaction at construction */
+ public long getId() {
+     return this._id;
+ }
+
+ /** @return the current transaction name, or null after cleanup() */
+ public String getName() {
+     return this._name;
+ }
+
+ /** @return the database id this transaction targets, boxed */
+ public Short getDatabaseId() {
+     return Short.valueOf(_dbId);
+ }
+
+ /**
+  * @return the transaction name (empty if null), then " : ", then the ref of
+  *         every CURRENT_TXN marker on the stack, each followed by ", "
+  */
+ @Override
+ public String toString() {
+     final StringBuilder text = new StringBuilder();
+     if (_name != null) {
+         text.append(_name);
+     }
+     text.append(" : ");
+     for (final StackElement element : _stack) {
+         if (element.type != CURRENT_TXN) {
+             continue;
+         }
+         text.append(element.ref).append(", ");
+     }
+
+     return text.toString();
+ }
+
+ protected void mark(final String name) {
+ _stack.push(new StackElement(CURRENT_TXN, name));
+ }
+
+ /**
+  * Acquires the named lock through the Merovingian2 lock master.
+  *
+  * @param name           lock name
+  * @param timeoutSeconds timeout passed to the lock master
+  * @return the result of the lock master's acquire call
+  * @throws CloudRuntimeException if no lock master is available
+  */
+ public boolean lock(final String name, final int timeoutSeconds) {
+     final Merovingian2 master = Merovingian2.getLockMaster();
+     if (master != null) {
+         return master.acquire(name, timeoutSeconds);
+     }
+     throw new CloudRuntimeException("There's no support for locking yet");
+ }
+
+ /**
+  * Releases the named lock through the Merovingian2 lock master.
+  *
+  * @param name lock name
+  * @return the result of the lock master's release call
+  * @throws CloudRuntimeException if no lock master is available
+  */
+ public boolean release(final String name) {
+     final Merovingian2 master = Merovingian2.getLockMaster();
+     if (master != null) {
+         return master.release(name);
+     }
+     throw new CloudRuntimeException("There's no support for locking yet");
+ }
+
+ /**
+  * Starts a DB transaction, or records a nested start if one is in progress.
+  *
+  * A START_TXN element is always pushed. Only the outermost start flips the
+  * transaction flag, records the start time and, if a connection already
+  * exists, turns auto-commit off on it.
+  *
+  * @throws CloudRuntimeException if auto-commit cannot be disabled
+  */
+ public void start() {
+     if (s_logger.isTraceEnabled()) {
+         s_logger.trace("txn: start requested by: " + buildName());
+     }
+
+     _stack.push(new StackElement(START_TXN, null));
+
+     if (_txn) {
+         s_logger.trace("txn: has already been started.");
+         return;
+     }
+
+     _txn = true;
+     _txnTime = System.currentTimeMillis();
+
+     if (_conn == null) {
+         // No connection yet; getConnection() applies the auto-commit setting later.
+         return;
+     }
+
+     try {
+         s_logger.trace("txn: set auto commit to false");
+         _conn.setAutoCommit(false);
+     } catch (final SQLException e) {
+         s_logger.warn("Unable to set auto commit: ", e);
+         throw new CloudRuntimeException("Unable to set auto commit: ", e);
+     }
+ }
+
+ protected void closePreviousStatement() {
+ if (_stmt != null) {
+ try {
+ if (s_stmtLogger.isTraceEnabled()) {
+ s_stmtLogger.trace("Closing: " + _stmt.toString());
+ }
+ try {
+ ResultSet rs = _stmt.getResultSet();
+ if (rs != null && _stmt.getResultSetHoldability() != ResultSet.HOLD_CURSORS_OVER_COMMIT) {
+ rs.close();
+ }
+ } catch(SQLException e) {
+ s_stmtLogger.trace("Unable to close resultset");
+ }
+ _stmt.close();
+ } catch (final SQLException e) {
+ s_stmtLogger.trace("Unable to close statement: " + _stmt.toString());
+ } finally {
+ _stmt = null;
+ }
+ }
+ }
+
+ /**
+  * Prepares a statement that this transaction closes automatically: the
+  * previously remembered statement is closed and the new one takes its place.
+  *
+  * @param sql sql String
+  * @return PreparedStatement
+  * @throws SQLException if problem with JDBC layer.
+  *
+  * @see java.sql.Connection
+  */
+ public PreparedStatement prepareAutoCloseStatement(final String sql) throws SQLException {
+     final PreparedStatement prepared = prepareStatement(sql);
+     closePreviousStatement();
+     this._stmt = prepared;
+     return prepared;
+ }
+
+ /**
+  * Prepares a statement on this transaction's connection. The statement is not
+  * remembered here, so this method does not arrange for it to be closed.
+  *
+  * @param sql sql String
+  * @return PreparedStatement
+  * @throws SQLException if problem with JDBC layer.
+  */
+ public PreparedStatement prepareStatement(final String sql) throws SQLException {
+     final PreparedStatement prepared = getConnection().prepareStatement(sql);
+     if (s_stmtLogger.isTraceEnabled()) {
+         s_stmtLogger.trace("Preparing: " + sql);
+     }
+     return prepared;
+ }
+
+ /**
+  * Prepares a statement that this transaction closes automatically: the
+  * previously remembered statement is closed and the new one takes its place.
+  *
+  * @param sql sql String
+  * @param autoGeneratedKeys flag passed to Connection.prepareStatement(String, int)
+  * @return PreparedStatement
+  * @throws SQLException if problem with JDBC layer.
+  *
+  * @see java.sql.Connection
+  */
+ public PreparedStatement prepareAutoCloseStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
+     final PreparedStatement prepared = getConnection().prepareStatement(sql, autoGeneratedKeys);
+     if (s_stmtLogger.isTraceEnabled()) {
+         s_stmtLogger.trace("Preparing: " + sql);
+     }
+     closePreviousStatement();
+     this._stmt = prepared;
+     return prepared;
+ }
+
+ /**
+  * Prepares a statement that this transaction closes automatically: the
+  * previously remembered statement is closed and the new one takes its place.
+  *
+  * @param sql sql String
+  * @param columnNames names of the columns
+  * @return PreparedStatement
+  * @throws SQLException if problem with JDBC layer.
+  *
+  * @see java.sql.Connection
+  */
+ public PreparedStatement prepareAutoCloseStatement(final String sql, final String[] columnNames) throws SQLException {
+     final PreparedStatement prepared = getConnection().prepareStatement(sql, columnNames);
+     if (s_stmtLogger.isTraceEnabled()) {
+         s_stmtLogger.trace("Preparing: " + sql);
+     }
+     closePreviousStatement();
+     this._stmt = prepared;
+     return prepared;
+ }
+
+ /**
+  * Prepares a statement that this transaction closes automatically: the
+  * previously remembered statement is closed and the new one takes its place.
+  *
+  * @param sql sql String
+  * @param resultSetType result set type passed to the connection
+  * @param resultSetConcurrency result set concurrency passed to the connection
+  * @param resultSetHoldability result set holdability passed to the connection
+  * @return PreparedStatement
+  * @throws SQLException if problem with JDBC layer.
+  *
+  * @see java.sql.Connection
+  */
+ public PreparedStatement prepareAutoCloseStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+     final PreparedStatement prepared = getConnection().prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+     if (s_stmtLogger.isTraceEnabled()) {
+         s_stmtLogger.trace("Preparing: " + sql);
+     }
+     closePreviousStatement();
+     this._stmt = prepared;
+     return prepared;
+ }
+
+ /**
+  * Returns the db connection, obtaining one from the data source selected by
+  * the database id on first use.
+  *
+  * A newly obtained connection gets auto-commit set to the opposite of the
+  * transaction flag, and a CREATE_CONN element is pushed on the stack.
+  *
+  * Note: you can call getConnection(), but be aware that statements prepared
+  * directly on the Connection are not closed by this class.
+  *
+  * @return DB Connection but make sure you understand that
+  * you are responsible for closing the PreparedStatement.
+  * @throws SQLException if the data source or the auto-commit change fails
+  * @throws CloudRuntimeException if the selected data source is null or the
+  *         database id matches none of the known databases
+  */
+ public Connection getConnection() throws SQLException {
+     if (_conn != null) {
+         s_logger.trace("conn: Using existing DB connection");
+         return _conn;
+     }
+
+     final DataSource source;
+     switch (_dbId) {
+     case CLOUD_DB:
+         source = s_ds;
+         break;
+     case USAGE_DB:
+         source = s_usageDS;
+         break;
+     case AWSAPI_DB:
+         source = s_awsapiDS;
+         break;
+     case SIMULATOR_DB:
+         source = s_simulatorDS;
+         break;
+     default:
+         throw new CloudRuntimeException("No database selected for the transaction");
+     }
+
+     if (source == null) {
+         s_logger.warn("A static-initialized variable becomes null, process is dying?");
+         throw new CloudRuntimeException("Database is not initialized, process is dying?");
+     }
+
+     _conn = source.getConnection();
+     _conn.setAutoCommit(!_txn);
+
+     //
+     // MySQL default transaction isolation level is REPEATABLE READ,
+     // to reduce chances of DB deadlock, we will use READ COMMITTED isolation level instead
+     // see http://dev.mysql.com/doc/refman/5.0/en/innodb-deadlocks.html
+     // (no isolation level is set in this method itself)
+     //
+     _stack.push(new StackElement(CREATE_CONN, null));
+     if (s_connLogger.isTraceEnabled()) {
+         s_connLogger.trace("Creating a DB connection with " + (_txn ? " txn: " : " no txn: ") + " for " + _dbId + ": dbconn" + System.identityHashCode(_conn) + ". Stack: " + buildName());
+     }
+
+     return _conn;
+ }
+
+ /**
+  * Records the given name on this transaction.
+  *
+  * With a non-empty stack: a non-create request, or a create request nested
+  * directly under a CREATE_TXN head, just adds a CURRENT_TXN marker and returns
+  * false. Any other create request treats the existing stack as leaked and
+  * cleans it up. After that, and for an empty stack, a CREATE_TXN or
+  * CURRENT_TXN element is pushed and the name becomes this transaction's name.
+  *
+  * @param name   name to record
+  * @param create whether this is a create request
+  * @return true if this call took the transaction over, false if it joined it
+  */
+ protected boolean takeOver(final String name, final boolean create) {
+     if (!_stack.isEmpty()) {
+         // A nested create is left to the outer create to clean up.
+         final boolean joinExisting = !create || _stack.getFirst().type == CREATE_TXN;
+         if (joinExisting) {
+             if (s_logger.isTraceEnabled()) {
+                 s_logger.trace((create ? "Create using current transaction: " : "Using current transaction: ") + toString());
+             }
+             mark(name);
+             return false;
+         }
+
+         s_logger.warn("Encountered a transaction that has leaked. Cleaning up. " + toString());
+         cleanup();
+     }
+
+     if (s_logger.isTraceEnabled()) {
+         s_logger.trace("Took over the transaction: " + name);
+     }
+     final String elementType = create ? CREATE_TXN : CURRENT_TXN;
+     _stack.push(new StackElement(elementType, name));
+     _name = name;
+     return true;
+ }
+
+ /**
+  * Tears the transaction down completely: closes the remembered statement,
+  * unwinds the whole stack, rolls back a DB transaction still in progress,
+  * closes the connection, clears the name and asks the lock master (if any) to
+  * clean up the current thread.
+  */
+ public void cleanup() {
+     closePreviousStatement();
+
+     // A null type never matches a stack element, so this unwinds everything.
+     removeUpTo(null, null);
+     if (_txn) {
+         rollbackTransaction();
+     }
+     _name = null;
+     _txn = false;
+
+     closeConnection();
+
+     _stack.clear();
+     final Merovingian2 master = Merovingian2.getLockMaster();
+     if (master != null) {
+         master.cleanupThread();
+     }
+ }
+
+ /**
+  * Unwinds the stack up to and including the newest CURRENT_TXN marker and, if
+  * that leaves the stack empty, performs a full cleanup().
+  */
+ public void close() {
+     removeUpTo(CURRENT_TXN, null);
+
+     if (_stack.isEmpty()) {
+         s_logger.trace("Transaction is done");
+         cleanup();
+     }
+ }
+
+ /**
+  * close() is used by endTxn to close the connection. A full cleanup happens
+  * only if the name is the same as what's stored; otherwise only the newest
+  * marker is unwound through {@link #close()}.
+  *
+  * @param name name the caller believes owns this transaction
+  * @return true if this transaction was fully cleaned up here or had already
+  *         been cleaned up; false if the name differs from the stored one
+  */
+ public boolean close(final String name) {
+     if (_name == null) { // Already cleaned up.
+         if (s_logger.isTraceEnabled()) {
+             s_logger.trace("Already cleaned up." + buildName());
+         }
+         return true;
+     }
+
+     if (_name.equals(name)) {
+         if (s_logger.isDebugEnabled() && _stack.size() > 2) {
+             s_logger.debug("Transaction is not closed properly: " + toString() + ". Called by " + buildName());
+         }
+
+         cleanup();
+
+         s_logger.trace("All done");
+         return true;
+     }
+
+     close();
+     return false;
+ }
+
+ protected boolean hasTxnInStack() {
+ return peekInStack(START_TXN) != null;
+ }
+
+ protected void clearLockTimes() {
+ if (s_lockLogger.isDebugEnabled()) {
+ for (Pair<String, Long> time : _lockTimes) {
+ s_lockLogger.trace("SQL " + time.first() + " took " + (System.currentTimeMillis() - time.second()));
+ }
+ _lockTimes.clear();
+ }
+ }
+
+ /**
+  * Commits the DB transaction if this call ends the outermost start().
+  *
+  * The newest START_TXN element is removed. If another one remains, the
+  * transaction was started further out and nothing is committed. Otherwise the
+  * transaction flag is cleared and, if a connection exists, it is committed and
+  * closed.
+  *
+  * NOTE: on an SQLException, rollbackTransaction() is called after the
+  * transaction flag has already been cleared, so it only closes the remembered
+  * statement and returns without rolling back.
+  *
+  * @return true if the transaction was ended here; false if no transaction was
+  *         in progress or an outer start() still owns it
+  * @throws CloudRuntimeException if committing fails
+  */
+ public boolean commit() {
+     if (!_txn) {
+         s_logger.warn("txn: Commit called when it is not a transaction: " + buildName());
+         return false;
+     }
+
+     for (final Iterator<StackElement> it = _stack.iterator(); it.hasNext();) {
+         if (it.next().type == START_TXN) {
+             it.remove();
+             break;
+         }
+     }
+
+     if (hasTxnInStack()) {
+         if (s_logger.isTraceEnabled()) {
+             s_logger.trace("txn: Not committing because transaction started elsewhere: " + buildName() + " / " + toString());
+         }
+         return false;
+     }
+
+     _txn = false;
+     try {
+         if (_conn == null) {
+             return true;
+         }
+         _conn.commit();
+         s_logger.trace("txn: DB Changes committed. Time = " + (System.currentTimeMillis() - _txnTime));
+         clearLockTimes();
+         closeConnection();
+         return true;
+     } catch (final SQLException e) {
+         rollbackTransaction();
+         throw new CloudRuntimeException("Unable to commit or close the connection. ", e);
+     }
+ }
+
+ /**
+  * Closes the remembered statement and then the connection, unless a DB
+  * transaction is still in progress or the connection is user managed
+  * (database id CONNECTED_DB).
+  *
+  * The connection reference is dropped even when close() throws. Previously a
+  * failed close left the dead connection in place and getConnection() kept
+  * handing it out.
+  */
+ protected void closeConnection() {
+     closePreviousStatement();
+
+     if (_conn == null) {
+         return;
+     }
+
+     if (_txn) {
+         s_connLogger.trace("txn: Not closing DB connection because we're still in a transaction.");
+         return;
+     }
+
+     // we should only close db connection when it is not user managed
+     if (this._dbId == CONNECTED_DB) {
+         return;
+     }
+
+     try {
+         if (s_connLogger.isTraceEnabled()) {
+             s_connLogger.trace("Closing DB connection: dbconn" + System.identityHashCode(_conn));
+         }
+         _conn.close();
+     } catch (final SQLException e) {
+         s_logger.warn("Unable to close connection", e);
+     } finally {
+         _conn = null;
+     }
+ }
+
+ protected void removeUpTo(String type, Object ref) {
+ boolean rollback = false;
+ Iterator<StackElement> it = _stack.iterator();
+ while (it.hasNext()) {
+ StackElement item = it.next();
+
+ it.remove();
+
+ try {
+ if (item.type == type && (ref == null || item.ref == ref)) {
+ break;
+ }
+
+ if (item.type == CURRENT_TXN) {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("Releasing the current txn: " + (item.ref != null ? item.ref : ""));
+ }
+ } else if (item.type == CREATE_CONN) {
+ closeConnection();
+ } else if (item.type == START_TXN) {
+ if (item.ref == null) {
+ rollback = true;
+ } else {
+ try {
+ _conn.rollback((Savepoint)ref);
+ rollback = false;
+ } catch (final SQLException e) {
+ s_logger.warn("Unable to rollback Txn.", e);
+ }
+ }
+ } else if (item.type == STATEMENT) {
+ try {
+ if (s_stmtLogger.isTraceEnabled()) {
+ s_stmtLogger.trace("Closing: " + ref.toString());
+ }
+ Statement stmt = (Statement)ref;
+ try {
+ ResultSet rs = stmt.getResultSet();
+ if (rs != null) {
+ rs.close();
+ }
+ } catch(SQLException e) {
+ s_stmtLogger.trace("Unable to close resultset");
+ }
+ stmt.close();
+ } catch (final SQLException e) {
+ s_stmtLogger.trace("Unable to close statement: " + item);
+ }
+ } else if (item.type == ATTACHMENT) {
+ TransactionAttachment att = (TransactionAttachment)item.ref;
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("Cleaning up " + att.getName());
+ }
+ att.cleanup();
+ }
+ } catch(Exception e) {
+ s_logger.error("Unable to clean up " + item, e);
+ }
+ }
+
+ if (rollback) {
+ rollback();
+ }
+ }
+
+ protected void rollbackTransaction() {
+ closePreviousStatement();
+ if (!_txn) {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("Rollback called for " + _name + " when there's no transaction: " + buildName());
+ }
+ return;
+ }
+ assert (!hasTxnInStack()) : "Who's rolling back transaction when there's still txn in stack?";
+ _txn = false;
+ try {
+ if (_conn != null) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Rolling back the transaction: Time = " + (System.currentTimeMillis() - _txnTime) + " Name = " + _name + "; called by " + buildName());
+ }
+ _conn.rollback();
+ }
+ clearLockTimes();
+ closeConnection();
+ } catch(final SQLException e) {
+ s_logger.warn("Unable to rollback", e);
+ }
+ }
+
+ /**
+  * Rolls the connection back to the given savepoint. If no START_TXN element
+  * remains on the stack afterwards, the transaction flag is cleared and the
+  * connection is closed.
+  *
+  * A failed rollback is logged together with its exception (the cause used to
+  * be dropped) and does not stop the remaining bookkeeping.
+  *
+  * @param sp the savepoint to roll back to
+  */
+ protected void rollbackSavepoint(Savepoint sp) {
+     try {
+         if (_conn != null) {
+             _conn.rollback(sp);
+         }
+     } catch (SQLException e) {
+         s_logger.warn("Unable to rollback to savepoint " + sp, e);
+     }
+
+     if (!hasTxnInStack()) {
+         _txn = false;
+         closeConnection();
+     }
+ }
+
+ /**
+  * Rolls back the transaction.
+  *
+  * START_TXN elements without a savepoint are removed from the head of the
+  * stack onwards. If one with a savepoint is met, the rollback goes only to
+  * that savepoint. Otherwise the whole DB transaction is rolled back.
+  */
+ public void rollback() {
+     for (final Iterator<StackElement> it = _stack.iterator(); it.hasNext();) {
+         final StackElement element = it.next();
+         if (element.type != START_TXN) {
+             continue;
+         }
+         if (element.ref != null) {
+             rollback((Savepoint)element.ref);
+             return;
+         }
+         it.remove();
+     }
+
+     rollbackTransaction();
+ }
+
+ /**
+  * Marks a transaction as in progress, pushes a START_TXN element and creates
+  * an unnamed savepoint on the connection, stored as that element's ref.
+  *
+  * @return the new savepoint
+  * @throws SQLException if the connection or the savepoint cannot be obtained
+  */
+ public Savepoint setSavepoint() throws SQLException {
+     _txn = true;
+     final StackElement marker = new StackElement(START_TXN, null);
+     _stack.push(marker);
+     final Savepoint created = getConnection().setSavepoint();
+     marker.ref = created;
+
+     return created;
+ }
+
+ /**
+  * Marks a transaction as in progress, pushes a START_TXN element and creates
+  * a named savepoint on the connection, stored as that element's ref.
+  *
+  * @param name savepoint name passed to the connection
+  * @return the new savepoint
+  * @throws SQLException if the connection or the savepoint cannot be obtained
+  */
+ public Savepoint setSavepoint(final String name) throws SQLException {
+     _txn = true;
+     final StackElement marker = new StackElement(START_TXN, null);
+     _stack.push(marker);
+     final Savepoint created = getConnection().setSavepoint(name);
+     marker.ref = created;
+
+     return created;
+ }
+
+ /**
+  * Drops the savepoint's START_TXN bookkeeping and releases the savepoint on
+  * the connection. If no START_TXN element remains, the transaction flag is
+  * cleared and the connection is closed.
+  *
+  * @param sp the savepoint to release
+  * @throws SQLException if the connection fails to release the savepoint
+  */
+ public void releaseSavepoint(final Savepoint sp) throws SQLException {
+     removeTxn(sp);
+     if (_conn != null) {
+         _conn.releaseSavepoint(sp);
+     }
+
+     final boolean stillInTxn = hasTxnInStack();
+     if (!stillInTxn) {
+         _txn = false;
+         closeConnection();
+     }
+ }
+
+ /**
+  * @param sp the savepoint to look for
+  * @return true if a START_TXN element on the stack carries exactly this savepoint
+  */
+ protected boolean hasSavepointInStack(Savepoint sp) {
+     for (final StackElement element : _stack) {
+         if (element.type == START_TXN && element.ref == sp) {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ protected void removeTxn(Savepoint sp) {
+ assert hasSavepointInStack(sp) : "Removing a save point that's not in the stack";
+
+ if (!hasSavepointInStack(sp)) {
+ return;
+ }
+
+ Iterator<StackElement> it = _stack.iterator();
+ while (it.hasNext()) {
+ StackElement se = it.next();
+ if (se.type == START_TXN) {
+ it.remove();
+ if (se.ref == sp) {
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+  * Rolls back to the given savepoint: first removes its START_TXN bookkeeping,
+  * then rolls the connection back to it.
+  *
+  * @param sp the savepoint to roll back to
+  */
+ public void rollback(final Savepoint sp) {
+     this.removeTxn(sp);
+
+     this.rollbackSavepoint(sp);
+ }
+
+ /** @return the connection currently held, or null; never opens a new one */
+ public Connection getCurrentConnection() {
+     return this._conn;
+ }
+
+ /** @return the live bookkeeping stack itself, not a copy */
+ public List<StackElement> getStack() {
+     return this._stack;
+ }
+
+ /**
+  * Creates an inert instance: no name, no connection, a null stack, no
+  * transaction in progress and database id -1.
+  */
+ protected TransactionLegacy() {
+     this._stack = null;
+     this._conn = null;
+     this._name = null;
+     this._dbId = -1;
+     this._txn = false;
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ if (!(_conn == null && (_stack == null || _stack.size() == 0))) {
+ assert (false) : "Oh Alex oh alex...something is wrong with how we're doing this";
+ s_logger.error("Something went wrong that a transaction is orphaned before db connection is closed");
+ cleanup();
+ }
+ }
+
+ protected class StackElement { // one entry of the bookkeeping stack: a type tag plus an optional payload
+ public String type; // one of the type constants (START_TXN, CURRENT_TXN, CREATE_TXN, CREATE_CONN, STATEMENT, ATTACHMENT)
+ public Object ref; // payload: a name for CURRENT_TXN/CREATE_TXN, a Savepoint or null for START_TXN, a TransactionAttachment for ATTACHMENT
+
+ public StackElement (String type, Object ref) { // stores both values as given
+ this.type = type;
+ this.ref = ref;
+ }
+
+ @Override
+ public String toString() { // renders as "type-ref"
+ return type + "-" + ref;
+ }
+ }
+
+ private static DataSource s_ds; // used by getConnection() for CLOUD_DB and by getStandaloneConnectionWithException()
+ private static DataSource s_usageDS; // used for USAGE_DB and by getStandaloneUsageConnection()
+ private static DataSource s_awsapiDS; // used for AWSAPI_DB and by getStandaloneAwsapiConnection()
+ private static DataSource s_simulatorDS; // used for SIMULATOR_DB and by getStandaloneSimulatorConnection()
+
+ static {
+ // Initialize with assumed db.properties file
+ initDataSource("db.properties"); // runs at class initialization, after the fields above are declared
+ }
+
+ public static void initDataSource(String propsFileName) {
+ try {
+ File dbPropsFile = PropertiesUtil.findConfigFile(propsFileName);
+ final Properties dbProps;
+ if (EncryptionSecretKeyChecker.useEncryption()) {
+ StandardPBEStringEncryptor encryptor = EncryptionSecretKeyChecker.getEncryptor();
+ dbProps = new EncryptableProperties(encryptor);
+ } else {
+ dbProps = new Properties();
+ }
+ try {
+ dbProps.load(new FileInputStream(dbPropsFile));
+ } catch (IOException e) {
+ s_logger.fatal("Unable to load db properties file, pl. check the classpath and file path configuration", e);
+ return;
+ } catch (NullPointerException e) {
+ s_logger.fatal("Unable to locate db properties file within classpath or absolute path: " + propsFileName);
+ return;
+ }
+
+ // FIXME: If params are missing...default them????
+ final int cloudMaxActive = Integer.parseInt(dbProps.getProperty("db.cloud.maxActive"));
+ final int cloudMaxIdle = Integer.parseInt(dbProps.getProperty("db.cloud.maxIdle"));
+ final long cloudMaxWait = Long.parseLong(dbProps.getProperty("db.cloud.maxWait"));
+ final String cloudUsername = dbProps.getProperty("db.cloud.username");
+ final String cloudPassword = dbProps.getProperty("db.cloud.password");
+ final String cloudHost = dbProps.getProperty("db.cloud.host");
+ final int cloudPort = Integer.parseInt(dbProps.getProperty("db.cloud.port"));
+ final String cloudDbName = dbProps.getProperty("db.cloud.name");
+ final boolean cloudAutoReconnect = Boolean.parseBoolean(dbProps.getProperty("db.cloud.autoReconnect"));
+ final String cloudValidationQuery = dbProps.getProperty("db.cloud.validationQuery");
+ final String cloudIsolationLevel = dbProps.getProperty("db.cloud.isolation.level");
+
+ int isolationLevel = Connection.TRANSACTION_READ_COMMITTED;
+ if (cloudIsolationLevel == null) {
+ isolationLevel = Connection.TRANSACTION_READ_COMMITTED;
+ } else if (cloudIsolationLevel.equalsIgnoreCase("readcommitted")) {
+ isolationLevel = Connection.TRANSACTION_READ_COMMITTED;
+ } else if (cloudIsolationLevel.equalsIgnoreCase("repeatableread")) {
+ isolationLevel = Connection.TRANSACTION_REPEATABLE_READ;
+ } else if (cloudIsolationLevel.equalsIgnoreCase("serializable")) {
+ isolationLevel = Connection.TRANSACTION_SERIALIZABLE;
+ } else if (cloudIsolationLevel.equalsIgnoreCase("readuncommitted")) {
+ isolationLevel = Connection.TRANSACTION_READ_UNCOMMITTED;
+ } else {
+ s_logger.warn("Unknown isolation level " + cloudIsolationLevel + ". Using read uncommitted");
+ }
+
+ final boolean cloudTestOnBorrow = Boolean.parseBoolean(dbProps.getProperty("db.cloud.testOnBorrow"));
+ final boolean cloudTestWhileIdle = Boolean.parseBoolean(dbProps.getProperty("db.cloud.testWhileIdle"));
+ final long cloudTimeBtwEvictionRunsMillis = Long.parseLong(dbProps.getProperty("db.cloud.timeBetweenEvictionRunsMillis"));
+ final long cloudMinEvcitableIdleTimeMillis = Long.parseLong(dbProps.getProperty("db.cloud.minEvictableIdleTimeMillis"));
+ final boolean cloudPoolPreparedStatements = Boolean.parseBoolean(dbProps.getProperty("db.cloud.poolPreparedStatements"));
+ final String url = dbProps.getProperty("db.cloud.url.params");
+
+ final boolean useSSL = Boolean.parseBoolean(dbProps.getProperty("db.cloud.useSSL"));
+ if (useSSL) {
+ System.setProperty("javax.net.ssl.keyStore", dbProps.getProperty("db.cloud.keyStore"));
+ System.setProperty("javax.net.ssl.keyStorePassword", dbProps.getProperty("db.cloud.keyStorePassword"));
+ System.setProperty("javax.net.ssl.trustStore", dbProps.getProperty("db.cloud.trustStore"));
+ System.setProperty("javax.net.ssl.trustStorePassword", dbProps.getProperty("db.cloud.trustStorePassword"));
+ }
+
+ final GenericObjectPool cloudConnectionPool = new GenericObjectPool(null, cloudMaxActive, GenericObjectPool.DEFAULT_WHEN_EXHAUSTED_ACTION,
+ cloudMaxWait, cloudMaxIdle, cloudTestOnBorrow, false, cloudTimeBtwEvictionRunsMillis, 1, cloudMinEvcitableIdleTimeMillis, cloudTestWhileIdle);
+
+ final ConnectionFactory cloudConnectionFactory = new DriverManagerConnectionFactory("jdbc:mysql://" + cloudHost + ":" + cloudPort + "/" + cloudDbName +
+ "?autoReconnect=" + cloudAutoReconnect + (url != null ? "&" + url : "") + (useSSL ? "&useSSL=true" : ""), cloudUsername, cloudPassword);
+
+ final KeyedObjectPoolFactory poolableObjFactory = (cloudPoolPreparedStatements ? new StackKeyedObjectPoolFactory() : null);
+
+ final PoolableConnectionFactory cloudPoolableConnectionFactory = new PoolableConnectionFactory(cloudConnectionFactory, cloudConnectionPool, poolableObjFactory,
+ cloudValidationQuery, false, false, isolationLevel);
+
+ // Default Data Source for CloudStack
+ s_ds = new PoolingDataSource(cloudPoolableConnectionFactory.getPool());
+
+ // Configure the usage db
+ final int usageMaxActive = Integer.parseInt(dbProps.getProperty("db.usage.maxActive"));
+ final int usageMaxIdle = Integer.parseInt(dbProps.getProperty("db.usage.maxIdle"));
+ final long usageMaxWait = Long.parseLong(dbProps.getProperty("db.usage.maxWait"));
+ final String usageUsername = dbProps.getProperty("db.usage.username");
+ final String usagePassword = dbProps.getProperty("db.usage.password");
+ final String usageHost = dbProps.getProperty("db.usage.host");
+ final int usagePort = Integer.parseInt(dbProps.getProperty("db.usage.port"));
+ final String usageDbName = dbProps.getProperty("db.usage.name");
+ final boolean usageAutoReconnect = Boolean.parseBoolean(dbProps.getProperty("db.usage.autoReconnect"));
+ final String usageUrl = dbProps.getProperty("db.usage.url.params");
+
+ final GenericObjectPool usageConnectionPool = new GenericObjectPool(null, usageMaxActive, GenericObjectPool.DEFAULT_WHEN_EXHAUSTED_ACTION,
+ usageMaxWait, usageMaxIdle);
+
+ final ConnectionFactory usageConnectionFactory = new DriverManagerConnectionFactory("jdbc:mysql://" + usageHost + ":" + usagePort + "/" + usageDbName +
+ "?autoReconnect=" + usageAutoReconnect + (usageUrl != null ? "&" + usageUrl : ""), usageUsername, usagePassword);
+
+ final PoolableConnectionFactory usagePoolableConnectionFactory = new PoolableConnectionFactory(usageConnectionFactory, usageConnectionPool,
+ new StackKeyedObjectPoolFactory(), null, false, false);
+
+ // Data Source for usage server
+ s_usageDS = new PoolingDataSource(usagePoolableConnectionFactory.getPool());
+
+ // Configure awsapi db
+ final String awsapiDbName = dbProps.getProperty("db.awsapi.name");
+ final GenericObjectPool awsapiConnectionPool = new GenericObjectPool(null, usageMaxActive, GenericObjectPool.DEFAULT_WHEN_EXHAUSTED_ACTION,
+ usageMaxWait, usageMaxIdle);
+ final ConnectionFactory awsapiConnectionFactory = new DriverManagerConnectionFactory("jdbc:mysql://" + cloudHost + ":" + cloudPort + "/" + awsapiDbName +
+ "?autoReconnect=" + usageAutoReconnect, cloudUsername, cloudPassword);
+ final PoolableConnectionFactory awsapiPoolableConnectionFactory = new PoolableConnectionFactory(awsapiConnectionFactory, awsapiConnectionPool,
+ new StackKeyedObjectPoolFactory(), null, false, false);
+
+ // Data Source for awsapi
+ s_awsapiDS = new PoolingDataSource(awsapiPoolableConnectionFactory.getPool());
+
+ try {
+ // Configure the simulator db
+ final int simulatorMaxActive = Integer.parseInt(dbProps.getProperty("db.simulator.maxActive"));
+ final int simulatorMaxIdle = Integer.parseInt(dbProps.getProperty("db.simulator.maxIdle"));
+ final long simulatorMaxWait = Long.parseLong(dbProps.getProperty("db.simulator.maxWait"));
+ final String simulatorUsername = dbProps.getProperty("db.simulator.username");
+ final String simulatorPassword = dbProps.getProperty("db.simulator.password");
+ final String simulatorHost = dbProps.getProperty("db.simulator.host");
+ final int simulatorPort = Integer.parseInt(dbProps.getProperty("db.simulator.port"));
+ final String simulatorDbName = dbProps.getProperty("db.simulator.name");
+ final boolean simulatorAutoReconnect = Boolean.parseBoolean(dbProps.getProperty("db.simulator.autoReconnect"));
+
+ final GenericObjectPool simulatorConnectionPool = new GenericObjectPool(null, simulatorMaxActive, GenericObjectPool.DEFAULT_WHEN_EXHAUSTED_ACTION,
+ simulatorMaxWait, simulatorMaxIdle);
+
+ final ConnectionFactory simulatorConnectionFactory = new DriverManagerConnectionFactory("jdbc:mysql://" + simulatorHost + ":" + simulatorPort + "/" + simulatorDbName +
+ "?autoReconnect=" + simulatorAutoReconnect, simulatorUsername, simulatorPassword);
+
+ final PoolableConnectionFactory simulatorPoolableConnectionFactory = new PoolableConnectionFactory(simulatorConnectionFactory, simulatorConnectionPool,
+ new StackKeyedObjectPoolFactory(), null, false, false);
+ s_simulatorDS = new PoolingDataSource(simulatorPoolableConnectionFactory.getPool());
+ } catch (Exception e) {
+ s_logger.debug("Simulator DB properties are not available. Not initializing simulator DS");
+ }
+ } catch (final Exception e) {
+ s_ds = getDefaultDataSource("cloud");
+ s_usageDS = getDefaultDataSource("cloud_usage");
+ s_simulatorDS = getDefaultDataSource("cloud_simulator");
+ s_logger.warn("Unable to load db configuration, using defaults with 5 connections. Falling back on assumed datasource on localhost:3306 using username:password=cloud:cloud. Please check your configuration", e);
+ }
+ }
+
+ private static DataSource getDefaultDataSource(final String database) {
+ // Fallback source: at most 5 active connections to the named schema on localhost:3306, logging in as cloud/cloud.
+ final String jdbcUrl = "jdbc:mysql://localhost:3306/" + database;
+ final GenericObjectPool pool = new GenericObjectPool(null, 5);
+ final ConnectionFactory driverFactory = new DriverManagerConnectionFactory(jdbcUrl, "cloud", "cloud");
+ // Both nulls are deliberate: no prepared-statement pool factory and no validation query for the fallback.
+ final PoolableConnectionFactory pooledFactory = new PoolableConnectionFactory(driverFactory, pool, null, null, false, true);
+ return new PoolingDataSource(pooledFactory.getPool());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/db/src/com/cloud/utils/db/TransactionMBeanImpl.java
----------------------------------------------------------------------
diff --git a/framework/db/src/com/cloud/utils/db/TransactionMBeanImpl.java b/framework/db/src/com/cloud/utils/db/TransactionMBeanImpl.java
index d51a9bd..73511b1 100644
--- a/framework/db/src/com/cloud/utils/db/TransactionMBeanImpl.java
+++ b/framework/db/src/com/cloud/utils/db/TransactionMBeanImpl.java
@@ -25,21 +25,21 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.management.StandardMBean;
-import com.cloud.utils.db.Transaction.StackElement;
+import com.cloud.utils.db.TransactionLegacy.StackElement;
public class TransactionMBeanImpl extends StandardMBean implements TransactionMBean {
- Map<Long, Transaction> _txns = new ConcurrentHashMap<Long, Transaction>();
+ Map<Long, TransactionLegacy> _txns = new ConcurrentHashMap<Long, TransactionLegacy>();
public TransactionMBeanImpl() {
super(TransactionMBean.class, false);
}
- public void addTransaction(Transaction txn) {
+ public void addTransaction(TransactionLegacy txn) {
_txns.put(txn.getId(), txn);
}
- public void removeTransaction(Transaction txn) {
+ public void removeTransaction(TransactionLegacy txn) {
_txns.remove(txn.getId());
}
@@ -53,7 +53,7 @@ public class TransactionMBeanImpl extends StandardMBean implements TransactionMB
int[] count = new int[2];
count[0] = 0;
count[1] = 0;
- for (Transaction txn : _txns.values()) {
+ for (TransactionLegacy txn : _txns.values()) {
if (txn.getStack().size() > 0) {
count[0]++;
}
@@ -67,7 +67,7 @@ public class TransactionMBeanImpl extends StandardMBean implements TransactionMB
@Override
public List<Map<String, String>> getTransactions() {
ArrayList<Map<String, String>> txns = new ArrayList<Map<String, String>>();
- for (Transaction info : _txns.values()) {
+ for (TransactionLegacy info : _txns.values()) {
txns.add(toMap(info));
}
return txns;
@@ -76,7 +76,7 @@ public class TransactionMBeanImpl extends StandardMBean implements TransactionMB
@Override
public List<Map<String, String>> getActiveTransactions() {
ArrayList<Map<String, String>> txns = new ArrayList<Map<String, String>>();
- for (Transaction txn : _txns.values()) {
+ for (TransactionLegacy txn : _txns.values()) {
if (txn.getStack().size() > 0 || txn.getCurrentConnection() != null) {
txns.add(toMap(txn));
}
@@ -84,7 +84,7 @@ public class TransactionMBeanImpl extends StandardMBean implements TransactionMB
return txns;
}
- protected Map<String, String> toMap(Transaction txn) {
+ protected Map<String, String> toMap(TransactionLegacy txn) {
Map<String, String> map = new HashMap<String, String>();
map.put("name", txn.getName());
map.put("id", Long.toString(txn.getId()));
@@ -103,7 +103,7 @@ public class TransactionMBeanImpl extends StandardMBean implements TransactionMB
@Override
public List<Map<String, String>> getTransactionsWithDatabaseConnection() {
ArrayList<Map<String, String>> txns = new ArrayList<Map<String, String>>();
- for (Transaction txn : _txns.values()) {
+ for (TransactionLegacy txn : _txns.values()) {
if (txn.getCurrentConnection() != null) {
txns.add(toMap(txn));
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/db/src/com/cloud/utils/db/TransactionStatus.java
----------------------------------------------------------------------
diff --git a/framework/db/src/com/cloud/utils/db/TransactionStatus.java b/framework/db/src/com/cloud/utils/db/TransactionStatus.java
new file mode 100644
index 0000000..a167797
--- /dev/null
+++ b/framework/db/src/com/cloud/utils/db/TransactionStatus.java
@@ -0,0 +1,7 @@
+package com.cloud.utils.db;
+
+/**
+ * Marker type received by transaction callbacks; it declares no members and is a placeholder for possible future features.
+ */
+public interface TransactionStatus {
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/db/src/com/cloud/utils/db/TransactionWrappedExeception.java
----------------------------------------------------------------------
diff --git a/framework/db/src/com/cloud/utils/db/TransactionWrappedExeception.java b/framework/db/src/com/cloud/utils/db/TransactionWrappedExeception.java
new file mode 100644
index 0000000..70d4c16
--- /dev/null
+++ b/framework/db/src/com/cloud/utils/db/TransactionWrappedExeception.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.cloud.utils.db;
+
+import com.cloud.utils.exception.CloudRuntimeException;
+/** CloudRuntimeException subclass that wraps another exception so it can be retrieved later via getWrapped(). */
+public class TransactionWrappedExeception extends CloudRuntimeException {
+ private static final long serialVersionUID = -3254037624055143300L;
+ /** The exception handed to the constructor; null if the caller passed null. */
+ Exception e;
+
+ /** Stores e and registers it as the cause, so printing or logging this wrapper also shows the original failure. */
+ public TransactionWrappedExeception(Exception e) {
+ this.e = e;
+ initCause(e);
+ }
+
+ /** @return the exception passed at construction, unchanged. */
+ public Exception getWrapped() { return e; }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/db/test/com/cloud/utils/db/DbTestDao.java
----------------------------------------------------------------------
diff --git a/framework/db/test/com/cloud/utils/db/DbTestDao.java b/framework/db/test/com/cloud/utils/db/DbTestDao.java
index 9530b3b..7db5ba8 100644
--- a/framework/db/test/com/cloud/utils/db/DbTestDao.java
+++ b/framework/db/test/com/cloud/utils/db/DbTestDao.java
@@ -29,7 +29,7 @@ public class DbTestDao extends GenericDaoBase<DbTestVO, Long> implements Generic
@DB
public void create(int fldInt, long fldLong, String fldString) {
- Transaction txn = Transaction.currentTxn();
+ TransactionLegacy txn = TransactionLegacy.currentTxn();
PreparedStatement pstmt = null;
try {
txn.start();
@@ -48,7 +48,7 @@ public class DbTestDao extends GenericDaoBase<DbTestVO, Long> implements Generic
@DB
public void update(int fldInt, long fldLong, String fldString) {
- Transaction txn = Transaction.currentTxn();
+ TransactionLegacy txn = TransactionLegacy.currentTxn();
PreparedStatement pstmt = null;
try {
txn.start();
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/db/test/com/cloud/utils/db/DbTestUtils.java
----------------------------------------------------------------------
diff --git a/framework/db/test/com/cloud/utils/db/DbTestUtils.java b/framework/db/test/com/cloud/utils/db/DbTestUtils.java
index 11ae1aa..2458b8c 100644
--- a/framework/db/test/com/cloud/utils/db/DbTestUtils.java
+++ b/framework/db/test/com/cloud/utils/db/DbTestUtils.java
@@ -33,7 +33,7 @@ public class DbTestUtils {
throw new RuntimeException("Unable to clean the database because I can't find " + file);
}
- Connection conn = Transaction.getStandaloneConnection();
+ Connection conn = TransactionLegacy.getStandaloneConnection();
ScriptRunner runner = new ScriptRunner(conn, autoCommit, stopOnError);
FileReader reader;
@@ -63,7 +63,7 @@ public class DbTestUtils {
throw new RuntimeException("Unable to clean the database because I can't find " + file);
}
- Connection conn = Transaction.getStandaloneUsageConnection();
+ Connection conn = TransactionLegacy.getStandaloneUsageConnection();
ScriptRunner runner = new ScriptRunner(conn, autoCommit, stopOnError);
FileReader reader;
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/db/test/com/cloud/utils/db/TransactionTest.java
----------------------------------------------------------------------
diff --git a/framework/db/test/com/cloud/utils/db/TransactionTest.java b/framework/db/test/com/cloud/utils/db/TransactionTest.java
index 101a533..92b2f36 100644
--- a/framework/db/test/com/cloud/utils/db/TransactionTest.java
+++ b/framework/db/test/com/cloud/utils/db/TransactionTest.java
@@ -41,7 +41,7 @@ public class TransactionTest {
Connection conn = null;
PreparedStatement pstmt = null;
try {
- conn = Transaction.getStandaloneConnection();
+ conn = TransactionLegacy.getStandaloneConnection();
pstmt = conn.prepareStatement("CREATE TABLE `cloud`.`test` ("
+ "`id` bigint unsigned NOT NULL UNIQUE AUTO_INCREMENT," + "`fld_int` int unsigned,"
@@ -75,27 +75,27 @@ public class TransactionTest {
*/
public void testUserManagedConnection() {
DbTestDao testDao = ComponentContext.inject(DbTestDao.class);
- Transaction txn = Transaction.open("SingleConnectionThread");
+ TransactionLegacy txn = TransactionLegacy.open("SingleConnectionThread");
Connection conn = null;
try {
- conn = Transaction.getStandaloneConnectionWithException();
+ conn = TransactionLegacy.getStandaloneConnectionWithException();
txn.transitToUserManagedConnection(conn);
// try two SQLs to make sure that they are using the same connection
// acquired above.
testDao.create(1, 1, "Record 1");
- Connection checkConn = Transaction.currentTxn().getConnection();
+ Connection checkConn = TransactionLegacy.currentTxn().getConnection();
if (checkConn != conn) {
Assert.fail("A new db connection is acquired instead of using old one after create sql");
}
testDao.update(2, 2, "Record 1");
- Connection checkConn2 = Transaction.currentTxn().getConnection();
+ Connection checkConn2 = TransactionLegacy.currentTxn().getConnection();
if (checkConn2 != conn) {
Assert.fail("A new db connection is acquired instead of using old one after update sql");
}
} catch (SQLException e) {
Assert.fail(e.getMessage());
} finally {
- txn.transitToAutoManagedConnection(Transaction.CLOUD_DB);
+ txn.transitToAutoManagedConnection(TransactionLegacy.CLOUD_DB);
txn.close();
if (conn != null) {
@@ -117,28 +117,28 @@ public class TransactionTest {
// acquire a db connection and keep it
Connection conn = null;
try {
- conn = Transaction.getStandaloneConnectionWithException();
+ conn = TransactionLegacy.getStandaloneConnectionWithException();
} catch (SQLException ex) {
throw new CloudRuntimeException("Problem with getting db connection", ex);
}
// start heartbeat loop, make sure that each loop still use the same
// connection
- Transaction txn = null;
+ TransactionLegacy txn = null;
for (int i = 0; i < 3; i++) {
- txn = Transaction.open("HeartbeatSimulator");
+ txn = TransactionLegacy.open("HeartbeatSimulator");
try {
txn.transitToUserManagedConnection(conn);
testDao.create(i, i, "Record " + i);
- Connection checkConn = Transaction.currentTxn().getConnection();
+ Connection checkConn = TransactionLegacy.currentTxn().getConnection();
if (checkConn != conn) {
Assert.fail("A new db connection is acquired instead of using old one in loop " + i);
}
} catch (SQLException e) {
Assert.fail(e.getMessage());
} finally {
- txn.transitToAutoManagedConnection(Transaction.CLOUD_DB);
+ txn.transitToAutoManagedConnection(TransactionLegacy.CLOUD_DB);
txn.close();
}
}
@@ -161,7 +161,7 @@ public class TransactionTest {
Connection conn = null;
PreparedStatement pstmt = null;
try {
- conn = Transaction.getStandaloneConnection();
+ conn = TransactionLegacy.getStandaloneConnection();
pstmt = conn.prepareStatement("truncate table `cloud`.`test`");
pstmt.execute();
@@ -189,7 +189,7 @@ public class TransactionTest {
Connection conn = null;
PreparedStatement pstmt = null;
try {
- conn = Transaction.getStandaloneConnection();
+ conn = TransactionLegacy.getStandaloneConnection();
pstmt = conn.prepareStatement("DROP TABLE IF EXISTS `cloud`.`test`");
pstmt.execute();
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
index fb3845c..ed161e7 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
@@ -32,7 +32,7 @@ import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
-import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionLegacy;
public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements AsyncJobDao {
private static final Logger s_logger = Logger.getLogger(AsyncJobDaoImpl.class.getName());
@@ -182,7 +182,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
+ ", job_result='" + jobResultMessage + "' where job_status=" + JobInfo.Status.IN_PROGRESS.ordinal()
+ " AND (job_executing_msid=? OR (job_executing_msid IS NULL AND job_init_msid=?))";
- Transaction txn = Transaction.currentTxn();
+ TransactionLegacy txn = TransactionLegacy.currentTxn();
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
index 20d8ba6..d4ca0d7 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobJoinMapDaoImpl.java
@@ -34,7 +34,7 @@ import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
-import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionLegacy;
import com.cloud.utils.db.UpdateBuilder;
import com.cloud.utils.exception.CloudRuntimeException;
@@ -157,7 +157,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
//
// Date cutDate = DateUtil.currentGMTTime();
//
-// Transaction txn = Transaction.currentTxn();
+// TransactionLegacy txn = TransactionLegacy.currentTxn();
// PreparedStatement pstmt = null;
// try {
// txn.start();
@@ -213,7 +213,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
public List<Long> findJobsToWake(long joinedJobId) {
// TODO: We should fix this. We shouldn't be crossing daos in a dao code.
List<Long> standaloneList = new ArrayList<Long>();
- Transaction txn = Transaction.currentTxn();
+ TransactionLegacy txn = TransactionLegacy.currentTxn();
String sql = "SELECT job_id FROM async_job_join_map WHERE join_job_id = ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
try {
PreparedStatement pstmt = txn.prepareStatement(sql);
@@ -231,7 +231,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
@Override
public List<Long> findJobsToWakeBetween(Date cutDate) {
List<Long> standaloneList = new ArrayList<Long>();
- Transaction txn = Transaction.currentTxn();
+ TransactionLegacy txn = TransactionLegacy.currentTxn();
try {
String sql = "SELECT job_id FROM async_job_join_map WHERE next_wakeup < ? AND expiration > ? AND job_id NOT IN (SELECT content_id FROM sync_queue_item)";
PreparedStatement pstmt = txn.prepareStatement(sql);
@@ -260,7 +260,7 @@ public class AsyncJobJoinMapDaoImpl extends GenericDaoBase<AsyncJobJoinMapVO, Lo
// public List<Long> wakeupByJoinedJobCompletion(long joinedJobId) {
// List<Long> standaloneList = new ArrayList<Long>();
//
-// Transaction txn = Transaction.currentTxn();
+// TransactionLegacy txn = TransactionLegacy.currentTxn();
// PreparedStatement pstmt = null;
// try {
// txn.start();
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
index f7d9d72..01efc4e 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueDaoImpl.java
@@ -30,7 +30,7 @@ import com.cloud.utils.DateUtil;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
-import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionLegacy;
public class SyncQueueDaoImpl extends GenericDaoBase<SyncQueueVO, Long> implements SyncQueueDao {
private static final Logger s_logger = Logger.getLogger(SyncQueueDaoImpl.class.getName());
@@ -51,7 +51,7 @@ public class SyncQueueDaoImpl extends GenericDaoBase<SyncQueueVO, Long> implemen
String sql = "INSERT IGNORE INTO sync_queue(sync_objtype, sync_objid, created, last_updated)" +
" values(?, ?, ?, ?)";
- Transaction txn = Transaction.currentTxn();
+ TransactionLegacy txn = TransactionLegacy.currentTxn();
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
index 0cd231f..2f04a7c 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/SyncQueueItemDaoImpl.java
@@ -37,7 +37,7 @@ import com.cloud.utils.db.GenericSearchBuilder;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.SearchCriteria.Op;
-import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionLegacy;
@DB
public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long> implements SyncQueueItemDao {
@@ -83,7 +83,7 @@ public class SyncQueueItemDaoImpl extends GenericDaoBase<SyncQueueItemVO, Long>
" ORDER BY i.id " +
" LIMIT 0, ?";
- Transaction txn = Transaction.currentTxn();
+ TransactionLegacy txn = TransactionLegacy.currentTxn();
PreparedStatement pstmt = null;
try {
pstmt = txn.prepareAutoCloseStatement(sql);
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/f62e28c1/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 93d50c1..ffc7b3a 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -70,8 +70,11 @@ import com.cloud.utils.db.GenericSearchBuilder;
import com.cloud.utils.db.GlobalLock;
import com.cloud.utils.db.SearchBuilder;
import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.TransactionCallback;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionStatus;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.exception.ExceptionUtil;
import com.cloud.utils.mgmt.JmxUtil;
@@ -177,19 +180,22 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@SuppressWarnings("unchecked")
@Override
@DB
- public long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId) {
- Transaction txt = Transaction.currentTxn();
+ public long submitAsyncJob(final AsyncJob job, final String syncObjType, final long syncObjId) {
try {
@SuppressWarnings("rawtypes")
- GenericDao dao = GenericDaoBase.getDao(job.getClass());
+ final GenericDao dao = GenericDaoBase.getDao(job.getClass());
- txt.start();
- job.setInitMsid(getMsid());
- dao.persist(job);
+ return Transaction.execute(new TransactionCallback<Long>() {
+ @Override
+ public Long doInTransaction(TransactionStatus status) {
+ job.setInitMsid(getMsid());
+ dao.persist(job);
+
+ syncAsyncJobExecution(job, syncObjType, syncObjId, 1);
- syncAsyncJobExecution(job, syncObjType, syncObjId, 1);
- txt.commit();
- return job.getId();
+ return job.getId();
+ }
+ });
} catch (Exception e) {
String errMsg = "Unable to schedule async job for command " + job.getCmd() + ", unexpected exception.";
s_logger.warn(errMsg, e);
@@ -199,123 +205,110 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Override
@DB
- public void completeAsyncJob(long jobId, Status jobStatus, int resultCode, String resultObject) {
+ public void completeAsyncJob(final long jobId, final Status jobStatus, final int resultCode, final String resultObject) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Complete async job-" + jobId + ", jobStatus: " + jobStatus + ", resultCode: " + resultCode + ", result: " + resultObject);
}
- Transaction txn = Transaction.currentTxn();
- try {
- txn.start();
- AsyncJobVO job = _jobDao.findById(jobId);
- if (job == null) {
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("job-" + jobId + " no longer exists, we just log completion info here. " + jobStatus + ", resultCode: " + resultCode + ", result: " +
- resultObject);
- }
-
- txn.rollback();
- return;
+ final AsyncJobVO job = _jobDao.findById(jobId);
+ if (job == null) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("job-" + jobId + " no longer exists, we just log completion info here. " + jobStatus + ", resultCode: " + resultCode + ", result: " +
+ resultObject);
}
- if (job.getStatus() != JobInfo.Status.IN_PROGRESS) {
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("job-" + jobId + " is already completed.");
- }
+ return;
+ }
- txn.rollback();
- return;
+ if (job.getStatus() != JobInfo.Status.IN_PROGRESS) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("job-" + jobId + " is already completed.");
}
- job.setCompleteMsid(getMsid());
- job.setStatus(jobStatus);
- job.setResultCode(resultCode);
+ return;
+ }
- // reset attached object
- job.setInstanceType(null);
- job.setInstanceId(null);
+ List<Long> wakeupList = Transaction.execute(new TransactionCallback<List<Long>>() {
+ @Override
+ public List<Long> doInTransaction(TransactionStatus status) {
+ job.setCompleteMsid(getMsid());
+ job.setStatus(jobStatus);
+ job.setResultCode(resultCode);
- if (resultObject != null) {
- job.setResult(resultObject);
- }
+ // reset attached object
+ job.setInstanceType(null);
+ job.setInstanceId(null);
- job.setLastUpdated(DateUtil.currentGMTTime());
- _jobDao.update(jobId, job);
+ if (resultObject != null) {
+ job.setResult(resultObject);
+ }
- List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId);
- _joinMapDao.disjoinAllJobs(jobId);
+ job.setLastUpdated(DateUtil.currentGMTTime());
+ _jobDao.update(jobId, job);
- txn.commit();
+ List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId);
+ _joinMapDao.disjoinAllJobs(jobId);
- for (Long id : wakeupList) {
- // TODO, we assume that all jobs in this category is API job only
- AsyncJobVO jobToWakeup = _jobDao.findById(id);
- if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
- scheduleExecution(jobToWakeup, false);
+ return wakeupList;
}
+ });
- _messageBus.publish(null, AsyncJob.Topics.JOB_STATE, PublishScope.GLOBAL, jobId);
- } catch (Exception e) {
- s_logger.error("Unexpected exception while completing async job-" + jobId, e);
- txn.rollback();
+ for (Long id : wakeupList) {
+ // TODO, we assume that all jobs in this category is API job only
+ AsyncJobVO jobToWakeup = _jobDao.findById(id);
+ if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & AsyncJob.Constants.SIGNAL_MASK_WAKEUP) != 0)
+ scheduleExecution(jobToWakeup, false);
}
+
+ _messageBus.publish(null, AsyncJob.Topics.JOB_STATE, PublishScope.GLOBAL, jobId);
}
@Override
@DB
- public void updateAsyncJobStatus(long jobId, int processStatus, String resultObject) {
+ public void updateAsyncJobStatus(final long jobId, final int processStatus, final String resultObject) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Update async-job progress, job-" + jobId + ", processStatus: " + processStatus + ", result: " + resultObject);
}
- Transaction txt = Transaction.currentTxn();
- try {
- txt.start();
- AsyncJobVO job = _jobDao.findById(jobId);
- if (job == null) {
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("job-" + jobId + " no longer exists, we just log progress info here. progress status: " + processStatus);
- }
-
- txt.rollback();
- return;
+ final AsyncJobVO job = _jobDao.findById(jobId);
+ if (job == null) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("job-" + jobId + " no longer exists, we just log progress info here. progress status: " + processStatus);
}
- job.setProcessStatus(processStatus);
- if (resultObject != null) {
- job.setResult(resultObject);
- }
- job.setLastUpdated(DateUtil.currentGMTTime());
- _jobDao.update(jobId, job);
- txt.commit();
- } catch (Exception e) {
- s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e);
- txt.rollback();
+ return;
}
+
+ Transaction.execute(new TransactionCallbackNoReturn() {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+ job.setProcessStatus(processStatus);
+ if (resultObject != null) {
+ job.setResult(resultObject);
+ }
+ job.setLastUpdated(DateUtil.currentGMTTime());
+ _jobDao.update(jobId, job);
+ }
+ });
}
@Override
@DB
- public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId) {
+ public void updateAsyncJobAttachment(final long jobId, final String instanceType, final Long instanceId) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType + ", instanceId: " + instanceId);
}
- Transaction txt = Transaction.currentTxn();
- try {
- txt.start();
-
- AsyncJobVO job = _jobDao.createForUpdate();
- job.setInstanceType(instanceType);
- job.setInstanceId(instanceId);
- job.setLastUpdated(DateUtil.currentGMTTime());
- _jobDao.update(jobId, job);
-
- txt.commit();
- } catch (Exception e) {
- s_logger.error("Unexpected exception while updating async job-" + jobId + " attachment: ", e);
- txt.rollback();
- }
+ Transaction.execute(new TransactionCallbackNoReturn() {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+ AsyncJobVO job = _jobDao.createForUpdate();
+ job.setInstanceType(instanceType);
+ job.setInstanceId(instanceId);
+ job.setLastUpdated(DateUtil.currentGMTTime());
+ _jobDao.update(jobId, job);
+ }
+ });
}
@Override
@@ -493,15 +486,12 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
return new ManagedContextRunnable() {
@Override
protected void runInContext() {
- Transaction txn = null;
long runNumber = getJobRunNumber();
try {
//
// setup execution environment
//
- txn = Transaction.open(Transaction.CLOUD_DB);
-
try {
JmxUtil.registerMBean("AsyncJobManager", "Active Job " + job.getId(), new AsyncJobMBeanImpl(job));
} catch (Exception e) {
@@ -564,9 +554,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
s_logger.trace("Unable to unregister job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e));
}
- if (txn != null)
- txn.close();
-
//
// clean execution environment
//
@@ -690,7 +677,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
return new ManagedContextRunnable() {
@Override
protected void runInContext() {
- Transaction txn = Transaction.open("AsyncJobManagerImpl.getHeartbeatTask");
try {
List<SyncQueueItemVO> l = _queueMgr.dequeueFromAny(getMsid(), MAX_ONETIME_SCHEDULE_SIZE);
if (l != null && l.size() > 0) {
@@ -711,12 +697,6 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
} catch (Throwable e) {
s_logger.error("Unexpected exception when trying to execute queue item, ", e);
- } finally {
- try {
- txn.close();
- } catch (Throwable e) {
- s_logger.error("Unexpected exception", e);
- }
}
}
};
@@ -785,13 +765,15 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
@DB
- protected void expungeAsyncJob(AsyncJobVO job) {
- Transaction txn = Transaction.currentTxn();
- txn.start();
- _jobDao.expunge(job.getId());
- //purge corresponding sync queue item
- _queueMgr.purgeAsyncJobQueueItemId(job.getId());
- txn.commit();
+ protected void expungeAsyncJob(final AsyncJobVO job) {
+ Transaction.execute(new TransactionCallbackNoReturn() {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+ _jobDao.expunge(job.getId());
+ //purge corresponding sync queue item
+ _queueMgr.purgeAsyncJobQueueItemId(job.getId());
+ }
+ });
}
private long getMsid() {
@@ -825,58 +807,60 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
Collections.sort(result);
Long[] ids = result.toArray(new Long[result.size()]);
- SearchCriteria<AsyncJobVO> jobsSC = JobIdsSearch.create("ids", ids);
- SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids);
+ final SearchCriteria<AsyncJobVO> jobsSC = JobIdsSearch.create("ids", ids);
+ final SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids);
- Transaction txn = Transaction.currentTxn();
- txn.start();
- AsyncJobVO job = _jobDao.createForUpdate();
- job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
- _jobDao.update(job, jobsSC);
+ Transaction.execute(new TransactionCallbackNoReturn() {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+ AsyncJobVO job = _jobDao.createForUpdate();
+ job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
+ _jobDao.update(job, jobsSC);
- SyncQueueItemVO item = _queueItemDao.createForUpdate();
- item.setLastProcessNumber(null);
- item.setLastProcessMsid(null);
- _queueItemDao.update(item, queueItemsSC);
- txn.commit();
+ SyncQueueItemVO item = _queueItemDao.createForUpdate();
+ item.setLastProcessNumber(null);
+ item.setLastProcessMsid(null);
+ _queueItemDao.update(item, queueItemsSC);
+ }
+ });
}
return _joinMapDao.findJobsToWake(joinedJobId);
}
@DB
protected List<Long> wakeupScan() {
- Date cutDate = DateUtil.currentGMTTime();
- Transaction txn = Transaction.currentTxn();
+ final Date cutDate = DateUtil.currentGMTTime();
SearchCriteria<Long> sc = JoinJobTimeSearch.create();
sc.setParameters("beginTime", cutDate);
sc.setParameters("endTime", cutDate);
- List<Long> result = _joinMapDao.customSearch(sc, null);
-
- txn.start();
- if (result.size() > 0) {
- Collections.sort(result);
- Long[] ids = result.toArray(new Long[result.size()]);
+ final List<Long> result = _joinMapDao.customSearch(sc, null);
- AsyncJobVO job = _jobDao.createForUpdate();
- job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
+ return Transaction.execute(new TransactionCallback<List<Long>>() {
+ @Override
+ public List<Long> doInTransaction(TransactionStatus status) {
+ if (result.size() > 0) {
+ Collections.sort(result);
+ Long[] ids = result.toArray(new Long[result.size()]);
- SearchCriteria<AsyncJobVO> sc2 = JobIdsSearch.create("ids", ids);
- SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids);
+ AsyncJobVO job = _jobDao.createForUpdate();
+ job.setPendingSignals(AsyncJob.Constants.SIGNAL_MASK_WAKEUP);
- _jobDao.update(job, sc2);
+ SearchCriteria<AsyncJobVO> sc2 = JobIdsSearch.create("ids", ids);
+ SearchCriteria<SyncQueueItemVO> queueItemsSC = QueueJobIdsSearch.create("contentIds", ids);
- SyncQueueItemVO item = _queueItemDao.createForUpdate();
- item.setLastProcessNumber(null);
- item.setLastProcessMsid(null);
- _queueItemDao.update(item, queueItemsSC);
- }
+ _jobDao.update(job, sc2);
- List<Long> wakupIds = _joinMapDao.findJobsToWakeBetween(cutDate);
- txn.commit();
+ SyncQueueItemVO item = _queueItemDao.createForUpdate();
+ item.setLastProcessNumber(null);
+ item.setLastProcessMsid(null);
+ _queueItemDao.update(item, queueItemsSC);
+ }
- return wakupIds;
+ return _joinMapDao.findJobsToWakeBetween(cutDate);
+ }
+ });
}
@Override
@@ -933,18 +917,18 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
@Override
public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
- for (ManagementServerHost msHost : nodeList) {
- Transaction txn = Transaction.open(Transaction.CLOUD_DB);
+ for (final ManagementServerHost msHost : nodeList) {
try {
- txn.start();
- List<SyncQueueItemVO> items = _queueMgr.getActiveQueueItems(msHost.getId(), true);
- cleanupPendingJobs(items);
- _jobDao.resetJobProcess(msHost.getId(), ApiErrorCode.INTERNAL_ERROR.getHttpCode(), "job cancelled because of management server restart");
- txn.commit();
+ Transaction.execute(new TransactionCallbackNoReturn() {
+ @Override
+ public void doInTransactionWithoutResult(TransactionStatus status) {
+ List<SyncQueueItemVO> items = _queueMgr.getActiveQueueItems(msHost.getId(), true);
+ cleanupPendingJobs(items);
+ _jobDao.resetJobProcess(msHost.getId(), ApiErrorCode.INTERNAL_ERROR.getHttpCode(), "job cancelled because of management server restart");
+ }
+ });
} catch (Throwable e) {
s_logger.warn("Unexpected exception ", e);
- } finally {
- txn.close();
}
}
}