You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@syncope.apache.org by gi...@apache.org on 2015/06/22 16:49:07 UTC
[2/2] syncope git commit: [SYNCOPE-660]Forcing interrupt for heavy tasks - minor improvement
[SYNCOPE-660]Forcing interrupt for heavy tasks - minor improvement
Project: http://git-wip-us.apache.org/repos/asf/syncope/repo
Commit: http://git-wip-us.apache.org/repos/asf/syncope/commit/b13e9fe4
Tree: http://git-wip-us.apache.org/repos/asf/syncope/tree/b13e9fe4
Diff: http://git-wip-us.apache.org/repos/asf/syncope/diff/b13e9fe4
Branch: refs/heads/master
Commit: b13e9fe4dd226d7ef48b249447289e3a4451c84b
Parents: 0ae4ac8 bdb3257
Author: giacomolm <gi...@hotmail.it>
Authored: Mon Jun 22 16:45:28 2015 +0200
Committer: giacomolm <gi...@hotmail.it>
Committed: Mon Jun 22 16:45:28 2015 +0200
----------------------------------------------------------------------
.../provisioning/java/job/AbstractTaskJob.java | 29 ++++++++------------
1 file changed, 12 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/syncope/blob/b13e9fe4/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractTaskJob.java
----------------------------------------------------------------------
diff --cc core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractTaskJob.java
index 4c49655,0000000..728ab41
mode 100644,000000..100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractTaskJob.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractTaskJob.java
@@@ -1,221 -1,0 +1,216 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.syncope.core.provisioning.java.job;
+
- import java.text.SimpleDateFormat;
+import java.util.Date;
- import java.util.Locale;
+import java.util.concurrent.atomic.AtomicReference;
- import org.apache.syncope.common.lib.SyncopeConstants;
+import org.apache.syncope.common.lib.types.AuditElements;
+import org.apache.syncope.common.lib.types.AuditElements.Result;
+import org.apache.syncope.core.persistence.api.dao.TaskDAO;
+import org.apache.syncope.core.persistence.api.dao.TaskExecDAO;
+import org.apache.syncope.core.persistence.api.entity.EntityFactory;
+import org.apache.syncope.core.persistence.api.entity.task.Task;
+import org.apache.syncope.core.persistence.api.entity.task.TaskExec;
+import org.apache.syncope.core.provisioning.api.job.TaskJob;
+import org.apache.syncope.core.misc.AuditManager;
++import org.apache.syncope.core.misc.DataFormat;
+import org.apache.syncope.core.misc.ExceptionUtils2;
+import org.apache.syncope.core.persistence.api.dao.ConfDAO;
+import org.apache.syncope.core.provisioning.api.notification.NotificationManager;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.UnableToInterruptJobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Abstract job implementation that delegates to concrete implementation the actual job execution and provides some
+ * base features.
+ * <strong>Extending this class will not provide transaction management support.</strong><br/>
+ * Extend <tt>AbstractTransactionalTaskJob</tt> for this purpose.
+ *
+ * @see AbstractTransactionalTaskJob
+ */
+@DisallowConcurrentExecution
+public abstract class AbstractTaskJob implements TaskJob {
+
+ /**
+ * Task execution status.
+ */
+ public enum Status {
+
+ SUCCESS,
+ FAILURE
+
+ }
+
+ /**
+ * Logger.
+ */
+ protected static final Logger LOG = LoggerFactory.getLogger(AbstractTaskJob.class);
+
+ /**
+ * Task DAO.
+ */
+ @Autowired
+ protected TaskDAO taskDAO;
+
+ /**
+ * Task execution DAO.
+ */
+ @Autowired
+ private TaskExecDAO taskExecDAO;
+
+ /**
+ * Configuration DAO.
+ */
+ @Autowired
+ private ConfDAO confDAO;
+
+ /**
+ * Notification manager.
+ */
+ @Autowired
+ private NotificationManager notificationManager;
+
+ /**
+ * Audit manager.
+ */
+ @Autowired
+ private AuditManager auditManager;
+
+ @Autowired
+ private EntityFactory entityFactory;
+
+ /**
+ * Id, set by the caller, for identifying the task to be executed.
+ */
+ protected Long taskId;
+
+ /**
+ * The actual task to be executed.
+ */
+ protected Task task;
+
+ /**
+ * The thread currently executing the task, if any.
+ */
+ protected AtomicReference<Thread> runningThread = new AtomicReference<Thread>();
+
+ /**
+ * Task id setter.
+ *
+ * @param taskId to be set
+ */
+ @Override
+ public void setTaskId(final Long taskId) {
+ this.taskId = taskId;
+ }
+
+ @Override
+ public void execute(final JobExecutionContext context) throws JobExecutionException {
+ this.runningThread.set(Thread.currentThread());
+ task = taskDAO.find(taskId);
+ if (task == null) {
+ throw new JobExecutionException("Task " + taskId + " not found");
+ }
+
+ TaskExec execution = entityFactory.newEntity(TaskExec.class);
+ execution.setStartDate(new Date());
+ execution.setTask(task);
+
+ Result result;
+
+ try {
+ execution.setMessage(doExecute(context.getMergedJobDataMap().getBoolean(DRY_RUN_JOBDETAIL_KEY)));
+ execution.setStatus(Status.SUCCESS.name());
+ result = Result.SUCCESS;
+ } catch (JobExecutionException e) {
+ LOG.error("While executing task " + taskId, e);
+ result = Result.FAILURE;
+
+ execution.setMessage(ExceptionUtils2.getFullStackTrace(e));
+ execution.setStatus(Status.FAILURE.name());
+ }
+ execution.setEndDate(new Date());
+
+ if (hasToBeRegistered(execution)) {
+ taskExecDAO.saveAndAdd(taskId, execution);
+ }
+ task = taskDAO.save(task);
+
+ notificationManager.createTasks(
+ AuditElements.EventCategoryType.TASK,
+ this.getClass().getSimpleName(),
+ null,
+ this.getClass().getSimpleName(), // searching for the before object is too expensive ...
+ result,
+ task,
+ execution);
+
+ auditManager.audit(
+ AuditElements.EventCategoryType.TASK,
+ task.getClass().getSimpleName(),
+ null,
+ null, // searching for the before object is too expensive ...
+ result,
+ task,
+ (Object[]) null);
+ }
+
+ /**
+ * The actual execution, delegated to child classes.
+ *
+ * @param dryRun whether to actually touch the data
+ * @return the task execution message to be set
+ * @throws JobExecutionException if anything goes wrong
+ */
+ protected abstract String doExecute(boolean dryRun) throws JobExecutionException;
+
+ /**
+ * Template method to determine whether this job's task execution has to be persisted or not.
+ *
+ * @param execution task execution
+ * @return whether to persist or not
+ */
+ protected boolean hasToBeRegistered(final TaskExec execution) {
+ return false;
+ }
+
+ @Override
+ public void interrupt() throws UnableToInterruptJobException {
+ Thread thread = this.runningThread.getAndSet(null);
- if (thread != null) {
- LOG.info("Interrupting job time {} ", (new SimpleDateFormat(SyncopeConstants.DEFAULT_DATE_PATTERN, Locale.
- getDefault())).format(new Date()));
- thread.interrupt();
++ if (thread == null) {
++ LOG.warn("Unable to retrieve the thread of the current job execution");
++ } else {
++ LOG.info("Interrupting job from thread {} at {} ", thread.getId(), DataFormat.format(new Date()));
++
++ long maxRetry = confDAO.find("tasks.interruptMaxRetries", "1").getValues().get(0).getLongValue();
++ for (int i = 0; i < maxRetry && thread.isAlive(); i++) {
++ thread.interrupt();
++ }
++ // if the thread is still alive, keep its reference so that a later interrupt() call can retry
+ if (thread.isAlive()) {
- long maxRetry = confDAO.find("tasks.interruptMaxRetries", "0").getValues().get(0).getLongValue();
- for (int i = 0; i <= maxRetry && thread.isAlive(); i++) {
- thread.interrupt();
- }
- //if the thread is still alive, it should be available in the next stop
- if (thread.isAlive()) {
- this.runningThread.set(thread);
- }
++ this.runningThread.set(thread);
+ }
- } else {
- LOG.warn("Unable to retrieve the right thread related to the current job execution");
+ }
+ }
+}