You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@syncope.apache.org by gi...@apache.org on 2015/06/22 16:49:07 UTC

[2/2] syncope git commit: [SYNCOPE-660]Forcing interrupt for heavy tasks - minor improvement

[SYNCOPE-660]Forcing interrupt for heavy tasks - minor improvement


Project: http://git-wip-us.apache.org/repos/asf/syncope/repo
Commit: http://git-wip-us.apache.org/repos/asf/syncope/commit/b13e9fe4
Tree: http://git-wip-us.apache.org/repos/asf/syncope/tree/b13e9fe4
Diff: http://git-wip-us.apache.org/repos/asf/syncope/diff/b13e9fe4

Branch: refs/heads/master
Commit: b13e9fe4dd226d7ef48b249447289e3a4451c84b
Parents: 0ae4ac8 bdb3257
Author: giacomolm <gi...@hotmail.it>
Authored: Mon Jun 22 16:45:28 2015 +0200
Committer: giacomolm <gi...@hotmail.it>
Committed: Mon Jun 22 16:45:28 2015 +0200

----------------------------------------------------------------------
 .../provisioning/java/job/AbstractTaskJob.java  | 29 ++++++++------------
 1 file changed, 12 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/syncope/blob/b13e9fe4/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractTaskJob.java
----------------------------------------------------------------------
diff --cc core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractTaskJob.java
index 4c49655,0000000..728ab41
mode 100644,000000..100644
--- a/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractTaskJob.java
+++ b/core/provisioning-java/src/main/java/org/apache/syncope/core/provisioning/java/job/AbstractTaskJob.java
@@@ -1,221 -1,0 +1,216 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.syncope.core.provisioning.java.job;
 +
- import java.text.SimpleDateFormat;
 +import java.util.Date;
- import java.util.Locale;
 +import java.util.concurrent.atomic.AtomicReference;
- import org.apache.syncope.common.lib.SyncopeConstants;
 +import org.apache.syncope.common.lib.types.AuditElements;
 +import org.apache.syncope.common.lib.types.AuditElements.Result;
 +import org.apache.syncope.core.persistence.api.dao.TaskDAO;
 +import org.apache.syncope.core.persistence.api.dao.TaskExecDAO;
 +import org.apache.syncope.core.persistence.api.entity.EntityFactory;
 +import org.apache.syncope.core.persistence.api.entity.task.Task;
 +import org.apache.syncope.core.persistence.api.entity.task.TaskExec;
 +import org.apache.syncope.core.provisioning.api.job.TaskJob;
 +import org.apache.syncope.core.misc.AuditManager;
++import org.apache.syncope.core.misc.DataFormat;
 +import org.apache.syncope.core.misc.ExceptionUtils2;
 +import org.apache.syncope.core.persistence.api.dao.ConfDAO;
 +import org.apache.syncope.core.provisioning.api.notification.NotificationManager;
 +import org.quartz.DisallowConcurrentExecution;
 +import org.quartz.JobExecutionContext;
 +import org.quartz.JobExecutionException;
 +import org.quartz.UnableToInterruptJobException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.springframework.beans.factory.annotation.Autowired;
 +
 +/**
 + * Abstract job implementation that delegates to concrete implementation the actual job execution and provides some
 + * base features.
 + * <strong>Extending this class will not provide support transaction management.</strong><br/>
 + * Extend <tt>AbstractTransactionalTaskJob</tt> for this purpose.
 + *
 + * @see AbstractTransactionalTaskJob
 + */
 +@DisallowConcurrentExecution
 +public abstract class AbstractTaskJob implements TaskJob {
 +
 +    /**
 +     * Task execution status.
 +     */
 +    public enum Status {
 +
 +        SUCCESS,
 +        FAILURE
 +
 +    }
 +
 +    /**
 +     * Logger.
 +     */
 +    protected static final Logger LOG = LoggerFactory.getLogger(AbstractTaskJob.class);
 +
 +    /**
 +     * Task DAO.
 +     */
 +    @Autowired
 +    protected TaskDAO taskDAO;
 +
 +    /**
 +     * Task execution DAO.
 +     */
 +    @Autowired
 +    private TaskExecDAO taskExecDAO;
 +
 +    /**
 +     * Configuration DAO.
 +     */
 +    @Autowired
 +    private ConfDAO confDAO;
 +
 +    /**
 +     * Notification manager.
 +     */
 +    @Autowired
 +    private NotificationManager notificationManager;
 +
 +    /**
 +     * Audit manager.
 +     */
 +    @Autowired
 +    private AuditManager auditManager;
 +
 +    @Autowired
 +    private EntityFactory entityFactory;
 +
 +    /**
 +     * Id, set by the caller, for identifying the task to be executed.
 +     */
 +    protected Long taskId;
 +
 +    /**
 +     * The actual task to be executed.
 +     */
 +    protected Task task;
 +
 +    /**
 +     * The current running thread containing the task to be executed.
 +     */
 +    protected AtomicReference<Thread> runningThread = new AtomicReference<Thread>();
 +
 +    /**
 +     * Task id setter.
 +     *
 +     * @param taskId to be set
 +     */
 +    @Override
 +    public void setTaskId(final Long taskId) {
 +        this.taskId = taskId;
 +    }
 +
 +    @Override
 +    public void execute(final JobExecutionContext context) throws JobExecutionException {
 +        this.runningThread.set(Thread.currentThread());
 +        task = taskDAO.find(taskId);
 +        if (task == null) {
 +            throw new JobExecutionException("Task " + taskId + " not found");
 +        }
 +
 +        TaskExec execution = entityFactory.newEntity(TaskExec.class);
 +        execution.setStartDate(new Date());
 +        execution.setTask(task);
 +
 +        Result result;
 +
 +        try {
 +            execution.setMessage(doExecute(context.getMergedJobDataMap().getBoolean(DRY_RUN_JOBDETAIL_KEY)));
 +            execution.setStatus(Status.SUCCESS.name());
 +            result = Result.SUCCESS;
 +        } catch (JobExecutionException e) {
 +            LOG.error("While executing task " + taskId, e);
 +            result = Result.FAILURE;
 +
 +            execution.setMessage(ExceptionUtils2.getFullStackTrace(e));
 +            execution.setStatus(Status.FAILURE.name());
 +        }
 +        execution.setEndDate(new Date());
 +
 +        if (hasToBeRegistered(execution)) {
 +            taskExecDAO.saveAndAdd(taskId, execution);
 +        }
 +        task = taskDAO.save(task);
 +
 +        notificationManager.createTasks(
 +                AuditElements.EventCategoryType.TASK,
 +                this.getClass().getSimpleName(),
 +                null,
 +                this.getClass().getSimpleName(), // searching for before object is too much expensive ...
 +                result,
 +                task,
 +                execution);
 +
 +        auditManager.audit(
 +                AuditElements.EventCategoryType.TASK,
 +                task.getClass().getSimpleName(),
 +                null,
 +                null, // searching for before object is too much expensive ...
 +                result,
 +                task,
 +                (Object[]) null);
 +    }
 +
 +    /**
 +     * The actual execution, delegated to child classes.
 +     *
 +     * @param dryRun whether to actually touch the data
 +     * @return the task execution status to be set
 +     * @throws JobExecutionException if anything goes wrong
 +     */
 +    protected abstract String doExecute(boolean dryRun) throws JobExecutionException;
 +
 +    /**
 +     * Template method to determine whether this job's task execution has to be persisted or not.
 +     *
 +     * @param execution task execution
 +     * @return wether to persist or not
 +     */
 +    protected boolean hasToBeRegistered(final TaskExec execution) {
 +        return false;
 +    }
 +
 +    @Override
 +    public void interrupt() throws UnableToInterruptJobException {
 +        Thread thread = this.runningThread.getAndSet(null);
-         if (thread != null) {
-             LOG.info("Interrupting job time {} ", (new SimpleDateFormat(SyncopeConstants.DEFAULT_DATE_PATTERN, Locale.
-                     getDefault())).format(new Date()));
-             thread.interrupt();
++        if (thread == null) {
++            LOG.warn("Unable to retrieve the thread of the current job execution");
++        } else {
++            LOG.info("Interrupting job from thread {} at {} ", thread.getId(), DataFormat.format(new Date()));
++
++            long maxRetry = confDAO.find("tasks.interruptMaxRetries", "1").getValues().get(0).getLongValue();
++            for (int i = 0; i < maxRetry && thread.isAlive(); i++) {
++                thread.interrupt();
++            }
++            // if the thread is still alive, it should be available in the next stop
 +            if (thread.isAlive()) {
-                 long maxRetry = confDAO.find("tasks.interruptMaxRetries", "0").getValues().get(0).getLongValue();
-                 for (int i = 0; i <= maxRetry && thread.isAlive(); i++) {
-                     thread.interrupt();
-                 }
-                 //if the thread is still alive, it should be available in the next stop
-                 if (thread.isAlive()) {
-                     this.runningThread.set(thread);
-                 }
++                this.runningThread.set(thread);
 +            }
-         } else {
-             LOG.warn("Unable to retrieve the right thread related to the current job execution");
 +        }
 +    }
 +}