You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ah...@apache.org on 2013/06/18 02:24:51 UTC
[6/9] git commit: updated refs/heads/vmsync to 309f8da
Finally, we moved AsyncJobManagerImpl out of server package
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/8f00c191
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/8f00c191
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/8f00c191
Branch: refs/heads/vmsync
Commit: 8f00c1919228069e1c4a8f9912048b8be45663b7
Parents: 3a074f3
Author: Alex Huang <al...@gmail.com>
Authored: Sat Jun 15 06:44:38 2013 -0700
Committer: Alex Huang <al...@gmail.com>
Committed: Mon Jun 17 17:04:05 2013 -0700
----------------------------------------------------------------------
client/tomcatconf/applicationContext.xml.in | 2 +-
.../jobs/impl/AsyncJobManagerImpl.java | 831 ++++++++++++++++++
.../com/cloud/async/AsyncJobManagerImpl.java | 841 -------------------
3 files changed, 832 insertions(+), 842 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8f00c191/client/tomcatconf/applicationContext.xml.in
----------------------------------------------------------------------
diff --git a/client/tomcatconf/applicationContext.xml.in b/client/tomcatconf/applicationContext.xml.in
index 6b1dad6..d3b5369 100644
--- a/client/tomcatconf/applicationContext.xml.in
+++ b/client/tomcatconf/applicationContext.xml.in
@@ -817,7 +817,7 @@
<bean id="asyncJobJoinDaoImpl" class="com.cloud.api.query.dao.AsyncJobJoinDaoImpl" />
<bean id="asyncJobJournalDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.AsyncJobJournalDaoImpl" />
<bean id="asyncJobJoinMapDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDaoImpl" />
- <bean id="asyncJobManagerImpl" class="com.cloud.async.AsyncJobManagerImpl"/>
+ <bean id="asyncJobManagerImpl" class="org.apache.cloudstack.framework.jobs.impl.AsyncJobManagerImpl"/>
<bean id="asyncJobMonitor" class="org.apache.cloudstack.framework.jobs.impl.AsyncJobMonitor"/>
<bean id="syncQueueDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.SyncQueueDaoImpl" />
<bean id="syncQueueItemDaoImpl" class="org.apache.cloudstack.framework.jobs.dao.SyncQueueItemDaoImpl" />
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8f00c191/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
new file mode 100644
index 0000000..4dee7e9
--- /dev/null
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/impl/AsyncJobManagerImpl.java
@@ -0,0 +1,831 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.framework.jobs.impl;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cloudstack.api.ApiErrorCode;
+import org.apache.cloudstack.config.ConfigDepot;
+import org.apache.cloudstack.config.ConfigKey;
+import org.apache.cloudstack.config.ConfigValue;
+import org.apache.cloudstack.config.Configurable;
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
+import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
+import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
+import org.apache.cloudstack.framework.jobs.dao.AsyncJobJournalDao;
+import org.apache.cloudstack.framework.messagebus.MessageBus;
+import org.apache.cloudstack.framework.messagebus.MessageDetector;
+import org.apache.cloudstack.framework.messagebus.PublishScope;
+import org.apache.cloudstack.jobs.JobInfo;
+import org.apache.cloudstack.jobs.JobInfo.Status;
+
+import com.cloud.cluster.ClusterManagerListener;
+import com.cloud.cluster.ManagementServerHost;
+import com.cloud.cluster.ManagementServerNode;
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.Predicate;
+import com.cloud.utils.PropertiesUtil;
+import com.cloud.utils.component.ManagerBase;
+import com.cloud.utils.concurrency.NamedThreadFactory;
+import com.cloud.utils.db.DB;
+import com.cloud.utils.db.GenericDao;
+import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.GlobalLock;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.utils.exception.ExceptionUtil;
+import com.cloud.utils.mgmt.JmxUtil;
+
+/**
+ * Default {@link AsyncJobManager} implementation. Jobs are persisted through their DAO,
+ * optionally serialized through sync queues, executed on a fixed-size thread pool, woken up
+ * again when jobs they joined complete, and periodically garbage-collected once expired.
+ */
+public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, ClusterManagerListener, Configurable {
+    // Advanced
+    private static final ConfigKey<Long> JobExpireMinutes = new ConfigKey<Long>(Long.class, "job.expire.minutes", "Advanced", AsyncJobManager.class, "1440",
+            "Time (in minutes) for async-jobs to be kept in system", true, null);
+    private static final ConfigKey<Long> JobCancelThresholdMinutes = new ConfigKey<Long>(Long.class, "job.cancel.threshold.minutes", "Advanced", AsyncJobManager.class,
+            "60", "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, null);
+
+    private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
+    private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
+    // Although we may have detailed masks for each individual wakeup event, i.e.
+    // periodical timer, matched topic from message bus, it seems that we don't
+    // need to distinguish them to such level. Therefore, only one wakeup signal
+    // is defined
+    public static final int SIGNAL_MASK_WAKEUP = 1;
+
+    // Maximum number of sync-queue items the heartbeat task dequeues per run.
+    private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
+    private static final int HEARTBEAT_INTERVAL = 2000; // 2 seconds
+    private static final int GC_INTERVAL = 10000; // 10 seconds
+    // Number of attempts syncAsyncJobExecution makes to enqueue a job before giving up.
+    private static final int QUEUE_INSERT_ATTEMPTS = 5;
+
+    @Inject private SyncQueueManager _queueMgr;
+    @Inject private AsyncJobDao _jobDao;
+    @Inject private AsyncJobJournalDao _journalDao;
+    @Inject private AsyncJobJoinMapDao _joinMapDao;
+    @Inject private List<AsyncJobDispatcher> _jobDispatchers;
+    @Inject private MessageBus _messageBus;
+    @Inject private AsyncJobMonitor _jobMonitor;
+    @Inject
+    private ConfigDepot _configDepot;
+
+    // Set in configure() from the *Minutes config keys with a multiplier of 60.
+    private ConfigValue<Long> _jobExpireSeconds; // 1 day
+    private ConfigValue<Long> _jobCancelThresholdSeconds; // 1 hour (for cancelling the jobs blocking other jobs)
+
+    // Sequence used to key active executions in AsyncJobMonitor; advanced under the instance lock.
+    private volatile long _executionRunNumber = 1;
+
+    // Single thread shared by the heartbeat and GC tasks scheduled in start().
+    private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
+    // Job execution pool, created in configure().
+    private ExecutorService _executor;
+
+    @Override
+    public ConfigKey<?>[] getConfigKeys() {
+        return new ConfigKey<?>[] {JobExpireMinutes, JobCancelThresholdMinutes};
+    }
+
+    /** Returns the job record with the given id, or null when the DAO finds none. */
+    @Override
+    public AsyncJobVO getAsyncJob(long jobId) {
+        return _jobDao.findById(jobId);
+    }
+
+    /** Lists pending jobs attached to instances of the given type, as reported by the job DAO. */
+    @Override
+    public List<AsyncJobVO> findInstancePendingAsyncJobs(String instanceType, Long accountId) {
+        return _jobDao.findInstancePendingAsyncJobs(instanceType, accountId);
+    }
+
+    /**
+     * Returns the pseudo job bound to the calling thread on this management server, creating
+     * and persisting one (owned by the given account/user) if none exists yet.
+     */
+    @Override @DB
+    public AsyncJob getPseudoJob(long accountId, long userId) {
+        AsyncJobVO job = _jobDao.findPseudoJob(Thread.currentThread().getId(), getMsid());
+        if(job == null) {
+            job = new AsyncJobVO();
+            job.setAccountId(accountId);
+            job.setUserId(userId);
+            job.setInitMsid(getMsid());
+            job.setDispatcher(AsyncJobVO.JOB_DISPATCHER_PSEUDO);
+            job.setInstanceType(AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE);
+            job.setInstanceId(Thread.currentThread().getId());
+            _jobDao.persist(job);
+        }
+        return job;
+    }
+
+    /** Persists the job and schedules it on the executor pool; returns the new job id. */
+    @Override
+    public long submitAsyncJob(AsyncJob job) {
+        return submitAsyncJob(job, false);
+    }
+
+    /**
+     * Persists the job (with no sync source) and schedules it.
+     *
+     * @param scheduleJobExecutionInContext when true the job runs synchronously on the calling
+     *        thread instead of being submitted to the executor pool
+     * @return the new job id
+     */
+    @SuppressWarnings("unchecked")
+    @DB
+    public long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext) {
+        @SuppressWarnings("rawtypes")
+        GenericDao dao = GenericDaoBase.getDao(job.getClass());
+        job.setInitMsid(getMsid());
+        job.setSyncSource(null); // no sync source originally
+        dao.persist(job);
+
+        scheduleExecution(job, scheduleJobExecutionInContext);
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString());
+        }
+        return job.getId();
+    }
+
+    /**
+     * Persists the job and enqueues it on the sync queue of the given object (queue size limit 1)
+     * inside one transaction; the job executes when the heartbeat or a queue kick dequeues it.
+     *
+     * @return the new job id
+     * @throws CloudRuntimeException if persisting or enqueuing fails; the transaction is rolled back
+     */
+    @SuppressWarnings("unchecked")
+    @Override @DB
+    public long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId) {
+        Transaction txt = Transaction.currentTxn();
+        try {
+            @SuppressWarnings("rawtypes")
+            GenericDao dao = GenericDaoBase.getDao(job.getClass());
+
+            txt.start();
+            job.setInitMsid(getMsid());
+            dao.persist(job);
+
+            syncAsyncJobExecution(job, syncObjType, syncObjId, 1);
+            txt.commit();
+            return job.getId();
+        } catch(Exception e) {
+            String errMsg = "Unable to schedule async job for command " + job.getCmd() + ", unexpected exception.";
+            s_logger.warn(errMsg, e);
+            // Undo the partially persisted job / queue item, like the other transactional methods here.
+            txt.rollback();
+            throw new CloudRuntimeException(errMsg, e);
+        }
+    }
+
+    /**
+     * Marks an in-progress job as completed with the given status/result, detaches it from its
+     * instance, and releases the jobs that joined it. After the commit, joined jobs that have a
+     * pending wakeup signal are scheduled and a JOB_STATE message is published. Missing or
+     * already-completed jobs are only logged. Errors are logged, never thrown.
+     */
+    @Override @DB
+    public void completeAsyncJob(long jobId, Status jobStatus, int resultCode, String resultObject) {
+        if(s_logger.isDebugEnabled()) {
+            s_logger.debug("Complete async job-" + jobId + ", jobStatus: " + jobStatus +
+                    ", resultCode: " + resultCode + ", result: " + resultObject);
+        }
+
+        List<Long> wakeupList;
+        Transaction txn = Transaction.currentTxn();
+        try {
+            txn.start();
+            AsyncJobVO job = _jobDao.findById(jobId);
+            if(job == null) {
+                if(s_logger.isDebugEnabled()) {
+                    s_logger.debug("job-" + jobId + " no longer exists, we just log completion info here. " + jobStatus +
+                            ", resultCode: " + resultCode + ", result: " + resultObject);
+                }
+
+                txn.rollback();
+                return;
+            }
+
+            if(job.getStatus() != JobInfo.Status.IN_PROGRESS) {
+                if(s_logger.isDebugEnabled()) {
+                    s_logger.debug("job-" + jobId + " is already completed.");
+                }
+
+                txn.rollback();
+                return;
+            }
+
+            job.setCompleteMsid(getMsid());
+            job.setStatus(jobStatus);
+            job.setResultCode(resultCode);
+
+            // reset attached object
+            job.setInstanceType(null);
+            job.setInstanceId(null);
+
+            if (resultObject != null) {
+                job.setResult(resultObject);
+            }
+
+            job.setLastUpdated(DateUtil.currentGMTTime());
+            _jobDao.update(jobId, job);
+
+            wakeupList = _joinMapDao.wakeupByJoinedJobCompletion(jobId);
+            _joinMapDao.disjoinAllJobs(jobId);
+
+            txn.commit();
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception while completing async job-" + jobId, e);
+            txn.rollback();
+            return;
+        }
+
+        // Post-commit notifications are kept out of the transactional try so that a failure
+        // here can no longer trigger rollback() on an already committed transaction.
+        try {
+            for(Long id : wakeupList) {
+                // TODO, we assume that all jobs in this category is API job only
+                AsyncJobVO jobToWakeup = _jobDao.findById(id);
+                if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0) {
+                    scheduleExecution(jobToWakeup, false);
+                }
+            }
+
+            _messageBus.publish(null, AsyncJob.Topics.JOB_STATE, PublishScope.GLOBAL, jobId);
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception while completing async job-" + jobId, e);
+        }
+    }
+
+    /**
+     * Records intermediate progress (process status and, if non-null, the result object) on a
+     * job. Missing jobs are only logged; errors are logged and the transaction is rolled back.
+     */
+    @Override @DB
+    public void updateAsyncJobStatus(long jobId, int processStatus, String resultObject) {
+        if(s_logger.isDebugEnabled()) {
+            s_logger.debug("Update async-job progress, job-" + jobId + ", processStatus: " + processStatus +
+                    ", result: " + resultObject);
+        }
+
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+            AsyncJobVO job = _jobDao.findById(jobId);
+            if(job == null) {
+                if(s_logger.isDebugEnabled()) {
+                    s_logger.debug("job-" + jobId + " no longer exists, we just log progress info here. progress status: " + processStatus);
+                }
+
+                txt.rollback();
+                return;
+            }
+
+            job.setProcessStatus(processStatus);
+            if(resultObject != null) {
+                job.setResult(resultObject);
+            }
+            job.setLastUpdated(DateUtil.currentGMTTime());
+            _jobDao.update(jobId, job);
+            txt.commit();
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e);
+            txt.rollback();
+        }
+    }
+
+    /**
+     * Attaches the job to an instance (type and id) through a partial update built with
+     * createForUpdate(). Errors are logged and the transaction is rolled back.
+     */
+    @Override @DB
+    public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId) {
+        if(s_logger.isDebugEnabled()) {
+            s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType +
+                    ", instanceId: " + instanceId);
+        }
+
+        Transaction txt = Transaction.currentTxn();
+        try {
+            txt.start();
+
+            AsyncJobVO job = _jobDao.createForUpdate();
+            job.setInstanceType(instanceType);
+            job.setInstanceId(instanceId);
+            job.setLastUpdated(DateUtil.currentGMTTime());
+            _jobDao.update(jobId, job);
+
+            txt.commit();
+        } catch(Exception e) {
+            s_logger.error("Unexpected exception while updating async job-" + jobId + " attachment: ", e);
+            txt.rollback();
+        }
+    }
+
+    /** Appends a journal entry (type, text and JSON payload) for the given job. */
+    @Override @DB
+    public void logJobJournal(long jobId, AsyncJob.JournalType journalType, String
+            journalText, String journalObjJson) {
+        AsyncJobJournalVO journal = new AsyncJobJournalVO();
+        journal.setJobId(jobId);
+        journal.setJournalType(journalType);
+        journal.setJournalText(journalText);
+        journal.setJournalObjJsonString(journalObjJson);
+
+        _journalDao.persist(journal);
+    }
+
+    /**
+     * Makes {@code jobId} join {@code joinJobId} with no wakeup interval, timeout, sync source,
+     * wakeup handler or wakeup dispatcher.
+     */
+    @Override @DB
+    public void joinJob(long jobId, long joinJobId) {
+        _joinMapDao.joinJob(jobId, joinJobId, getMsid(), 0, 0, null, null, null);
+    }
+
+    /**
+     * Makes {@code jobId} join {@code joinJobId}, recording the wakeup handler/dispatcher,
+     * wakeup interval and timeout. If the current execution context's job holds a sync source,
+     * its queue id is stored with the join. {@code wakeupTopicsOnMessageBus} is currently unused.
+     */
+    @Override @DB
+    public void joinJob(long jobId, long joinJobId, String wakeupHandler, String wakeupDispatcher,
+            String[] wakeupTopicsOnMessageBus, long wakeupIntervalInMilliSeconds, long timeoutInMilliSeconds) {
+
+        Long syncSourceId = null;
+        AsyncJobExecutionContext context = AsyncJobExecutionContext.getCurrentExecutionContext();
+        assert(context != null && context.getJob() != null);
+        // Guard explicitly too: with assertions disabled a missing context used to cause an NPE.
+        if(context != null && context.getJob() != null && context.getJob().getSyncSource() != null) {
+            syncSourceId = context.getJob().getSyncSource().getQueueId();
+        }
+
+        _joinMapDao.joinJob(jobId, joinJobId, getMsid(),
+                wakeupIntervalInMilliSeconds, timeoutInMilliSeconds,
+                syncSourceId, wakeupHandler, wakeupDispatcher);
+    }
+
+    /** Removes the join between {@code jobId} and {@code joinedJobId}. */
+    @Override @DB
+    public void disjoinJob(long jobId, long joinedJobId) {
+        _joinMapDao.disjoinJob(jobId, joinedJobId);
+    }
+
+    /** Records the outcome of a joined job in the join map. */
+    @Override @DB
+    public void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult) {
+        _joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, getMsid());
+    }
+
+    /**
+     * Enqueues the job on the sync queue of the given object, retrying a few times with a
+     * randomized 1-6 second pause between attempts.
+     *
+     * @throws CloudRuntimeException if no attempt managed to enqueue the job
+     */
+    @Override
+    public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit) {
+        if(s_logger.isDebugEnabled()) {
+            s_logger.debug("Sync job-" + job.getId() + " execution on object " + syncObjType + "." + syncObjId);
+        }
+
+        SyncQueueVO queue = null;
+
+        // to deal with temporary DB exceptions like DB deadlock/Lock-wait time out caused rollbacks
+        // we retry up to QUEUE_INSERT_ATTEMPTS times before we throw an exception
+        Random random = new Random();
+        boolean interrupted = false;
+        try {
+            for(int i = 0; i < QUEUE_INSERT_ATTEMPTS; i++) {
+                queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
+                if(queue != null) {
+                    break;
+                }
+
+                // no point in pausing after the final attempt
+                if(i < QUEUE_INSERT_ATTEMPTS - 1) {
+                    try {
+                        Thread.sleep(1000 + random.nextInt(5000));
+                    } catch (InterruptedException e) {
+                        // keep retrying, but remember the interrupt so it can be restored below
+                        interrupted = true;
+                    }
+                }
+            }
+        } finally {
+            if(interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        if (queue == null) {
+            throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?");
+        }
+    }
+
+    /**
+     * Returns the job with the given id, or null if it does not exist. When
+     * {@code updatePollTime} is true and the job exists, its last-polled time is stamped.
+     */
+    @Override
+    public AsyncJob queryJob(long jobId, boolean updatePollTime) {
+        AsyncJobVO job = _jobDao.findById(jobId);
+
+        if (updatePollTime && job != null) {
+            job.setLastPolled(DateUtil.currentGMTTime());
+            _jobDao.update(jobId, job);
+        }
+        return job;
+    }
+
+
+    /** Submits the job to the executor pool. */
+    private void scheduleExecution(final AsyncJobVO job) {
+        scheduleExecution(job, false);
+    }
+
+    /**
+     * Runs the job on the calling thread when {@code executeInContext} is true; otherwise
+     * submits it to the executor pool, which may throw RejectedExecutionException.
+     */
+    private void scheduleExecution(final AsyncJob job, boolean executeInContext) {
+        Runnable runnable = getExecutorRunnable(job);
+        if (executeInContext) {
+            runnable.run();
+        } else {
+            _executor.submit(runnable);
+        }
+    }
+
+    /**
+     * Returns the injected dispatcher with the given name.
+     *
+     * @throws CloudRuntimeException if no dispatcher has that name
+     */
+    private AsyncJobDispatcher getDispatcher(String dispatcherName) {
+        assert (dispatcherName != null && !dispatcherName.isEmpty()) : "Who's not setting the dispatcher when submitting a job? Who am I suppose to call if you do that!";
+
+        for (AsyncJobDispatcher dispatcher : _jobDispatchers) {
+            if (dispatcherName.equals(dispatcher.getName())) {
+                return dispatcher;
+            }
+        }
+
+        throw new CloudRuntimeException("Unable to find dispatcher name: " + dispatcherName);
+    }
+
+    /**
+     * Returns the dispatcher named as wakeup dispatcher in the job's first join record, or
+     * null when there are no dispatchers, no join records, or no name matches.
+     */
+    private AsyncJobDispatcher getWakeupDispatcher(AsyncJob job) {
+        if(_jobDispatchers != null) {
+            List<AsyncJobJoinMapVO> joinRecords = _joinMapDao.listJoinRecords(job.getId());
+            if(joinRecords.size() > 0) {
+                AsyncJobJoinMapVO joinRecord = joinRecords.get(0);
+                for(AsyncJobDispatcher dispatcher : _jobDispatchers) {
+                    if(dispatcher.getName().equals(joinRecord.getWakeupDispatcher())) {
+                        return dispatcher;
+                    }
+                }
+            } else {
+                s_logger.warn("job-" + job.getId() + " is scheduled for wakeup run, but there is no joining info anymore");
+            }
+        }
+        return null;
+    }
+
+    /** Returns the next execution run number. */
+    private long getJobRunNumber() {
+        synchronized(this) {
+            return _executionRunNumber++;
+        }
+    }
+
+    /**
+     * Builds the task that executes a job. It opens a DB transaction, registers the job with
+     * JMX, the monitor and the thread's execution context, then dispatches it. A job with a
+     * pending wakeup signal goes to its join record's wakeup dispatcher; any other job goes to
+     * its own dispatcher. Any Throwable completes the job as FAILED. Cleanup always runs via
+     * {@link #cleanupAfterExecution}.
+     */
+    private Runnable getExecutorRunnable(final AsyncJob job) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                Transaction txn = null;
+                long runNumber = getJobRunNumber();
+
+                try {
+                    //
+                    // setup execution environment
+                    //
+                    txn = Transaction.open(Transaction.CLOUD_DB);
+
+                    try {
+                        JmxUtil.registerMBean("AsyncJobManager", "Active Job " + job.getId(), new AsyncJobMBeanImpl(job));
+                    } catch(Exception e) {
+                        // Due to co-existence of normal-dispatched-job/wakeup-dispatched-job, MBean register() call
+                        // is expected to fail under situations
+                        if(s_logger.isTraceEnabled()) {
+                            s_logger.trace("Unable to register active job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e));
+                        }
+                    }
+
+                    _jobMonitor.registerActiveTask(runNumber, job.getId());
+                    AsyncJobExecutionContext.setCurrentExecutionContext(new AsyncJobExecutionContext(job));
+
+                    // execute the job
+                    if(s_logger.isDebugEnabled()) {
+                        s_logger.debug("Executing " + job);
+                    }
+
+                    if ((getAndResetPendingSignals(job) & SIGNAL_MASK_WAKEUP) != 0) {
+                        // a wakeup run: hand the job to the dispatcher recorded when it joined another job
+                        AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job);
+                        if(jobDispatcher != null) {
+                            jobDispatcher.runJob(job);
+                        } else {
+                            s_logger.error("Unable to find a wakeup dispatcher from the joined job: " + job);
+                        }
+                    } else {
+                        // getDispatcher() throws when nothing matches; that case is handled by the catch below
+                        AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher());
+                        if(jobDispatcher != null) {
+                            jobDispatcher.runJob(job);
+                        } else {
+                            s_logger.error("Unable to find job dispatcher, job will be cancelled");
+                            completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
+                        }
+                    }
+
+                    if (s_logger.isDebugEnabled()) {
+                        s_logger.debug("Done executing " + job.getCmd() + " for job-" + job.getId());
+                    }
+
+                } catch (Throwable e) {
+                    s_logger.error("Unexpected exception", e);
+                    completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
+                } finally {
+                    cleanupAfterExecution(job, txn, runNumber);
+                }
+            }
+        };
+    }
+
+    /**
+     * Tears down what an executor run set up. Each step is guarded on its own. A failure in an
+     * early step (for example findById returning null because the job was already expunged)
+     * can no longer skip purging the sync-queue item, closing the transaction, or clearing the
+     * execution context and monitor registration of the pooled thread.
+     */
+    private void cleanupAfterExecution(AsyncJob job, Transaction txn, long runNumber) {
+        try {
+            AsyncJobVO jobToUpdate = _jobDao.findById(job.getId());
+            if (jobToUpdate != null) {
+                jobToUpdate.setExecutingMsid(null);
+                _jobDao.update(job.getId(), jobToUpdate);
+            }
+        } catch(Throwable e) {
+            s_logger.error("Double exception", e);
+        }
+
+        try {
+            if (job.getSyncSource() != null) {
+                _queueMgr.purgeItem(job.getSyncSource().getId());
+                checkQueue(job.getSyncSource().getQueueId());
+            }
+        } catch(Throwable e) {
+            s_logger.error("Double exception", e);
+        }
+
+        try {
+            JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId());
+        } catch(Exception e) {
+            // Due to co-existence of normal-dispatched-job/wakeup-dispatched-job, MBean unregister() call
+            // is expected to fail under situations
+            if(s_logger.isTraceEnabled()) {
+                s_logger.trace("Unable to unregister job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e));
+            }
+        }
+
+        try {
+            if(txn != null) {
+                txn.close();
+            }
+        } catch(Throwable e) {
+            s_logger.error("Double exception", e);
+        }
+
+        //
+        // clean execution environment
+        //
+        try {
+            AsyncJobExecutionContext.unregister();
+        } catch(Throwable e) {
+            s_logger.error("Double exception", e);
+        }
+
+        try {
+            _jobMonitor.unregisterActiveTask(runNumber);
+        } catch(Throwable e) {
+            s_logger.error("Double exception", e);
+        }
+    }
+
+    /**
+     * Returns the job's pending signal bits and, if any are set, clears them in the stored
+     * record (when the record still exists).
+     */
+    private int getAndResetPendingSignals(AsyncJob job) {
+        int signals = job.getPendingSignals();
+        if(signals != 0) {
+            AsyncJobVO jobRecord = _jobDao.findById(job.getId());
+            if(jobRecord != null) {
+                jobRecord.setPendingSignals(0);
+                _jobDao.update(job.getId(), jobRecord);
+            }
+        }
+        return signals;
+    }
+
+    /**
+     * Binds a dequeued sync-queue item to its job, marks the job as executing on this
+     * management server and schedules it. If the executor rejects it, the item is returned to
+     * the queue. Items whose job no longer exists are purged. {@code fromPreviousSession} is
+     * currently unused.
+     */
+    private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) {
+        AsyncJobVO job = _jobDao.findById(item.getContentId());
+        if (job != null) {
+            if(s_logger.isDebugEnabled()) {
+                s_logger.debug("Schedule queued job-" + job.getId());
+            }
+
+            job.setSyncSource(item);
+
+            job.setExecutingMsid(getMsid());
+            _jobDao.update(job.getId(), job);
+
+            try {
+                scheduleExecution(job);
+            } catch(RejectedExecutionException e) {
+                s_logger.warn("Execution for job-" + job.getId() + " is rejected, return it to the queue for next turn");
+                _queueMgr.returnItem(item.getId());
+
+                job.setExecutingMsid(null);
+                _jobDao.update(job.getId(), job);
+            }
+
+        } else {
+            if(s_logger.isDebugEnabled()) {
+                s_logger.debug("Unable to find related job for queue item: " + item.toString());
+            }
+
+            _queueMgr.purgeItem(item.getId());
+        }
+    }
+
+    /**
+     * Purges the sync-queue item held by the current execution context, if any, and kicks its
+     * queue so the next item can run.
+     */
+    @Override
+    public void releaseSyncSource() {
+        AsyncJobExecutionContext executionContext = AsyncJobExecutionContext.getCurrentExecutionContext();
+        assert(executionContext != null);
+
+        if(executionContext.getSyncSource() != null) {
+            if(s_logger.isDebugEnabled()) {
+                s_logger.debug("Release sync source for job-" + executionContext.getJob().getId() + " sync source: "
+                        + executionContext.getSyncSource().getContentType() + "-"
+                        + executionContext.getSyncSource().getContentId());
+            }
+
+            _queueMgr.purgeItem(executionContext.getSyncSource().getId());
+            checkQueue(executionContext.getSyncSource().getQueueId());
+        }
+    }
+
+    /**
+     * Repeatedly waits on a MessageDetector subscribed to the given topics, calling
+     * {@code waitAny(checkIntervalInMilliSeconds)} each round, then evaluates the predicate.
+     *
+     * @return true as soon as the predicate holds, false once the timeout has elapsed
+     */
+    @Override
+    public boolean waitAndCheck(String[] wakeupTopicsOnMessageBus, long checkIntervalInMilliSeconds,
+            long timeoutInMilliSeconds, Predicate predicate) {
+
+        MessageDetector msgDetector = new MessageDetector();
+        msgDetector.open(_messageBus, wakeupTopicsOnMessageBus);
+        try {
+            long startTick = System.currentTimeMillis();
+            while(System.currentTimeMillis() - startTick < timeoutInMilliSeconds) {
+                msgDetector.waitAny(checkIntervalInMilliSeconds);
+                if(predicate.checkCondition()) {
+                    return true;
+                }
+            }
+        } finally {
+            msgDetector.close();
+        }
+
+        return false;
+    }
+
+    /**
+     * Drains the given sync queue for this management server, executing every item it can
+     * dequeue. Stops when the queue yields nothing or on the first error, which is logged.
+     */
+    private void checkQueue(long queueId) {
+        while(true) {
+            try {
+                SyncQueueItemVO item = _queueMgr.dequeueFromOne(queueId, getMsid());
+                if(item != null) {
+                    if(s_logger.isDebugEnabled()) {
+                        s_logger.debug("Executing sync queue item: " + item.toString());
+                    }
+
+                    executeQueueItem(item, false);
+                } else {
+                    break;
+                }
+            } catch(Throwable e) {
+                s_logger.error("Unexpected exception when kicking sync queue-" + queueId, e);
+                break;
+            }
+        }
+    }
+
+    /**
+     * Periodic task: dequeues up to MAX_ONETIME_SCHEDULE_SIZE items from any sync queue and
+     * executes them, then schedules jobs reported by the join map's wakeup scan that have a
+     * pending wakeup signal. Runs in its own transaction; errors are logged.
+     */
+    private Runnable getHeartbeatTask() {
+        return new Runnable() {
+            @Override
+            public void run() {
+                Transaction txn = Transaction.open("AsyncJobManagerImpl.getHeartbeatTask");
+                try {
+                    List<SyncQueueItemVO> l = _queueMgr.dequeueFromAny(getMsid(), MAX_ONETIME_SCHEDULE_SIZE);
+                    if(l != null && l.size() > 0) {
+                        for(SyncQueueItemVO item: l) {
+                            if(s_logger.isDebugEnabled()) {
+                                s_logger.debug("Execute sync-queue item: " + item.toString());
+                            }
+                            executeQueueItem(item, false);
+                        }
+                    }
+
+                    List<Long> standaloneWakeupJobs = _joinMapDao.wakeupScan();
+                    for(Long jobId : standaloneWakeupJobs) {
+                        // TODO, we assume that all jobs in this category is API job only
+                        AsyncJobVO job = _jobDao.findById(jobId);
+                        if (job != null && (job.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0) {
+                            scheduleExecution(job, false);
+                        }
+                    }
+                } catch(Throwable e) {
+                    s_logger.error("Unexpected exception when trying to execute queue item, ", e);
+                } finally {
+                    try {
+                        txn.close();
+                    } catch(Throwable e) {
+                        s_logger.error("Unexpected exception", e);
+                    }
+                }
+            }
+        };
+    }
+
+    /**
+     * Periodic GC task. It runs only if it acquires the "AsyncJobManagerGC" GlobalLock within
+     * ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION (presumably shared across management servers
+     * so that one node collects at a time).
+     */
+    @DB
+    private Runnable getGCTask() {
+        return new Runnable() {
+            @Override
+            public void run() {
+                GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerGC");
+                try {
+                    if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
+                        try {
+                            reallyRun();
+                        } finally {
+                            scanLock.unlock();
+                        }
+                    }
+                } finally {
+                    scanLock.releaseRef();
+                }
+            }
+
+            /**
+             * Expunges up to 100 expired unfinished and 100 expired completed jobs. It then fails
+             * and purges queue items that have blocked their queue longer than the cancel threshold.
+             */
+            public void reallyRun() {
+                try {
+                    s_logger.trace("Begin cleanup expired async-jobs");
+
+                    Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds.value() * 1000);
+
+                    // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
+                    // hopefully this will be fast enough to balance potential growth of job table
+                    //1) Expire unfinished jobs that weren't processed yet
+                    List<AsyncJobVO> l = _jobDao.getExpiredUnfinishedJobs(cutTime, 100);
+                    for(AsyncJobVO job : l) {
+                        s_logger.trace("Expunging unfinished job " + job);
+                        expungeAsyncJob(job);
+                    }
+
+                    //2) Expunge finished jobs
+                    List<AsyncJobVO> completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100);
+                    for(AsyncJobVO job : completedJobs) {
+                        s_logger.trace("Expunging completed job " + job);
+                        expungeAsyncJob(job);
+                    }
+
+                    // forcefully cancel blocking queue items if they've been staying there for too long
+                    List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds.value()
+                            * 1000, false);
+                    if(blockItems != null && blockItems.size() > 0) {
+                        for(SyncQueueItemVO item : blockItems) {
+                            // null-safe comparison, consistent with cleanupPendingJobs()
+                            if(SyncQueueItem.AsyncJobContentType.equalsIgnoreCase(item.getContentType())) {
+                                completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long");
+                            }
+
+                            // purge the item and resume queue processing
+                            _queueMgr.purgeItem(item.getId());
+                        }
+                    }
+
+                    s_logger.trace("End cleanup expired async-jobs");
+                } catch(Throwable e) {
+                    s_logger.error("Unexpected exception when trying to execute queue item, ", e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Deletes the job and its sync-queue item in one transaction. The transaction is rolled
+     * back, and the exception rethrown, if either step fails.
+     */
+    @DB
+    protected void expungeAsyncJob(AsyncJobVO job) {
+        Transaction txn = Transaction.currentTxn();
+        txn.start();
+        try {
+            _jobDao.expunge(job.getId());
+            //purge corresponding sync queue item
+            _queueMgr.purgeAsyncJobQueueItemId(job.getId());
+            txn.commit();
+        } catch(RuntimeException e) {
+            txn.rollback();
+            throw e;
+        }
+    }
+
+    /** Returns the id of this management server node. */
+    private long getMsid() {
+        return ManagementServerNode.getManagementServerId();
+    }
+
+    /**
+     * Discards left-over queue items. Async-job items have their job completed as FAILED; every
+     * item is purged.
+     */
+    private void cleanupPendingJobs(List<SyncQueueItemVO> l) {
+        for (SyncQueueItemVO item : l) {
+            if (s_logger.isInfoEnabled()) {
+                s_logger.info("Discard left-over queue item: " + item.toString());
+            }
+
+            String contentType = item.getContentType();
+            if (contentType != null && contentType.equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
+                Long jobId = item.getContentId();
+                if (jobId != null) {
+                    s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
+                    completeAsyncJob(jobId, JobInfo.Status.FAILED, 0, "Execution was cancelled because of server shutdown");
+                }
+            }
+            _queueMgr.purgeItem(item.getId());
+        }
+    }
+
+    /**
+     * Binds the config values and creates the job executor, sized to two thirds of
+     * {@code db.cloud.maxActive} from db.properties (at least one thread). Then initializes
+     * AsyncJobExecutionContext and OutcomeImpl with this manager.
+     *
+     * @throws ConfigurationException if db.properties cannot be read or the pool cannot be
+     *         created; the underlying exception is attached as root cause
+     */
+    @Override
+    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
+        _jobExpireSeconds = _configDepot.get(JobExpireMinutes).setMultiplier(60);
+        _jobCancelThresholdSeconds = _configDepot.get(JobCancelThresholdMinutes).setMultiplier(60);
+
+        try {
+            final File dbPropsFile = PropertiesUtil.findConfigFile("db.properties");
+            final Properties dbProps = new Properties();
+            final FileInputStream dbPropsStream = new FileInputStream(dbPropsFile);
+            try {
+                dbProps.load(dbPropsStream);
+            } finally {
+                // release the file handle even if load() fails
+                dbPropsStream.close();
+            }
+
+            final int cloudMaxActive = Integer.parseInt(dbProps.getProperty("db.cloud.maxActive"));
+
+            // Executors.newFixedThreadPool rejects a size of 0, so never go below one thread.
+            int poolSize = Math.max(1, (cloudMaxActive * 2) / 3);
+
+            s_logger.info("Start AsyncJobManager thread pool in size " + poolSize);
+            _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(AsyncJobManager.JOB_POOL_THREAD_PREFIX));
+        } catch (final Exception e) {
+            final ConfigurationException ce = new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl");
+            ce.setRootCause(e);
+            throw ce;
+        }
+
+        AsyncJobExecutionContext.init(this, _joinMapDao);
+        OutcomeImpl.init(this);
+
+        return true;
+    }
+
+    @Override
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+    }
+
+    /**
+     * For each departed management server, discards its active queue items and resets the
+     * jobs it was processing, each in its own transaction. Errors are logged per node.
+     */
+    @Override
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+        for (ManagementServerHost msHost : nodeList) {
+            Transaction txn = Transaction.open(Transaction.CLOUD_DB);
+            try {
+                txn.start();
+                List<SyncQueueItemVO> items = _queueMgr.getActiveQueueItems(msHost.getId(), true);
+                cleanupPendingJobs(items);
+                _jobDao.resetJobProcess(msHost.getId(), ApiErrorCode.INTERNAL_ERROR.getHttpCode(), "job cancelled because of management server restart");
+                txn.commit();
+            } catch(Throwable e) {
+                s_logger.warn("Unexpected exception ", e);
+            } finally {
+                txn.close();
+            }
+        }
+    }
+
+    @Override
+    public void onManagementNodeIsolated() {
+    }
+
+    /**
+     * Clears state left by a previous run of this management server (pseudo jobs, active queue
+     * items, in-process jobs), then schedules the heartbeat and GC tasks at fixed rates.
+     */
+    @Override
+    public boolean start() {
+        try {
+            _jobDao.cleanupPseduoJobs(getMsid());
+
+            List<SyncQueueItemVO> l = _queueMgr.getActiveQueueItems(getMsid(), false);
+            cleanupPendingJobs(l);
+            _jobDao.resetJobProcess(getMsid(), ApiErrorCode.INTERNAL_ERROR.getHttpCode(), "job cancelled because of management server restart");
+        } catch(Throwable e) {
+            s_logger.error("Unexpected exception " + e.getMessage(), e);
+        }
+
+        _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
+        _heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL, GC_INTERVAL, TimeUnit.MILLISECONDS);
+
+        return true;
+    }
+
+    /** Shuts down the scheduler and, if configure() created it, the job executor. */
+    @Override
+    public boolean stop() {
+        _heartbeatScheduler.shutdown();
+        if (_executor != null) {
+            _executor.shutdown();
+        }
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/8f00c191/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
deleted file mode 100644
index bc8f99b..0000000
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ /dev/null
@@ -1,841 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package com.cloud.async;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import javax.inject.Inject;
-import javax.naming.ConfigurationException;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cloudstack.api.ApiErrorCode;
-import org.apache.cloudstack.config.ConfigDepot;
-import org.apache.cloudstack.config.ConfigKey;
-import org.apache.cloudstack.config.ConfigValue;
-import org.apache.cloudstack.config.Configurable;
-import org.apache.cloudstack.framework.jobs.AsyncJob;
-import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
-import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
-import org.apache.cloudstack.framework.jobs.AsyncJobManager;
-import org.apache.cloudstack.framework.jobs.dao.AsyncJobDao;
-import org.apache.cloudstack.framework.jobs.dao.AsyncJobJoinMapDao;
-import org.apache.cloudstack.framework.jobs.dao.AsyncJobJournalDao;
-import org.apache.cloudstack.framework.jobs.impl.AsyncJobJoinMapVO;
-import org.apache.cloudstack.framework.jobs.impl.AsyncJobJournalVO;
-import org.apache.cloudstack.framework.jobs.impl.AsyncJobMBeanImpl;
-import org.apache.cloudstack.framework.jobs.impl.AsyncJobMonitor;
-import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
-import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
-import org.apache.cloudstack.framework.jobs.impl.SyncQueueItem;
-import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
-import org.apache.cloudstack.framework.jobs.impl.SyncQueueManager;
-import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
-import org.apache.cloudstack.framework.messagebus.MessageBus;
-import org.apache.cloudstack.framework.messagebus.MessageDetector;
-import org.apache.cloudstack.framework.messagebus.PublishScope;
-import org.apache.cloudstack.jobs.JobInfo;
-import org.apache.cloudstack.jobs.JobInfo.Status;
-
-import com.cloud.cluster.ClusterManagerListener;
-import com.cloud.cluster.ManagementServerHost;
-import com.cloud.cluster.ManagementServerNode;
-import com.cloud.utils.DateUtil;
-import com.cloud.utils.Predicate;
-import com.cloud.utils.PropertiesUtil;
-import com.cloud.utils.component.ManagerBase;
-import com.cloud.utils.concurrency.NamedThreadFactory;
-import com.cloud.utils.db.DB;
-import com.cloud.utils.db.GenericDao;
-import com.cloud.utils.db.GenericDaoBase;
-import com.cloud.utils.db.GlobalLock;
-import com.cloud.utils.db.Transaction;
-import com.cloud.utils.exception.CloudRuntimeException;
-import com.cloud.utils.exception.ExceptionUtil;
-import com.cloud.utils.mgmt.JmxUtil;
-
-/**
- * Manages async jobs: persists submitted jobs, executes them on a thread pool through the
- * matching {@link AsyncJobDispatcher}, serializes jobs through sync queues, handles job
- * joins/wake-ups, and periodically expunges expired jobs and long-blocking queue items.
- * Also cleans up jobs owned by management servers that leave the cluster.
- */
-public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager, ClusterManagerListener, Configurable {
-    // Advanced
-    private static final ConfigKey<Long> JobExpireMinutes = new ConfigKey<Long>(Long.class, "job.expire.minutes", "Advanced", AsyncJobManager.class, "1440",
-            "Time (in minutes) for async-jobs to be kept in system", true, null);
-    private static final ConfigKey<Long> JobCancelThresholdMinutes = new ConfigKey<Long>(Long.class, "job.cancel.threshold.minutes", "Advanced", AsyncJobManager.class,
-            "60", "Time (in minutes) for async-jobs to be forcely cancelled if it has been in process for long", true, null);
-
-    private static final Logger s_logger = Logger.getLogger(AsyncJobManagerImpl.class);
-    private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 3; // 3 seconds
-    // Although we may have detailed masks for each individual wakeup event, i.e.
-    // periodical timer, matched topic from message bus, it seems that we don't
-    // need to distinguish them to such level. Therefore, only one wakeup signal
-    // is defined
-    public static final int SIGNAL_MASK_WAKEUP = 1;
-
-    /** Maximum number of sync-queue items dequeued per heartbeat run. */
-    private static final int MAX_ONETIME_SCHEDULE_SIZE = 50;
-    /** Heartbeat period in milliseconds. */
-    private static final int HEARTBEAT_INTERVAL = 2000;
-    private static final int GC_INTERVAL = 10000; // 10 seconds
-
-    @Inject private SyncQueueManager _queueMgr;
-    @Inject private AsyncJobDao _jobDao;
-    @Inject private AsyncJobJournalDao _journalDao;
-    @Inject private AsyncJobJoinMapDao _joinMapDao;
-    @Inject private List<AsyncJobDispatcher> _jobDispatchers;
-    @Inject private MessageBus _messageBus;
-    @Inject private AsyncJobMonitor _jobMonitor;
-    @Inject
-    private ConfigDepot _configDepot;
-
-    // Both values are minute-based config keys scaled by 60 in configure().
-    private ConfigValue<Long> _jobExpireSeconds; // 1 day
-    private ConfigValue<Long> _jobCancelThresholdSeconds; // 1 hour (for cancelling the jobs blocking other jobs)
-
-    private volatile long _executionRunNumber = 1;
-
-    private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("AsyncJobMgr-Heartbeat"));
-    // Created in configure(); may be null if configure() has not run successfully.
-    private ExecutorService _executor;
-
-    @Override
-    public ConfigKey<?>[] getConfigKeys() {
-        return new ConfigKey<?>[] {JobExpireMinutes, JobCancelThresholdMinutes};
-    }
-
-    @Override
-    public AsyncJobVO getAsyncJob(long jobId) {
-        return _jobDao.findById(jobId);
-    }
-
-    @Override
-    public List<AsyncJobVO> findInstancePendingAsyncJobs(String instanceType, Long accountId) {
-        return _jobDao.findInstancePendingAsyncJobs(instanceType, accountId);
-    }
-
-    /**
-     * Returns the pseudo job bound to the current thread on this management server,
-     * creating and persisting one if none exists yet.
-     */
-    @Override @DB
-    public AsyncJob getPseudoJob(long accountId, long userId) {
-        AsyncJobVO job = _jobDao.findPseudoJob(Thread.currentThread().getId(), getMsid());
-        if(job == null) {
-            job = new AsyncJobVO();
-            job.setAccountId(accountId);
-            job.setUserId(userId);
-            job.setInitMsid(getMsid());
-            job.setDispatcher(AsyncJobVO.JOB_DISPATCHER_PSEUDO);
-            job.setInstanceType(AsyncJobVO.PSEUDO_JOB_INSTANCE_TYPE);
-            // The pseudo job is keyed by the owning thread id.
-            job.setInstanceId(Thread.currentThread().getId());
-            _jobDao.persist(job);
-        }
-        return job;
-    }
-
-    @Override
-    public long submitAsyncJob(AsyncJob job) {
-        return submitAsyncJob(job, false);
-    }
-
-    /**
-     * Persists the job and schedules it for execution.
-     *
-     * @param scheduleJobExecutionInContext if true, run the job synchronously on the calling
-     *        thread; otherwise submit it to the job thread pool
-     * @return the persisted job id
-     */
-    @SuppressWarnings("unchecked")
-    @DB
-    public long submitAsyncJob(AsyncJob job, boolean scheduleJobExecutionInContext) {
-        @SuppressWarnings("rawtypes")
-        GenericDao dao = GenericDaoBase.getDao(job.getClass());
-        job.setInitMsid(getMsid());
-        job.setSyncSource(null); // no sync source originally
-        dao.persist(job);
-
-        scheduleExecution(job, scheduleJobExecutionInContext);
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("submit async job-" + job.getId() + ", details: " + job.toString());
-        }
-        return job.getId();
-    }
-
-    /**
-     * Persists the job and enqueues it on the sync queue for (syncObjType, syncObjId) in one
-     * transaction, so it runs serialized with other jobs on the same object.
-     *
-     * @return the persisted job id
-     * @throws CloudRuntimeException if the job could not be persisted or queued
-     */
-    @SuppressWarnings("unchecked")
-    @Override @DB
-    public long submitAsyncJob(AsyncJob job, String syncObjType, long syncObjId) {
-        Transaction txt = Transaction.currentTxn();
-        try {
-            @SuppressWarnings("rawtypes")
-            GenericDao dao = GenericDaoBase.getDao(job.getClass());
-
-            txt.start();
-            job.setInitMsid(getMsid());
-            dao.persist(job);
-
-            syncAsyncJobExecution(job, syncObjType, syncObjId, 1);
-            txt.commit();
-            return job.getId();
-        } catch(Exception e) {
-            String errMsg = "Unable to schedule async job for command " + job.getCmd() + ", unexpected exception.";
-            s_logger.warn(errMsg, e);
-            // Preserve the original failure as the cause for callers further up the stack.
-            throw new CloudRuntimeException(errMsg, e);
-        }
-    }
-
-    /**
-     * Marks an in-progress job as completed with the given status and result, wakes up jobs
-     * that joined on it, and publishes a JOB_STATE message. Jobs that no longer exist or are
-     * not IN_PROGRESS are left untouched. Exceptions are logged, not rethrown.
-     */
-    @Override @DB
-    public void completeAsyncJob(long jobId, Status jobStatus, int resultCode, String resultObject) {
-        if(s_logger.isDebugEnabled()) {
-            s_logger.debug("Complete async job-" + jobId + ", jobStatus: " + jobStatus +
-                    ", resultCode: " + resultCode + ", result: " + resultObject);
-        }
-
-        Transaction txn = Transaction.currentTxn();
-        try {
-            txn.start();
-            AsyncJobVO job = _jobDao.findById(jobId);
-            if(job == null) {
-                if(s_logger.isDebugEnabled()) {
-                    s_logger.debug("job-" + jobId + " no longer exists, we just log completion info here. " + jobStatus +
-                            ", resultCode: " + resultCode + ", result: " + resultObject);
-                }
-
-                txn.rollback();
-                return;
-            }
-
-            if(job.getStatus() != JobInfo.Status.IN_PROGRESS) {
-                if(s_logger.isDebugEnabled()) {
-                    s_logger.debug("job-" + jobId + " is already completed.");
-                }
-
-                txn.rollback();
-                return;
-            }
-
-            job.setCompleteMsid(getMsid());
-            job.setStatus(jobStatus);
-            job.setResultCode(resultCode);
-
-            // reset attached object
-            job.setInstanceType(null);
-            job.setInstanceId(null);
-
-            if (resultObject != null) {
-                job.setResult(resultObject);
-            }
-
-            job.setLastUpdated(DateUtil.currentGMTTime());
-            _jobDao.update(jobId, job);
-
-            List<Long> wakeupList = _joinMapDao.wakeupByJoinedJobCompletion(jobId);
-            _joinMapDao.disjoinAllJobs(jobId);
-
-            txn.commit();
-
-            for(Long id : wakeupList) {
-                // TODO, we assume that all jobs in this category is API job only
-                AsyncJobVO jobToWakeup = _jobDao.findById(id);
-                if (jobToWakeup != null && (jobToWakeup.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0)
-                    scheduleExecution(jobToWakeup, false);
-            }
-
-            _messageBus.publish(null, AsyncJob.Topics.JOB_STATE, PublishScope.GLOBAL, jobId);
-        } catch(Exception e) {
-            s_logger.error("Unexpected exception while completing async job-" + jobId, e);
-            txn.rollback();
-        }
-    }
-
-    /**
-     * Updates a job's process status and (if non-null) its result object. A missing job is
-     * only logged. Exceptions are logged, not rethrown.
-     */
-    @Override @DB
-    public void updateAsyncJobStatus(long jobId, int processStatus, String resultObject) {
-        if(s_logger.isDebugEnabled()) {
-            s_logger.debug("Update async-job progress, job-" + jobId + ", processStatus: " + processStatus +
-                    ", result: " + resultObject);
-        }
-
-        Transaction txt = Transaction.currentTxn();
-        try {
-            txt.start();
-            AsyncJobVO job = _jobDao.findById(jobId);
-            if(job == null) {
-                if(s_logger.isDebugEnabled()) {
-                    s_logger.debug("job-" + jobId + " no longer exists, we just log progress info here. progress status: " + processStatus);
-                }
-
-                txt.rollback();
-                return;
-            }
-
-            job.setProcessStatus(processStatus);
-            if(resultObject != null) {
-                job.setResult(resultObject);
-            }
-            job.setLastUpdated(DateUtil.currentGMTTime());
-            _jobDao.update(jobId, job);
-            txt.commit();
-        } catch(Exception e) {
-            s_logger.error("Unexpected exception while updating async job-" + jobId + " status: ", e);
-            txt.rollback();
-        }
-    }
-
-    /**
-     * Sets the instance type/id the job is attached to. Exceptions are logged, not rethrown.
-     */
-    @Override @DB
-    public void updateAsyncJobAttachment(long jobId, String instanceType, Long instanceId) {
-        if(s_logger.isDebugEnabled()) {
-            s_logger.debug("Update async-job attachment, job-" + jobId + ", instanceType: " + instanceType +
-                    ", instanceId: " + instanceId);
-        }
-
-        Transaction txt = Transaction.currentTxn();
-        try {
-            txt.start();
-
-            AsyncJobVO job = _jobDao.createForUpdate();
-            job.setInstanceType(instanceType);
-            job.setInstanceId(instanceId);
-            job.setLastUpdated(DateUtil.currentGMTTime());
-            _jobDao.update(jobId, job);
-
-            txt.commit();
-        } catch(Exception e) {
-            s_logger.error("Unexpected exception while updating async job-" + jobId + " attachment: ", e);
-            txt.rollback();
-        }
-    }
-
-    /** Persists a journal entry for the given job. */
-    @Override @DB
-    public void logJobJournal(long jobId, AsyncJob.JournalType journalType, String
-            journalText, String journalObjJson) {
-        AsyncJobJournalVO journal = new AsyncJobJournalVO();
-        journal.setJobId(jobId);
-        journal.setJournalType(journalType);
-        journal.setJournalText(journalText);
-        journal.setJournalObjJsonString(journalObjJson);
-
-        _journalDao.persist(journal);
-    }
-
-    /** Records that jobId joins on joinJobId, with no wakeup interval, timeout or handler. */
-    @Override @DB
-    public void joinJob(long jobId, long joinJobId) {
-        _joinMapDao.joinJob(jobId, joinJobId, getMsid(), 0, 0, null, null, null);
-    }
-
-    /**
-     * Records that jobId joins on joinJobId with wakeup settings. Must be called from within
-     * a job execution context; the current job's sync queue id (if any) is recorded.
-     */
-    @Override @DB
-    public void joinJob(long jobId, long joinJobId, String wakeupHandler, String wakeupDispatcher,
-            String[] wakeupTopcisOnMessageBus, long wakeupIntervalInMilliSeconds, long timeoutInMilliSeconds) {
-
-        Long syncSourceId = null;
-        AsyncJobExecutionContext context = AsyncJobExecutionContext.getCurrentExecutionContext();
-        assert(context.getJob() != null);
-        if(context.getJob().getSyncSource() != null) {
-            syncSourceId = context.getJob().getSyncSource().getQueueId();
-        }
-
-        _joinMapDao.joinJob(jobId, joinJobId, getMsid(),
-                wakeupIntervalInMilliSeconds, timeoutInMilliSeconds,
-                syncSourceId, wakeupHandler, wakeupDispatcher);
-    }
-
-    @Override @DB
-    public void disjoinJob(long jobId, long joinedJobId) {
-        _joinMapDao.disjoinJob(jobId, joinedJobId);
-    }
-
-    @Override @DB
-    public void completeJoin(long joinJobId, JobInfo.Status joinStatus, String joinResult) {
-        _joinMapDao.completeJoin(joinJobId, joinStatus, joinResult, getMsid());
-    }
-
-    /**
-     * Enqueues the job on the sync queue for (syncObjType, syncObjId), retrying up to five
-     * times with a randomized 1-6 second back-off between attempts.
-     *
-     * @throws CloudRuntimeException if the item could not be queued (including when the
-     *         calling thread is interrupted while backing off)
-     */
-    @Override
-    public void syncAsyncJobExecution(AsyncJob job, String syncObjType, long syncObjId, long queueSizeLimit) {
-        if(s_logger.isDebugEnabled()) {
-            s_logger.debug("Sync job-" + job.getId() + " execution on object " + syncObjType + "." + syncObjId);
-        }
-
-        SyncQueueVO queue = null;
-
-        // to deal with temporary DB exceptions like DB deadlock/Lock-wait time out cased rollbacks
-        // we retry five times until we throw an exception
-        final int maxAttempts = 5;
-        Random random = new Random();
-
-        for(int i = 0; i < maxAttempts; i++) {
-            queue = _queueMgr.queue(syncObjType, syncObjId, SyncQueueItem.AsyncJobContentType, job.getId(), queueSizeLimit);
-            if(queue != null) {
-                break;
-            }
-
-            // No point in backing off after the last attempt.
-            if (i == maxAttempts - 1) {
-                break;
-            }
-
-            try {
-                Thread.sleep(1000 + random.nextInt(5000));
-            } catch (InterruptedException e) {
-                // Restore the interrupt flag and stop retrying; the check below reports failure.
-                Thread.currentThread().interrupt();
-                break;
-            }
-        }
-
-        if (queue == null)
-            throw new CloudRuntimeException("Unable to insert queue item into database, DB is full?");
-    }
-
-    /**
-     * Returns the job with the given id, or null if it does not exist. When updatePollTime is
-     * true and the job exists, its last-polled time is set to now.
-     */
-    @Override
-    public AsyncJob queryJob(long jobId, boolean updatePollTime) {
-        AsyncJobVO job = _jobDao.findById(jobId);
-
-        if (updatePollTime && job != null) {
-            job.setLastPolled(DateUtil.currentGMTTime());
-            _jobDao.update(jobId, job);
-        }
-        return job;
-    }
-
-
-    private void scheduleExecution(final AsyncJobVO job) {
-        scheduleExecution(job, false);
-    }
-
-    /**
-     * Runs the job either synchronously on the calling thread (executeInContext) or by
-     * submitting it to the job thread pool.
-     */
-    private void scheduleExecution(final AsyncJob job, boolean executeInContext) {
-        Runnable runnable = getExecutorRunnable(job);
-        if (executeInContext) {
-            runnable.run();
-        } else {
-            _executor.submit(runnable);
-        }
-    }
-
-    /**
-     * Finds the injected dispatcher with the given name.
-     *
-     * @throws CloudRuntimeException if no dispatcher has that name
-     */
-    private AsyncJobDispatcher getDispatcher(String dispatcherName) {
-        assert (dispatcherName != null && !dispatcherName.isEmpty()) : "Who's not setting the dispatcher when submitting a job? Who am I suppose to call if you do that!";
-
-        for (AsyncJobDispatcher dispatcher : _jobDispatchers) {
-            if (dispatcherName.equals(dispatcher.getName()))
-                return dispatcher;
-        }
-
-        throw new CloudRuntimeException("Unable to find dispatcher name: " + dispatcherName);
-    }
-
-    /**
-     * Returns the dispatcher named as the wakeup dispatcher in the job's first join record,
-     * or null if there is no join record or no dispatcher with that name.
-     */
-    private AsyncJobDispatcher getWakeupDispatcher(AsyncJob job) {
-        if(_jobDispatchers != null) {
-            List<AsyncJobJoinMapVO> joinRecords = _joinMapDao.listJoinRecords(job.getId());
-            if(joinRecords.size() > 0) {
-                AsyncJobJoinMapVO joinRecord = joinRecords.get(0);
-                for(AsyncJobDispatcher dispatcher : _jobDispatchers) {
-                    if(dispatcher.getName().equals(joinRecord.getWakeupDispatcher()))
-                        return dispatcher;
-                }
-            } else {
-                s_logger.warn("job-" + job.getId() + " is scheduled for wakeup run, but there is no joining info anymore");
-            }
-        }
-        return null;
-    }
-
-    /** Returns a new run number, unique within this manager instance. */
-    private long getJobRunNumber() {
-        synchronized(this) {
-            return _executionRunNumber++;
-        }
-    }
-
-    /**
-     * Builds the task that executes one job run: sets up the transaction, JMX registration,
-     * monitor registration and execution context, dispatches the job (to the wakeup
-     * dispatcher if a wakeup signal was pending, otherwise to the job's own dispatcher),
-     * and always tears the environment down afterwards.
-     */
-    private Runnable getExecutorRunnable(final AsyncJob job) {
-        return new Runnable() {
-            @Override
-            public void run() {
-                Transaction txn = null;
-                long runNumber = getJobRunNumber();
-
-                try {
-                    //
-                    // setup execution environment
-                    //
-                    txn = Transaction.open(Transaction.CLOUD_DB);
-
-                    try {
-                        JmxUtil.registerMBean("AsyncJobManager", "Active Job " + job.getId(), new AsyncJobMBeanImpl(job));
-                    } catch(Exception e) {
-                        // Due to co-existence of normal-dispatched-job/wakeup-dispatched-job, MBean register() call
-                        // is expected to fail under situations
-                        if(s_logger.isTraceEnabled())
-                            s_logger.trace("Unable to register active job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e));
-                    }
-
-                    _jobMonitor.registerActiveTask(runNumber, job.getId());
-                    AsyncJobExecutionContext.setCurrentExecutionContext(new AsyncJobExecutionContext(job));
-
-                    // execute the job
-                    if(s_logger.isDebugEnabled()) {
-                        s_logger.debug("Executing " + job);
-                    }
-
-                    if ((getAndResetPendingSignals(job) & SIGNAL_MASK_WAKEUP) != 0) {
-                        AsyncJobDispatcher jobDispatcher = getWakeupDispatcher(job);
-                        if(jobDispatcher != null) {
-                            jobDispatcher.runJob(job);
-                        } else {
-                            s_logger.error("Unable to find a wakeup dispatcher from the joined job: " + job);
-                        }
-                    } else {
-                        // getDispatcher() throws when no dispatcher matches; that failure is
-                        // handled by the catch below, which fails the job.
-                        AsyncJobDispatcher jobDispatcher = getDispatcher(job.getDispatcher());
-                        if(jobDispatcher != null) {
-                            jobDispatcher.runJob(job);
-                        } else {
-                            s_logger.error("Unable to find job dispatcher, job will be cancelled");
-                            completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
-                        }
-                    }
-
-                    if (s_logger.isDebugEnabled()) {
-                        s_logger.debug("Done executing " + job.getCmd() + " for job-" + job.getId());
-                    }
-
-                } catch (Throwable e) {
-                    s_logger.error("Unexpected exception", e);
-                    completeAsyncJob(job.getId(), JobInfo.Status.FAILED, ApiErrorCode.INTERNAL_ERROR.getHttpCode(), null);
-                } finally {
-                    // Each cleanup stage is guarded on its own so that a failure in one stage
-                    // does not skip the later ones (closing the transaction and clearing the
-                    // thread's execution context matter for pooled threads).
-                    try {
-                        AsyncJobVO jobToUpdate = _jobDao.findById(job.getId());
-                        // The job record may no longer exist (e.g. expunged); nothing to reset then.
-                        if (jobToUpdate != null) {
-                            jobToUpdate.setExecutingMsid(null);
-                            _jobDao.update(job.getId(), jobToUpdate);
-                        }
-
-                        if (job.getSyncSource() != null) {
-                            _queueMgr.purgeItem(job.getSyncSource().getId());
-                            checkQueue(job.getSyncSource().getQueueId());
-                        }
-                    } catch(Throwable e) {
-                        s_logger.error("Double exception", e);
-                    }
-
-                    try {
-                        JmxUtil.unregisterMBean("AsyncJobManager", "Active Job " + job.getId());
-                    } catch(Exception e) {
-                        // Due to co-existence of normal-dispatched-job/wakeup-dispatched-job, MBean unregister() call
-                        // is expected to fail under situations
-                        if(s_logger.isTraceEnabled())
-                            s_logger.trace("Unable to unregister job " + job.getId() + " to JMX monitoring due to exception " + ExceptionUtil.toString(e));
-                    }
-
-                    try {
-                        if(txn != null)
-                            txn.close();
-                    } catch(Throwable e) {
-                        s_logger.error("Double exception", e);
-                    }
-
-                    //
-                    // clean execution environment
-                    //
-                    try {
-                        AsyncJobExecutionContext.unregister();
-                        _jobMonitor.unregisterActiveTask(runNumber);
-                    } catch(Throwable e) {
-                        s_logger.error("Double exception", e);
-                    }
-                }
-            }
-        };
-    }
-
-    /**
-     * Returns the job's pending signal mask and clears the persisted mask if it was non-zero
-     * (and the job record still exists).
-     */
-    private int getAndResetPendingSignals(AsyncJob job) {
-        int signals = job.getPendingSignals();
-        if(signals != 0) {
-            AsyncJobVO jobRecord = _jobDao.findById(job.getId());
-            if (jobRecord != null) {
-                jobRecord.setPendingSignals(0);
-                _jobDao.update(job.getId(), jobRecord);
-            }
-        }
-        return signals;
-    }
-
-    /**
-     * Schedules the job referenced by a dequeued sync-queue item on this server. If the pool
-     * rejects it, the item is returned to the queue; if the job no longer exists, the item is
-     * purged. (fromPreviousSession is currently unused.)
-     */
-    private void executeQueueItem(SyncQueueItemVO item, boolean fromPreviousSession) {
-        AsyncJobVO job = _jobDao.findById(item.getContentId());
-        if (job != null) {
-            if(s_logger.isDebugEnabled()) {
-                s_logger.debug("Schedule queued job-" + job.getId());
-            }
-
-            job.setSyncSource(item);
-
-            job.setExecutingMsid(getMsid());
-            _jobDao.update(job.getId(), job);
-
-            try {
-                scheduleExecution(job);
-            } catch(RejectedExecutionException e) {
-                s_logger.warn("Execution for job-" + job.getId() + " is rejected, return it to the queue for next turn");
-                _queueMgr.returnItem(item.getId());
-
-                job.setExecutingMsid(null);
-                _jobDao.update(job.getId(), job);
-            }
-
-        } else {
-            if(s_logger.isDebugEnabled()) {
-                s_logger.debug("Unable to find related job for queue item: " + item.toString());
-            }
-
-            _queueMgr.purgeItem(item.getId());
-        }
-    }
-
-    /**
-     * Purges the current job's sync-queue item (if any) and kicks its queue so the next item
-     * can run. Must be called from within a job execution context.
-     */
-    @Override
-    public void releaseSyncSource() {
-        AsyncJobExecutionContext executionContext = AsyncJobExecutionContext.getCurrentExecutionContext();
-        assert(executionContext != null);
-
-        if(executionContext.getSyncSource() != null) {
-            if(s_logger.isDebugEnabled()) {
-                s_logger.debug("Release sync source for job-" + executionContext.getJob().getId() + " sync source: "
-                        + executionContext.getSyncSource().getContentType() + "-"
-                        + executionContext.getSyncSource().getContentId());
-            }
-
-            _queueMgr.purgeItem(executionContext.getSyncSource().getId());
-            checkQueue(executionContext.getSyncSource().getQueueId());
-        }
-    }
-
-    /**
-     * Waits for messages on the given message-bus topics (or the check interval), re-checking
-     * the predicate after each wait, until it holds or the timeout elapses.
-     *
-     * @return true if the predicate became true before the timeout, false otherwise
-     */
-    @Override
-    public boolean waitAndCheck(String[] wakupTopicsOnMessageBus, long checkIntervalInMilliSeconds,
-            long timeoutInMiliseconds, Predicate predicate) {
-
-        MessageDetector msgDetector = new MessageDetector();
-        msgDetector.open(_messageBus, wakupTopicsOnMessageBus);
-        try {
-            long startTick = System.currentTimeMillis();
-            while(System.currentTimeMillis() - startTick < timeoutInMiliseconds) {
-                msgDetector.waitAny(checkIntervalInMilliSeconds);
-                if(predicate.checkCondition())
-                    return true;
-            }
-        } finally {
-            msgDetector.close();
-        }
-
-        return false;
-    }
-
-    /** Dequeues and schedules items from the given queue until it yields nothing or fails. */
-    private void checkQueue(long queueId) {
-        while(true) {
-            try {
-                SyncQueueItemVO item = _queueMgr.dequeueFromOne(queueId, getMsid());
-                if(item != null) {
-                    if(s_logger.isDebugEnabled()) {
-                        s_logger.debug("Executing sync queue item: " + item.toString());
-                    }
-
-                    executeQueueItem(item, false);
-                } else {
-                    break;
-                }
-            } catch(Throwable e) {
-                s_logger.error("Unexpected exception when kicking sync queue-" + queueId, e);
-                break;
-            }
-        }
-    }
-
-    /**
-     * Periodic task: dequeues up to MAX_ONETIME_SCHEDULE_SIZE items from any queue for this
-     * server and schedules them, then schedules jobs returned by the join-map wakeup scan
-     * that have the wakeup signal pending.
-     */
-    private Runnable getHeartbeatTask() {
-        return new Runnable() {
-            @Override
-            public void run() {
-                Transaction txn = Transaction.open("AsyncJobManagerImpl.getHeartbeatTask");
-                try {
-                    List<SyncQueueItemVO> l = _queueMgr.dequeueFromAny(getMsid(), MAX_ONETIME_SCHEDULE_SIZE);
-                    if(l != null && l.size() > 0) {
-                        for(SyncQueueItemVO item: l) {
-                            if(s_logger.isDebugEnabled()) {
-                                s_logger.debug("Execute sync-queue item: " + item.toString());
-                            }
-                            executeQueueItem(item, false);
-                        }
-                    }
-
-                    List<Long> standaloneWakeupJobs = _joinMapDao.wakeupScan();
-                    for(Long jobId : standaloneWakeupJobs) {
-                        // TODO, we assume that all jobs in this category is API job only
-                        AsyncJobVO job = _jobDao.findById(jobId);
-                        if (job != null && (job.getPendingSignals() & SIGNAL_MASK_WAKEUP) != 0)
-                            scheduleExecution(job, false);
-                    }
-                } catch(Throwable e) {
-                    s_logger.error("Unexpected exception when trying to execute queue item, ", e);
-                } finally {
-                    try {
-                        txn.close();
-                    } catch(Throwable e) {
-                        s_logger.error("Unexpected exception", e);
-                    }
-                }
-            }
-        };
-    }
-
-    /**
-     * Periodic GC task. Under a cluster-wide global lock (acquired with a short timeout, so
-     * only one node runs it per turn) it expunges expired jobs and force-fails/purges queue
-     * items that have blocked their queue longer than the cancel threshold.
-     */
-    @DB
-    private Runnable getGCTask() {
-        return new Runnable() {
-            @Override
-            public void run() {
-                GlobalLock scanLock = GlobalLock.getInternLock("AsyncJobManagerGC");
-                try {
-                    if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
-                        try {
-                            reallyRun();
-                        } finally {
-                            scanLock.unlock();
-                        }
-                    }
-                } finally {
-                    scanLock.releaseRef();
-                }
-            }
-
-            public void reallyRun() {
-                try {
-                    s_logger.trace("Begin cleanup expired async-jobs");
-
-                    Date cutTime = new Date(DateUtil.currentGMTTime().getTime() - _jobExpireSeconds.value() * 1000);
-
-                    // limit to 100 jobs per turn, this gives cleanup throughput as 600 jobs per minute
-                    // hopefully this will be fast enough to balance potential growth of job table
-                    //1) Expire unfinished jobs that weren't processed yet
-                    List<AsyncJobVO> l = _jobDao.getExpiredUnfinishedJobs(cutTime, 100);
-                    for(AsyncJobVO job : l) {
-                        s_logger.trace("Expunging unfinished job " + job);
-                        expungeAsyncJob(job);
-                    }
-
-                    //2) Expunge finished jobs
-                    List<AsyncJobVO> completedJobs = _jobDao.getExpiredCompletedJobs(cutTime, 100);
-                    for(AsyncJobVO job : completedJobs) {
-                        s_logger.trace("Expunging completed job " + job);
-                        expungeAsyncJob(job);
-                    }
-
-                    // forcefully cancel blocking queue items if they've been staying there for too long
-                    List<SyncQueueItemVO> blockItems = _queueMgr.getBlockedQueueItems(_jobCancelThresholdSeconds.value()
-                            * 1000, false);
-                    if(blockItems != null && blockItems.size() > 0) {
-                        for(SyncQueueItemVO item : blockItems) {
-                            if(item.getContentType().equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
-                                completeAsyncJob(item.getContentId(), JobInfo.Status.FAILED, 0, "Job is cancelled as it has been blocking others for too long");
-                            }
-
-                            // purge the item and resume queue processing
-                            _queueMgr.purgeItem(item.getId());
-                        }
-                    }
-
-                    s_logger.trace("End cleanup expired async-jobs");
-                } catch(Throwable e) {
-                    s_logger.error("Unexpected exception when trying to clean up expired async-jobs, ", e);
-                }
-            }
-        };
-    }
-
-    /** Deletes the job and its sync-queue item in a single transaction. */
-    @DB
-    protected void expungeAsyncJob(AsyncJobVO job) {
-        Transaction txn = Transaction.currentTxn();
-        txn.start();
-        _jobDao.expunge(job.getId());
-        //purge corresponding sync queue item
-        _queueMgr.purgeAsyncJobQueueItemId(job.getId());
-        txn.commit();
-    }
-
-    private long getMsid() {
-        return ManagementServerNode.getManagementServerId();
-    }
-
-    /**
-     * Discards the given queue items, marking the async jobs they reference as FAILED.
-     */
-    private void cleanupPendingJobs(List<SyncQueueItemVO> l) {
-        for (SyncQueueItemVO item : l) {
-            if (s_logger.isInfoEnabled()) {
-                s_logger.info("Discard left-over queue item: " + item.toString());
-            }
-
-            String contentType = item.getContentType();
-            if (contentType != null && contentType.equalsIgnoreCase(SyncQueueItem.AsyncJobContentType)) {
-                Long jobId = item.getContentId();
-                if (jobId != null) {
-                    s_logger.warn("Mark job as failed as its correspoding queue-item has been discarded. job id: " + jobId);
-                    completeAsyncJob(jobId, JobInfo.Status.FAILED, 0, "Execution was cancelled because of server shutdown");
-                }
-            }
-            _queueMgr.purgeItem(item.getId());
-        }
-    }
-
-    /**
-     * Reads the job-expiry and cancel-threshold settings, sizes the job thread pool at two
-     * thirds of db.cloud.maxActive from db.properties, and initializes the execution-context
-     * and outcome helpers.
-     *
-     * @throws ConfigurationException if db.properties cannot be read or the pool cannot be
-     *         created; the underlying failure is attached as the root cause
-     */
-    @Override
-    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
-        _jobExpireSeconds = _configDepot.get(JobExpireMinutes).setMultiplier(60);
-        _jobCancelThresholdSeconds = _configDepot.get(JobCancelThresholdMinutes).setMultiplier(60);
-
-        // Properties.load() leaves the stream open, so close it explicitly.
-        FileInputStream dbPropsStream = null;
-        try {
-            final File dbPropsFile = PropertiesUtil.findConfigFile("db.properties");
-            final Properties dbProps = new Properties();
-            dbPropsStream = new FileInputStream(dbPropsFile);
-            dbProps.load(dbPropsStream);
-
-            final int cloudMaxActive = Integer.parseInt(dbProps.getProperty("db.cloud.maxActive"));
-
-            int poolSize = (cloudMaxActive * 2) / 3;
-
-            s_logger.info("Start AsyncJobManager thread pool in size " + poolSize);
-            _executor = Executors.newFixedThreadPool(poolSize, new NamedThreadFactory(AsyncJobManager.JOB_POOL_THREAD_PREFIX));
-        } catch (final Exception e) {
-            ConfigurationException ce = new ConfigurationException("Unable to load db.properties to configure AsyncJobManagerImpl");
-            ce.setRootCause(e);
-            throw ce;
-        } finally {
-            if (dbPropsStream != null) {
-                try {
-                    dbPropsStream.close();
-                } catch (Exception closeFailure) {
-                    s_logger.warn("Unable to close db.properties stream", closeFailure);
-                }
-            }
-        }
-
-        AsyncJobExecutionContext.init(this, _joinMapDao);
-        OutcomeImpl.init(this);
-
-        return true;
-    }
-
-    @Override
-    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
-        // No action is taken when management nodes join.
-    }
-
-    /**
-     * For each departed management server, fails the jobs behind its active queue items and
-     * resets its in-process jobs, each node in its own transaction. Failures are logged.
-     */
-    @Override
-    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
-        for (ManagementServerHost msHost : nodeList) {
-            Transaction txn = Transaction.open(Transaction.CLOUD_DB);
-            try {
-                txn.start();
-                List<SyncQueueItemVO> items = _queueMgr.getActiveQueueItems(msHost.getId(), true);
-                cleanupPendingJobs(items);
-                _jobDao.resetJobProcess(msHost.getId(), ApiErrorCode.INTERNAL_ERROR.getHttpCode(), "job cancelled because of management server restart");
-                txn.commit();
-            } catch(Throwable e) {
-                s_logger.warn("Unexpected exception ", e);
-            } finally {
-                txn.close();
-            }
-        }
-    }
-
-    @Override
-    public void onManagementNodeIsolated() {
-        // No action is taken when this node is isolated.
-    }
-
-    /**
-     * Cleans up job state left for this management server (failures are logged), then
-     * schedules the heartbeat and GC tasks.
-     */
-    @Override
-    public boolean start() {
-        try {
-            _jobDao.cleanupPseduoJobs(getMsid());
-
-            List<SyncQueueItemVO> l = _queueMgr.getActiveQueueItems(getMsid(), false);
-            cleanupPendingJobs(l);
-            _jobDao.resetJobProcess(getMsid(), ApiErrorCode.INTERNAL_ERROR.getHttpCode(), "job cancelled because of management server restart");
-        } catch(Throwable e) {
-            s_logger.error("Unexpected exception " + e.getMessage(), e);
-        }
-
-        _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
-        _heartbeatScheduler.scheduleAtFixedRate(getGCTask(), GC_INTERVAL, GC_INTERVAL, TimeUnit.MILLISECONDS);
-
-        return true;
-    }
-
-    /** Shuts down the heartbeat scheduler and, if it was created, the job thread pool. */
-    @Override
-    public boolean stop() {
-        _heartbeatScheduler.shutdown();
-        if (_executor != null) {
-            _executor.shutdown();
-        }
-        return true;
-    }
-}