You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by st...@apache.org on 2018/07/09 14:20:23 UTC

[1/2] incubator-batchee git commit: BATCHEE-131 shutdown BatchEE-CLI on Ctrl-C

Repository: incubator-batchee
Updated Branches:
  refs/heads/master 563c55eab -> 30aa2a4d7


BATCHEE-131 shutdown BatchEE-CLI on Ctrl-C


Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/60e7e575
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/60e7e575
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/60e7e575

Branch: refs/heads/master
Commit: 60e7e575a2c7c6195bafb7c86d3289bd53e8b26f
Parents: 563c55e
Author: Mark Struberg <st...@apache.org>
Authored: Mon Jul 9 13:54:47 2018 +0200
Committer: Mark Struberg <st...@apache.org>
Committed: Mon Jul 9 13:54:47 2018 +0200

----------------------------------------------------------------------
 .../batchee/cli/command/JobOperatorCommand.java   | 18 ++++++++++++++++--
 .../cli/lifecycle/impl/OpenEJBLifecycle.java      | 13 +++++++++++++
 2 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/60e7e575/tools/cli/src/main/java/org/apache/batchee/cli/command/JobOperatorCommand.java
----------------------------------------------------------------------
diff --git a/tools/cli/src/main/java/org/apache/batchee/cli/command/JobOperatorCommand.java b/tools/cli/src/main/java/org/apache/batchee/cli/command/JobOperatorCommand.java
index bec981d..bb10fe7 100644
--- a/tools/cli/src/main/java/org/apache/batchee/cli/command/JobOperatorCommand.java
+++ b/tools/cli/src/main/java/org/apache/batchee/cli/command/JobOperatorCommand.java
@@ -48,8 +48,7 @@ import static java.lang.Thread.currentThread;
  * Note: the classloader is created from libs command, it is handy to organize batches
  *       by folders to be able to run them contextual using this command.
 */
-public abstract class JobOperatorCommand implements Runnable {
-    // Remote config
+public abstract class JobOperatorCommand implements Runnable {    // Remote config
 
     @Option(name = "url", description = "when using JAXRS the batchee resource url")
     protected String baseUrl = null;
@@ -178,6 +177,8 @@ public abstract class JobOperatorCommand implements Runnable {
             if (lifecycle != null) {
                 lifecycleInstance = createLifecycle(loader);
                 state = lifecycleInstance.start();
+
+                registerShutdownHook(lifecycleInstance, state);
             } else {
                 lifecycleInstance = null;
                 state = null;
@@ -195,6 +196,19 @@ public abstract class JobOperatorCommand implements Runnable {
         }
     }
 
+    private void registerShutdownHook(final Lifecycle<Object> lifecycleInstance, final Object state) {
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                // as System.out as the Logger might already be resetted.
+                // Additionally we want to give this message to the person hitting Ctrl-C
+                // and not
+                System.out.println("\n    Shutting down the JBatch engine started...\n");
+                lifecycleInstance.stop(state);
+             }
+        });
+    }
+
     private Lifecycle<Object> createLifecycle(final ClassLoader loader) {
         // some shortcuts are nicer to use from CLI
         if ("openejb".equalsIgnoreCase(lifecycle)) {

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/60e7e575/tools/cli/src/main/java/org/apache/batchee/cli/lifecycle/impl/OpenEJBLifecycle.java
----------------------------------------------------------------------
diff --git a/tools/cli/src/main/java/org/apache/batchee/cli/lifecycle/impl/OpenEJBLifecycle.java b/tools/cli/src/main/java/org/apache/batchee/cli/lifecycle/impl/OpenEJBLifecycle.java
index 3608f24..3ebc210 100644
--- a/tools/cli/src/main/java/org/apache/batchee/cli/lifecycle/impl/OpenEJBLifecycle.java
+++ b/tools/cli/src/main/java/org/apache/batchee/cli/lifecycle/impl/OpenEJBLifecycle.java
@@ -18,6 +18,8 @@ package org.apache.batchee.cli.lifecycle.impl;
 
 import org.apache.batchee.cli.classloader.ChildFirstURLClassLoader;
 import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.spi.BatchThreadPoolService;
 import org.apache.openejb.OpenEjbContainer;
 import org.apache.openejb.UndeployException;
 import org.apache.openejb.assembler.classic.AppInfo;
@@ -39,12 +41,15 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 // EJBContainer doesn't support war deployment
 public class OpenEJBLifecycle extends LifecycleBase<Object> {
     private Assembler assembler = null;
     private AppInfo info = null;
 
+    private AtomicBoolean running = new AtomicBoolean(false);
+
     @Override
     public Object start() {
         final Map<String, Object> config = configuration("openejb");
@@ -78,6 +83,7 @@ public class OpenEJBLifecycle extends LifecycleBase<Object> {
                         assembler = SystemInstance.get().getComponent(Assembler.class);
                         assembler.createApplication(info, loader);
 
+                        running.set(true);
                         return initialContext;
                     } catch (final Exception e) {
                         throw new BatchContainerRuntimeException(e);
@@ -94,6 +100,13 @@ public class OpenEJBLifecycle extends LifecycleBase<Object> {
 
     @Override
     public void stop(final Object state) {
+        if (!running.compareAndSet(true, false)) {
+            return;
+        }
+
+        BatchThreadPoolService threadPoolService = ServicesManager.find().service(BatchThreadPoolService.class);
+        threadPoolService.shutdown();
+
         if (state != null) {
             if (assembler != null && info != null) {
                 try {


[2/2] incubator-batchee git commit: BATCHEE-131 keep track of running threads

Posted by st...@apache.org.
BATCHEE-131 keep track of running threads

And shut them down if the container stops


Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/30aa2a4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/30aa2a4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/30aa2a4d

Branch: refs/heads/master
Commit: 30aa2a4d7c86a151abd7ddcdeaff7ad1a2be0994
Parents: 60e7e57
Author: Mark Struberg <st...@apache.org>
Authored: Mon Jul 9 16:19:07 2018 +0200
Committer: Mark Struberg <st...@apache.org>
Committed: Mon Jul 9 16:19:07 2018 +0200

----------------------------------------------------------------------
 .../executor/AbstractThreadPoolService.java     | 159 +++++++++++++------
 1 file changed, 107 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/30aa2a4d/jbatch/src/main/java/org/apache/batchee/container/services/executor/AbstractThreadPoolService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/executor/AbstractThreadPoolService.java b/jbatch/src/main/java/org/apache/batchee/container/services/executor/AbstractThreadPoolService.java
index 1f92e2f..03c38bc 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/executor/AbstractThreadPoolService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/executor/AbstractThreadPoolService.java
@@ -1,52 +1,107 @@
-/*
- * Copyright 2013 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.batchee.container.services.executor;
-
-import org.apache.batchee.container.exception.BatchContainerServiceException;
-import org.apache.batchee.spi.BatchThreadPoolService;
-
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-
-import static org.apache.batchee.container.util.ClassLoaderAwareHandler.runnableLoaderAware;
-
-public abstract class AbstractThreadPoolService implements BatchThreadPoolService {
-    protected ExecutorService executorService;
-
-    protected abstract ExecutorService newExecutorService(Properties batchConfig);
-
-    @Override
-    public void init(final Properties batchConfig) throws BatchContainerServiceException {
-        executorService = newExecutorService(batchConfig);
-    }
-
-    @Override
-    public void shutdown() throws BatchContainerServiceException {
-        executorService.shutdownNow();
-        executorService = null;
-    }
-
-    @Override
-    public void executeTask(final Runnable work, final Object config) {
-        executorService.execute(runnableLoaderAware(work));
-    }
-
-    @Override
-    public String toString() {
-        return getClass().getName();
-    }
-}
+/*
+ * Copyright 2013 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.services.executor;
+
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.container.util.BatchWorkUnit;
+import org.apache.batchee.spi.BatchThreadPoolService;
+
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.apache.batchee.container.util.ClassLoaderAwareHandler.runnableLoaderAware;
+
+public abstract class AbstractThreadPoolService implements BatchThreadPoolService {
+    private final static Logger LOGGER = Logger.getLogger(AbstractThreadPoolService.class.getName());
+
+    protected ExecutorService executorService;
+
+    volatile boolean shutdown = false;
+
+    private Set<BatchWorkUnit> runningBatchWorkUnits = Collections.synchronizedSet(new HashSet<BatchWorkUnit>());
+
+    protected abstract ExecutorService newExecutorService(Properties batchConfig);
+
+    @Override
+    public void init(final Properties batchConfig) throws BatchContainerServiceException {
+        executorService = newExecutorService(batchConfig);
+    }
+
+    @Override
+    public void shutdown() throws BatchContainerServiceException {
+        this.shutdown = true;
+        if (!runningBatchWorkUnits.isEmpty()) {
+            JobOperator jobOperator = BatchRuntime.getJobOperator();
+            for (BatchWorkUnit batchWorkUnit : runningBatchWorkUnits) {
+                try {
+                    long executionId = batchWorkUnit.getJobExecutionImpl().getExecutionId();
+                    if (executionId >= 0) {
+                        jobOperator.stop(executionId);
+                    }
+                } catch (Exception e) {
+                    LOGGER.log(Level.SEVERE, "Failure while shutting down execution", e);
+                }
+            }
+        }
+
+        executorService.shutdownNow();
+        executorService = null;
+    }
+
+    @Override
+    public void executeTask(final Runnable work, final Object config) {
+        if (shutdown) {
+            throw new IllegalStateException("Refuse to start Batch Task due to shutdown being in progress!");
+        }
+        executorService.execute(runnableLoaderAware(new ActiveWorkTracker(work)));
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getName();
+    }
+
+    class ActiveWorkTracker implements Runnable {
+        private final Runnable work;
+
+        ActiveWorkTracker(Runnable work) {
+            this.work = work;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (work instanceof BatchWorkUnit) {
+                    runningBatchWorkUnits.add((BatchWorkUnit) work);
+                }
+                work.run();
+            } finally {
+                if (work instanceof BatchWorkUnit) {
+                    runningBatchWorkUnits.remove(work);
+                }
+            }
+        }
+    }
+
+}