You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ra...@apache.org on 2014/11/05 11:45:27 UTC

[33/50] [abbrv] git commit: updated refs/heads/master to 4c5f792

CLOUDSTACK-7832: Move some job db update and item purge to
    completeAsyncJob transaction to avoid MySQL deadlock.


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/6830cbc1
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/6830cbc1
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/6830cbc1

Branch: refs/heads/master
Commit: 6830cbc15ae70d51c555f7f19348f6d87d0b5391
Parents: 3760fde
Author: Min Chen <mi...@citrix.com>
Authored: Mon Nov 3 10:41:36 2014 -0800
Committer: Min Chen <mi...@citrix.com>
Committed: Mon Nov 3 11:18:52 2014 -0800

----------------------------------------------------------------------
 .../db/src/com/cloud/utils/db/Transaction.java    |  8 ++++++++
 framework/ipc/pom.xml                             | 18 ++++++++++++++++++
 .../framework/messagebus/MessageBusBase.java      | 14 +++++++++++++-
 .../framework/jobs/impl/AsyncJobManagerImpl.java  | 11 ++++++-----
 4 files changed, 45 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/6830cbc1/framework/db/src/com/cloud/utils/db/Transaction.java
----------------------------------------------------------------------
diff --git a/framework/db/src/com/cloud/utils/db/Transaction.java b/framework/db/src/com/cloud/utils/db/Transaction.java
index 471e0cf..dd91a96 100755
--- a/framework/db/src/com/cloud/utils/db/Transaction.java
+++ b/framework/db/src/com/cloud/utils/db/Transaction.java
@@ -18,11 +18,15 @@ package com.cloud.utils.db;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.log4j.Logger;
+
 public class Transaction {
     private final static AtomicLong counter = new AtomicLong(0);
     private final static TransactionStatus STATUS = new TransactionStatus() {
     };
 
+    private static final Logger s_logger = Logger.getLogger(Transaction.class);
+
     @SuppressWarnings("deprecation")
     public static <T, E extends Throwable> T execute(TransactionCallbackWithException<T, E> callback) throws E {
         String name = "tx-" + counter.incrementAndGet();
@@ -33,6 +37,10 @@ public class Transaction {
         }
         TransactionLegacy txn = TransactionLegacy.open(name, databaseId, false);
         try {
+//            if (txn.dbTxnStarted()){
+//                String warnMsg = "Potential Wrong Usage: TRANSACTION.EXECUTE IS WRAPPED INSIDE ANOTHER DB TRANSACTION!";
+//                s_logger.warn(warnMsg, new CloudRuntimeException(warnMsg));
+//            }
             txn.start();
             T result = callback.doInTransaction(STATUS);
             txn.commit();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/6830cbc1/framework/ipc/pom.xml
----------------------------------------------------------------------
diff --git a/framework/ipc/pom.xml b/framework/ipc/pom.xml
index 12b4a3d..09b0c41 100644
--- a/framework/ipc/pom.xml
+++ b/framework/ipc/pom.xml
@@ -39,4 +39,22 @@
       <version>${project.version}</version>
     </dependency>    
   </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <skipTests>true</skipTests>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>integration-test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>  
 </project>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/6830cbc1/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
index e8f9bce..e3eeb7b 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
@@ -30,6 +30,9 @@ import org.apache.log4j.Logger;
 
 import org.apache.cloudstack.framework.serializer.MessageSerializer;
 
+import com.cloud.utils.db.TransactionLegacy;
+import com.cloud.utils.exception.CloudRuntimeException;
+
 public class MessageBusBase implements MessageBus {
 
     private final Gate _gate;
@@ -158,7 +161,11 @@ public class MessageBusBase implements MessageBus {
 
     @Override
     public void publish(String senderAddress, String subject, PublishScope scope, Object args) {
-
+        // publish cannot be in DB transaction, which may hold DB lock too long, and we are guarding this here
+        if (!noDbTxn()){
+            String errMsg = "NO EVENT PUBLISH CAN BE WRAPPED WITHIN DB TRANSACTION!";
+            s_logger.error(errMsg, new CloudRuntimeException(errMsg));
+        }
         if (_gate.enter(true)) {
             if (s_logger.isTraceEnabled()) {
                 s_logger.trace("Enter gate in message bus publish");
@@ -256,6 +263,11 @@ public class MessageBusBase implements MessageBus {
         }
     }
 
+    private boolean noDbTxn() {
+        TransactionLegacy txn = TransactionLegacy.currentTxn();
+        return !txn.dbTxnStarted();
+    }
+
     //
     // Support inner classes
     //

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/6830cbc1/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index 04fab24..aab1683 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -258,6 +258,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                 }
 
                 job.setLastUpdated(DateUtil.currentGMTTime());
+                job.setExecutingMsid(null);
                 _jobDao.update(jobId, job);
 
                 if (s_logger.isDebugEnabled()) {
@@ -266,6 +267,11 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                 List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId);
                 _joinMapDao.disjoinAllJobs(jobId);
 
+                // purge the job sync item from queue
+                if (job.getSyncSource() != null) {
+                    _queueMgr.purgeItem(job.getSyncSource().getId());
+                }
+
                 return wakeupList;
             }
         });
@@ -527,12 +533,7 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                 } finally {
                     // guard final clause as well
                     try {
-                        AsyncJobVO jobToUpdate = _jobDao.findById(job.getId());
-                        jobToUpdate.setExecutingMsid(null);
-                        _jobDao.update(job.getId(), jobToUpdate);
-
                         if (job.getSyncSource() != null) {
-                            _queueMgr.purgeItem(job.getSyncSource().getId());
                             checkQueue(job.getSyncSource().getQueueId());
                         }