You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@batchee.apache.org by st...@apache.org on 2018/03/20 15:13:26 UTC
incubator-batchee git commit: BATCHEE-131 keep track of running jobs
Repository: incubator-batchee
Updated Branches:
refs/heads/master ab0fb6f57 -> aef149a0a
BATCHEE-131 keep track of running jobs
and actively stop them on shutdown.
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/aef149a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/aef149a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/aef149a0
Branch: refs/heads/master
Commit: aef149a0afdedd6b3d83079c7494a80c0f8adf5c
Parents: ab0fb6f
Author: Mark Struberg <st...@apache.org>
Authored: Tue Mar 20 16:12:38 2018 +0100
Committer: Mark Struberg <st...@apache.org>
Committed: Tue Mar 20 16:12:38 2018 +0100
----------------------------------------------------------------------
.../batchee/container/impl/JobOperatorImpl.java | 6 ++++-
.../thread/AsyncEjbBatchThreadPoolService.java | 28 ++++++++++++++++----
.../services/thread/ThreadExecutorEjb.java | 18 +++++++++++++
3 files changed, 46 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/aef149a0/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
index 27082f7..af0f54d 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
@@ -61,7 +61,7 @@ import java.util.logging.Logger;
import static org.apache.batchee.container.util.ClassLoaderAwareHandler.makeLoaderAware;
-public class JobOperatorImpl implements JobOperator {
+public class JobOperatorImpl implements JobOperator, AutoCloseable {
private static final Logger LOGGER = Logger.getLogger(JobOperatorImpl.class.getName());
static {
@@ -115,6 +115,10 @@ public class JobOperatorImpl implements JobOperator {
this(ServicesManager.find());
}
+ public void close() throws Exception {
+
+ }
+
@Override
public long start(final String jobXMLName, final Properties jobParameters) throws JobStartException, JobSecurityException {
/*
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/aef149a0/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java
----------------------------------------------------------------------
diff --git a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java
index f4db719..df8beea 100644
--- a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java
+++ b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java
@@ -18,12 +18,17 @@ package org.apache.batchee.tools.services.thread;
import java.util.Properties;
import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
import javax.enterprise.context.spi.CreationalContext;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import org.apache.batchee.container.cdi.BatchCDIInjectionExtension;
+import org.apache.batchee.container.util.BatchWorkUnit;
import org.apache.batchee.spi.BatchThreadPoolService;
/**
@@ -43,10 +48,12 @@ import org.apache.batchee.spi.BatchThreadPoolService;
*
*/
public class AsyncEjbBatchThreadPoolService implements BatchThreadPoolService {
-
+
+ private static final Logger logger = Logger.getLogger(AsyncEjbBatchThreadPoolService.class.getName());
+
private BeanManager beanManager;
private ThreadExecutorEjb threadExecutorEjb;
-
+
@Override
public void init(Properties batchConfig) {
beanManager = BatchCDIInjectionExtension.getInstance().getBeanManager();
@@ -65,8 +72,19 @@ public class AsyncEjbBatchThreadPoolService implements BatchThreadPoolService {
@Override
public void shutdown() {
- // We cannot force an async EJB to shutdown.
- // This usually works out of the box if the container EJB
- // undeploys or stops the application.
+ Set<BatchWorkUnit> runningBatchWorkUnits = threadExecutorEjb.getRunningBatchWorkUnits();
+ if (!runningBatchWorkUnits.isEmpty()) {
+ JobOperator jobOperator = BatchRuntime.getJobOperator();
+ for (BatchWorkUnit batchWorkUnit : runningBatchWorkUnits) {
+ try {
+ long executionId = batchWorkUnit.getJobExecutionImpl().getExecutionId();
+ if (executionId >= 0) {
+ jobOperator.stop(executionId);
+ }
+ } catch(Exception e) {
+ logger.log(Level.SEVERE, "Failure while shutting down execution", e);
+ }
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/aef149a0/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java
----------------------------------------------------------------------
diff --git a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java
index d4923d9..edb732e 100644
--- a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java
+++ b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java
@@ -16,6 +16,8 @@
*/
package org.apache.batchee.tools.services.thread;
+import org.apache.batchee.container.util.BatchWorkUnit;
+
import javax.annotation.Resource;
import javax.ejb.Asynchronous;
import javax.ejb.Lock;
@@ -24,6 +26,9 @@ import javax.ejb.Singleton;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.transaction.UserTransaction;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
/**
* Small helper class to allow new threads being created via the
@@ -42,18 +47,31 @@ public class ThreadExecutorEjb {
@Resource
private UserTransaction ut;
+ private Set<BatchWorkUnit> runningBatchWorkUnits = Collections.synchronizedSet(new HashSet<BatchWorkUnit>());
+
+
private static ThreadLocal<UserTransaction> userTransactions = new ThreadLocal<UserTransaction>();
@Asynchronous
public void executeTask(Runnable work, Object config) {
try {
userTransactions.set(ut);
+ if (work instanceof BatchWorkUnit) {
+ runningBatchWorkUnits.add((BatchWorkUnit) work);
+ }
+
work.run();
} finally {
+ if (work instanceof BatchWorkUnit) {
+ runningBatchWorkUnits.remove(work);
+ }
userTransactions.remove();
}
}
+ public Set<BatchWorkUnit> getRunningBatchWorkUnits() {
+ return runningBatchWorkUnits;
+ }
public static UserTransaction getUserTransaction() {
return userTransactions.get();