You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by mc...@apache.org on 2014/09/17 00:24:42 UTC

git commit: updated refs/heads/master to a2d85c8

Repository: cloudstack
Updated Branches:
  refs/heads/master cb4513379 -> a2d85c8ca


CLOUDSTACK-7566: Many jobs getting stuck in pending state and cloud is
unusable.

Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/a2d85c8c
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/a2d85c8c
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/a2d85c8c

Branch: refs/heads/master
Commit: a2d85c8cae5f603bbcfcd3659c1207f0bfe461a7
Parents: cb45133
Author: Min Chen <mi...@citrix.com>
Authored: Tue Sep 16 15:14:08 2014 -0700
Committer: Min Chen <mi...@citrix.com>
Committed: Tue Sep 16 15:14:08 2014 -0700

----------------------------------------------------------------------
 .../framework/messagebus/MessageBusBase.java    | 37 ++++++++++++++++----
 .../jobs/impl/AsyncJobManagerImpl.java          | 13 +++++++
 2 files changed, 44 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a2d85c8c/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
index 9432da0..e8f9bce 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
@@ -26,6 +26,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.log4j.Logger;
+
 import org.apache.cloudstack.framework.serializer.MessageSerializer;
 
 public class MessageBusBase implements MessageBus {
@@ -36,6 +38,8 @@ public class MessageBusBase implements MessageBus {
     private final SubscriptionNode _subscriberRoot;
     private MessageSerializer _messageSerializer;
 
+    private static final Logger s_logger = Logger.getLogger(MessageBusBase.class);
+
     public MessageBusBase() {
         _gate = new Gate();
         _pendingActions = new ArrayList<ActionRecord>();
@@ -58,6 +62,9 @@ public class MessageBusBase implements MessageBus {
         assert (subject != null);
         assert (subscriber != null);
         if (_gate.enter()) {
+            if (s_logger.isTraceEnabled()) {
+                s_logger.trace("Enter gate in message bus subscribe");
+            }
             try {
                 SubscriptionNode current = locate(subject, null, true);
                 assert (current != null);
@@ -75,6 +82,9 @@ public class MessageBusBase implements MessageBus {
     @Override
     public void unsubscribe(String subject, MessageSubscriber subscriber) {
         if (_gate.enter()) {
+            if (s_logger.isTraceEnabled()) {
+                s_logger.trace("Enter gate in message bus unsubscribe");
+            }
             try {
                 if (subject != null) {
                     SubscriptionNode current = locate(subject, null, false);
@@ -96,6 +106,9 @@ public class MessageBusBase implements MessageBus {
     @Override
     public void clearAll() {
         if (_gate.enter()) {
+            if (s_logger.isTraceEnabled()) {
+                s_logger.trace("Enter gate in message bus clearAll");
+            }
             try {
                 _subscriberRoot.clearAll();
                 doPrune();
@@ -112,6 +125,9 @@ public class MessageBusBase implements MessageBus {
     @Override
     public void prune() {
         if (_gate.enter()) {
+            if (s_logger.isTraceEnabled()) {
+                s_logger.trace("Enter gate in message bus prune");
+            }
             try {
                 doPrune();
             } finally {
@@ -144,6 +160,9 @@ public class MessageBusBase implements MessageBus {
     public void publish(String senderAddress, String subject, PublishScope scope, Object args) {
 
         if (_gate.enter(true)) {
+            if (s_logger.isTraceEnabled()) {
+                s_logger.trace("Enter gate in message bus publish");
+            }
             try {
                 List<SubscriptionNode> chainFromTop = new ArrayList<SubscriptionNode>();
                 SubscriptionNode current = locate(subject, chainFromTop, false);
@@ -309,14 +328,20 @@ public class MessageBusBase implements MessageBus {
         public void leave() {
             synchronized (this) {
                 if (_reentranceCount > 0) {
-                    assert (_gateOwner == Thread.currentThread());
+                    try {
+                        assert (_gateOwner == Thread.currentThread());
 
-                    onGateOpen();
-                    _reentranceCount--;
-                    assert (_reentranceCount == 0);
-                    _gateOwner = null;
+                        onGateOpen();
+                    } finally {
+                        if (s_logger.isTraceEnabled()) {
+                            s_logger.trace("Open gate of message bus");
+                        }
+                        _reentranceCount--;
+                        assert (_reentranceCount == 0);
+                        _gateOwner = null;
 
-                    notifyAll();
+                        notifyAll();
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a2d85c8c/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index c28e87b..7d374da 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -236,10 +236,20 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
             job.setResult(resultObject);
         }
 
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("Publish async job-" + jobId + " complete on message bus");
+        }
         publishOnEventBus(job, "complete"); // publish before the instance type and ID are wiped out
+
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("Wake up jobs related to job- " + jobId);
+        }
         List<Long> wakeupList = Transaction.execute(new TransactionCallback<List<Long>>() {
             @Override
             public List<Long> doInTransaction(TransactionStatus status) {
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("Update db status for job- " + jobId);
+                }
                 job.setCompleteMsid(getMsid());
                 job.setStatus(jobStatus);
                 job.setResultCode(resultCode);
@@ -253,6 +263,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
                 job.setLastUpdated(DateUtil.currentGMTTime());
                 _jobDao.update(jobId, job);
 
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("Wake up jobs joined with job- " + jobId + " and disjoin all subjobs created from job- " + jobId);
+                }
                 List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId);
                 _joinMapDao.disjoinAllJobs(jobId);