You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by st...@apache.org on 2018/03/20 15:13:26 UTC

incubator-batchee git commit: BATCHEE-131 keep track of running jobs

Repository: incubator-batchee
Updated Branches:
  refs/heads/master ab0fb6f57 -> aef149a0a


BATCHEE-131 keep track of running jobs

and actively stop them on shutdown.


Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/aef149a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/aef149a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/aef149a0

Branch: refs/heads/master
Commit: aef149a0afdedd6b3d83079c7494a80c0f8adf5c
Parents: ab0fb6f
Author: Mark Struberg <st...@apache.org>
Authored: Tue Mar 20 16:12:38 2018 +0100
Committer: Mark Struberg <st...@apache.org>
Committed: Tue Mar 20 16:12:38 2018 +0100

----------------------------------------------------------------------
 .../batchee/container/impl/JobOperatorImpl.java |  6 ++++-
 .../thread/AsyncEjbBatchThreadPoolService.java  | 28 ++++++++++++++++----
 .../services/thread/ThreadExecutorEjb.java      | 18 +++++++++++++
 3 files changed, 46 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/aef149a0/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
index 27082f7..af0f54d 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java
@@ -61,7 +61,7 @@ import java.util.logging.Logger;
 import static org.apache.batchee.container.util.ClassLoaderAwareHandler.makeLoaderAware;
 
 
-public class JobOperatorImpl implements JobOperator {
+public class JobOperatorImpl implements JobOperator, AutoCloseable {
     private static final Logger LOGGER = Logger.getLogger(JobOperatorImpl.class.getName());
 
     static {
@@ -115,6 +115,10 @@ public class JobOperatorImpl implements JobOperator {
         this(ServicesManager.find());
     }
 
+    public void close() throws Exception {
+
+    }
+
     @Override
     public long start(final String jobXMLName, final Properties jobParameters) throws JobStartException, JobSecurityException {
         /*

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/aef149a0/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java
----------------------------------------------------------------------
diff --git a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java
index f4db719..df8beea 100644
--- a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java
+++ b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java
@@ -18,12 +18,17 @@ package org.apache.batchee.tools.services.thread;
 
 import java.util.Properties;
 import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
 import javax.enterprise.context.spi.CreationalContext;
 import javax.enterprise.inject.spi.Bean;
 import javax.enterprise.inject.spi.BeanManager;
 
 import org.apache.batchee.container.cdi.BatchCDIInjectionExtension;
+import org.apache.batchee.container.util.BatchWorkUnit;
 import org.apache.batchee.spi.BatchThreadPoolService;
 
 /**
@@ -43,10 +48,12 @@ import org.apache.batchee.spi.BatchThreadPoolService;
  *
  */
 public class AsyncEjbBatchThreadPoolService implements BatchThreadPoolService {
-    
+
+    private static final Logger logger = Logger.getLogger(AsyncEjbBatchThreadPoolService.class.getName());
+
     private BeanManager beanManager;
     private ThreadExecutorEjb threadExecutorEjb;
-    
+
     @Override
     public void init(Properties batchConfig) {
         beanManager = BatchCDIInjectionExtension.getInstance().getBeanManager();
@@ -65,8 +72,19 @@ public class AsyncEjbBatchThreadPoolService implements BatchThreadPoolService {
     
     @Override
     public void shutdown() {
-        // We cannot force an async EJB to shutdown.
-        // This usually works out of the box if the container EJB
-        // undeploys or stops the application.
+        Set<BatchWorkUnit> runningBatchWorkUnits = threadExecutorEjb.getRunningBatchWorkUnits();
+        if (!runningBatchWorkUnits.isEmpty()) {
+            JobOperator jobOperator = BatchRuntime.getJobOperator();
+            for (BatchWorkUnit batchWorkUnit : runningBatchWorkUnits) {
+                try {
+                    long executionId = batchWorkUnit.getJobExecutionImpl().getExecutionId();
+                    if (executionId >= 0) {
+                        jobOperator.stop(executionId);
+                    }
+                } catch(Exception e) {
+                    logger.log(Level.SEVERE, "Failure while shutting down execution", e);
+                }
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/aef149a0/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java
----------------------------------------------------------------------
diff --git a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java
index d4923d9..edb732e 100644
--- a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java
+++ b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java
@@ -16,6 +16,8 @@
  */
 package org.apache.batchee.tools.services.thread;
 
+import org.apache.batchee.container.util.BatchWorkUnit;
+
 import javax.annotation.Resource;
 import javax.ejb.Asynchronous;
 import javax.ejb.Lock;
@@ -24,6 +26,9 @@ import javax.ejb.Singleton;
 import javax.ejb.TransactionManagement;
 import javax.ejb.TransactionManagementType;
 import javax.transaction.UserTransaction;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * Small helper class to allow new threads being created via the
@@ -42,18 +47,31 @@ public class ThreadExecutorEjb {
     @Resource
     private UserTransaction ut;
 
+    private Set<BatchWorkUnit> runningBatchWorkUnits = Collections.synchronizedSet(new HashSet<BatchWorkUnit>());
+
+
     private static ThreadLocal<UserTransaction> userTransactions = new ThreadLocal<UserTransaction>();
 
     @Asynchronous
     public void executeTask(Runnable work, Object config) {
         try {
             userTransactions.set(ut);
+            if (work instanceof BatchWorkUnit) {
+                runningBatchWorkUnits.add((BatchWorkUnit) work);
+            }
+
             work.run();
         } finally {
+            if (work instanceof BatchWorkUnit) {
+                runningBatchWorkUnits.remove(work);
+            }
             userTransactions.remove();
         }
     }
 
+    public Set<BatchWorkUnit> getRunningBatchWorkUnits() {
+        return runningBatchWorkUnits;
+    }
 
     public static UserTransaction getUserTransaction() {
         return userTransactions.get();