You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by mc...@apache.org on 2014/09/17 00:24:42 UTC
git commit: updated refs/heads/master to a2d85c8
Repository: cloudstack
Updated Branches:
refs/heads/master cb4513379 -> a2d85c8ca
CLOUDSTACK-7566: Many jobs getting stuck in pending state and cloud is
unusable.
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/a2d85c8c
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/a2d85c8c
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/a2d85c8c
Branch: refs/heads/master
Commit: a2d85c8cae5f603bbcfcd3659c1207f0bfe461a7
Parents: cb45133
Author: Min Chen <mi...@citrix.com>
Authored: Tue Sep 16 15:14:08 2014 -0700
Committer: Min Chen <mi...@citrix.com>
Committed: Tue Sep 16 15:14:08 2014 -0700
----------------------------------------------------------------------
.../framework/messagebus/MessageBusBase.java | 37 ++++++++++++++++----
.../jobs/impl/AsyncJobManagerImpl.java | 13 +++++++
2 files changed, 44 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a2d85c8c/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
----------------------------------------------------------------------
diff --git a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
index 9432da0..e8f9bce 100644
--- a/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
+++ b/framework/ipc/src/org/apache/cloudstack/framework/messagebus/MessageBusBase.java
@@ -26,6 +26,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.log4j.Logger;
+
import org.apache.cloudstack.framework.serializer.MessageSerializer;
public class MessageBusBase implements MessageBus {
@@ -36,6 +38,8 @@ public class MessageBusBase implements MessageBus {
private final SubscriptionNode _subscriberRoot;
private MessageSerializer _messageSerializer;
+ private static final Logger s_logger = Logger.getLogger(MessageBusBase.class);
+
public MessageBusBase() {
_gate = new Gate();
_pendingActions = new ArrayList<ActionRecord>();
@@ -58,6 +62,9 @@ public class MessageBusBase implements MessageBus {
assert (subject != null);
assert (subscriber != null);
if (_gate.enter()) {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("Enter gate in message bus subscribe");
+ }
try {
SubscriptionNode current = locate(subject, null, true);
assert (current != null);
@@ -75,6 +82,9 @@ public class MessageBusBase implements MessageBus {
@Override
public void unsubscribe(String subject, MessageSubscriber subscriber) {
if (_gate.enter()) {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("Enter gate in message bus unsubscribe");
+ }
try {
if (subject != null) {
SubscriptionNode current = locate(subject, null, false);
@@ -96,6 +106,9 @@ public class MessageBusBase implements MessageBus {
@Override
public void clearAll() {
if (_gate.enter()) {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("Enter gate in message bus clearAll");
+ }
try {
_subscriberRoot.clearAll();
doPrune();
@@ -112,6 +125,9 @@ public class MessageBusBase implements MessageBus {
@Override
public void prune() {
if (_gate.enter()) {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("Enter gate in message bus prune");
+ }
try {
doPrune();
} finally {
@@ -144,6 +160,9 @@ public class MessageBusBase implements MessageBus {
public void publish(String senderAddress, String subject, PublishScope scope, Object args) {
if (_gate.enter(true)) {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("Enter gate in message bus publish");
+ }
try {
List<SubscriptionNode> chainFromTop = new ArrayList<SubscriptionNode>();
SubscriptionNode current = locate(subject, chainFromTop, false);
@@ -309,14 +328,20 @@ public class MessageBusBase implements MessageBus {
public void leave() {
synchronized (this) {
if (_reentranceCount > 0) {
- assert (_gateOwner == Thread.currentThread());
+ try {
+ assert (_gateOwner == Thread.currentThread());
- onGateOpen();
- _reentranceCount--;
- assert (_reentranceCount == 0);
- _gateOwner = null;
+ onGateOpen();
+ } finally {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("Open gate of message bus");
+ }
+ _reentranceCount--;
+ assert (_reentranceCount == 0);
+ _gateOwner = null;
- notifyAll();
+ notifyAll();
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a2d85c8c/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
index c28e87b..7d374da 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -236,10 +236,20 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
job.setResult(resultObject);
}
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Publish async job-" + jobId + " complete on message bus");
+ }
publishOnEventBus(job, "complete"); // publish before the instance type and ID are wiped out
+
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Wake up jobs related to job- " + jobId);
+ }
List<Long> wakeupList = Transaction.execute(new TransactionCallback<List<Long>>() {
@Override
public List<Long> doInTransaction(TransactionStatus status) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Update db status for job- " + jobId);
+ }
job.setCompleteMsid(getMsid());
job.setStatus(jobStatus);
job.setResultCode(resultCode);
@@ -253,6 +263,9 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
job.setLastUpdated(DateUtil.currentGMTTime());
_jobDao.update(jobId, job);
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Wake up jobs joined with job- " + jobId + " and disjoin all subjobs created from job- " + jobId);
+ }
List<Long> wakeupList = wakeupByJoinedJobCompletion(jobId);
_joinMapDao.disjoinAllJobs(jobId);