You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ra...@apache.org on 2014/11/05 11:45:27 UTC
[33/50] [abbrv] git commit: updated refs/heads/master to 4c5f792
CLOUDSTACK-7832: Move some job db update and item purge to
completeAsyncJob transaction to avoid MySQL deadlock.
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/6830cbc1
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/6830cbc1
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/6830cbc1
Branch: refs/heads/master
Commit: 6830cbc15ae70d51c555f7f19348f6d87d0b5391
Parents: 3760fde
Author: Min Chen <mi...@citrix.com>
Authored: Mon Nov 3 10:41:36 2014 -0800
Committer: Min Chen <mi...@citrix.com>
Committed: Mon Nov 3 11:18:52 2014 -0800
----------------------------------------------------------------------
.../db/src/com/cloud/utils/db/Transaction.java | 8 ++++++++
framework/ipc/pom.xml | 18 ++++++++++++++++++
.../framework/messagebus/MessageBusBase.java | 14 +++++++++++++-
.../framework/jobs/impl/AsyncJobManagerImpl.java | 11 ++++++-----
4 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/6830cbc1/framework/db/src/com/cloud/utils/db/Transaction.java
----------------------------------------------------------------------
diff --git a/framework/db/src/com/cloud/utils/db/Transaction.java b/framework/db/src/com/cloud/utils/db/Transaction.java
index 471e0cf..dd91a96 100755
--- a/framework/db/src/com/cloud/utils/db/Transaction.java
+++ b/framework/db/src/com/cloud/utils/db/Transaction.java
@@ -18,11 +18,15 @@ package com.cloud.utils.db;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.log4j.Logger;
+
public class Transaction {
private final static AtomicLong counter = new AtomicLong(0);
private final static TransactionStatus STATUS = new TransactionStatus() {
};
+ private static final Logger s_logger = Logger.getLogger(Transaction.class);
+
@SuppressWarnings("deprecation")
public static <T, E extends Throwable> T execute(TransactionCallbackWithException<T, E> callback) throws E {
String name = "tx-" + counter.incrementAndGet();
@@ -33,6 +37,10 @@ public class Transaction {
}
TransactionLegacy txn = TransactionLegacy.open(name, databaseId, false);
try {
+// if (txn.dbTxnStarted()){
+// String warnMsg = "Potential Wrong Usage: TRANSACTION.EXECUTE IS WRAPPED INSIDE ANOTHER DB TRANSACTION!";
+// s_logger.warn(warnMsg, new CloudRuntimeException(warnMsg));
+// }
txn.start();
T result = callback.doInTransaction(STATUS);
txn.commit();
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/6830cbc1/framework/ipc/pom.xml
----------------------------------------------------------------------
diff --git a/framework/ipc/pom.xml b/framework/ipc/pom.xml
index 12b4a3d..09b0c41 100644
--- a/framework/ipc/pom.xml
+++ b/framework/ipc/pom.xml
@@ -39,4 +39,22 @@
<version>${project.version}</version>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>true</skipTests>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/6830cbc1/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
index e8f9bce..e3eeb7b 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
@@ -30,6 +30,9 @@ import org.apache.log4j.Logger;
import org.apache.cloudstack.framework.serializer.MessageSerializer;
+import com.cloud.utils.db.TransactionLegacy;
+import com.cloud.utils.exception.CloudRuntimeException;
+
public class MessageBusBase implements MessageBus {
private final Gate _gate;
@@ -158,7 +161,11 @@ public class MessageBusBase implements MessageBus {
@Override
public void publish(String senderAddress, String subject, PublishScope scope, Object args) {
-
+ // publish cannot be in DB transaction, which may hold DB lock too long, and we are guarding this here
+ if (!noDbTxn()){
+ String errMsg = "NO EVENT PUBLISH CAN BE WRAPPED WITHIN DB TRANSACTION!";
+ s_logger.error(errMsg, new CloudRuntimeException(errMsg));
+ }
if (_gate.enter(true)) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Enter gate in message bus publish");
@@ -256,6 +263,11 @@ public class MessageBusBase implements MessageBus {
}
}
+ private boolean noDbTxn() {
+ TransactionLegacy txn = TransactionLegacy.currentTxn();
+ return !txn.dbTxnStarted();
+ }
+
//
// Support inner classes
//
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/6830cbc1/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 04fab24..aab1683 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -258,6 +258,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
}
job.setLastUpdated(DateUtil.currentGMTTime());
+ job.setExecutingMsid(null);
_jobDao.update(jobId, job);
if (s_logger.isDebugEnabled()) {
@@ -266,6 +267,11 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId);
_joinMapDao.disjoinAllJobs(jobId);
+ // purge the job sync item from queue
+ if (job.getSyncSource() != null) {
+ _queueMgr.purgeItem(job.getSyncSource().getId());
+ }
+
return wakeupList;
}
});
@@ -527,12 +533,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
} finally {
// guard final clause as well
try {
- AsyncJobVO jobToUpdate = _jobDao.findById(job.getId());
- jobToUpdate.setExecutingMsid(null);
- _jobDao.update(job.getId(), jobToUpdate);
-
if (job.getSyncSource() != null) {
- _queueMgr.purgeItem(job.getSyncSource().getId());
checkQueue(job.getSyncSource().getQueueId());
}