You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by st...@apache.org on 2018/07/09 14:20:23 UTC
[1/2] incubator-batchee git commit: BATCHEE-131 shutdown BatchEE-CLI
on Ctrl-C
Repository: incubator-batchee
Updated Branches:
refs/heads/master 563c55eab -> 30aa2a4d7
BATCHEE-131 shutdown BatchEE-CLI on Ctrl-C
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/60e7e575
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/60e7e575
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/60e7e575
Branch: refs/heads/master
Commit: 60e7e575a2c7c6195bafb7c86d3289bd53e8b26f
Parents: 563c55e
Author: Mark Struberg <st...@apache.org>
Authored: Mon Jul 9 13:54:47 2018 +0200
Committer: Mark Struberg <st...@apache.org>
Committed: Mon Jul 9 13:54:47 2018 +0200
----------------------------------------------------------------------
.../batchee/cli/command/JobOperatorCommand.java | 18 ++++++++++++++++--
.../cli/lifecycle/impl/OpenEJBLifecycle.java | 13 +++++++++++++
2 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/60e7e575/tools/cli/src/main/java/org/apache/batchee/cli/command/JobOperatorCommand.java
----------------------------------------------------------------------
diff --git a/tools/cli/src/main/java/org/apache/batchee/cli/command/JobOperatorCommand.java b/tools/cli/src/main/java/org/apache/batchee/cli/command/JobOperatorCommand.java
index bec981d..bb10fe7 100644
--- a/tools/cli/src/main/java/org/apache/batchee/cli/command/JobOperatorCommand.java
+++ b/tools/cli/src/main/java/org/apache/batchee/cli/command/JobOperatorCommand.java
@@ -48,8 +48,7 @@ import static java.lang.Thread.currentThread;
* Note: the classloader is created from libs command, it is handy to organize batches
* by folders to be able to run them contextual using this command.
*/
-public abstract class JobOperatorCommand implements Runnable {
- // Remote config
+public abstract class JobOperatorCommand implements Runnable { // Remote config
@Option(name = "url", description = "when using JAXRS the batchee resource url")
protected String baseUrl = null;
@@ -178,6 +177,8 @@ public abstract class JobOperatorCommand implements Runnable {
if (lifecycle != null) {
lifecycleInstance = createLifecycle(loader);
state = lifecycleInstance.start();
+
+ registerShutdownHook(lifecycleInstance, state);
} else {
lifecycleInstance = null;
state = null;
@@ -195,6 +196,19 @@ public abstract class JobOperatorCommand implements Runnable {
}
}
+ /**
+ * Registers a JVM shutdown hook that prints a notice to {@code System.out}
+ * and then stops the given lifecycle with the state it was started with.
+ *
+ * @param lifecycleInstance the lifecycle to stop when the JVM shuts down
+ * @param state the state object passed to {@code lifecycleInstance.stop(state)}
+ */
+ private void registerShutdownHook(final Lifecycle<Object> lifecycleInstance, final Object state) {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ // Use System.out rather than the Logger, as the Logger might already have been reset
+ // by the time this hook runs. Additionally, we want this message to reach
+ // the person who hit Ctrl-C.
+ System.out.println("\n Shutting down the JBatch engine started...\n");
+ lifecycleInstance.stop(state);
+ }
+ });
+ }
+
private Lifecycle<Object> createLifecycle(final ClassLoader loader) {
// some shortcuts are nicer to use from CLI
if ("openejb".equalsIgnoreCase(lifecycle)) {
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/60e7e575/tools/cli/src/main/java/org/apache/batchee/cli/lifecycle/impl/OpenEJBLifecycle.java
----------------------------------------------------------------------
diff --git a/tools/cli/src/main/java/org/apache/batchee/cli/lifecycle/impl/OpenEJBLifecycle.java b/tools/cli/src/main/java/org/apache/batchee/cli/lifecycle/impl/OpenEJBLifecycle.java
index 3608f24..3ebc210 100644
--- a/tools/cli/src/main/java/org/apache/batchee/cli/lifecycle/impl/OpenEJBLifecycle.java
+++ b/tools/cli/src/main/java/org/apache/batchee/cli/lifecycle/impl/OpenEJBLifecycle.java
@@ -18,6 +18,8 @@ package org.apache.batchee.cli.lifecycle.impl;
import org.apache.batchee.cli.classloader.ChildFirstURLClassLoader;
import org.apache.batchee.container.exception.BatchContainerRuntimeException;
+import org.apache.batchee.container.services.ServicesManager;
+import org.apache.batchee.spi.BatchThreadPoolService;
import org.apache.openejb.OpenEjbContainer;
import org.apache.openejb.UndeployException;
import org.apache.openejb.assembler.classic.AppInfo;
@@ -39,12 +41,15 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
// EJBContainer doesn't support war deployment
public class OpenEJBLifecycle extends LifecycleBase<Object> {
private Assembler assembler = null;
private AppInfo info = null;
+ private AtomicBoolean running = new AtomicBoolean(false);
+
@Override
public Object start() {
final Map<String, Object> config = configuration("openejb");
@@ -78,6 +83,7 @@ public class OpenEJBLifecycle extends LifecycleBase<Object> {
assembler = SystemInstance.get().getComponent(Assembler.class);
assembler.createApplication(info, loader);
+ running.set(true);
return initialContext;
} catch (final Exception e) {
throw new BatchContainerRuntimeException(e);
@@ -94,6 +100,13 @@ public class OpenEJBLifecycle extends LifecycleBase<Object> {
@Override
public void stop(final Object state) {
+ if (!running.compareAndSet(true, false)) {
+ return;
+ }
+
+ BatchThreadPoolService threadPoolService = ServicesManager.find().service(BatchThreadPoolService.class);
+ threadPoolService.shutdown();
+
if (state != null) {
if (assembler != null && info != null) {
try {
[2/2] incubator-batchee git commit: BATCHEE-131 keep track of running
threads
Posted by st...@apache.org.
BATCHEE-131 keep track of running threads
And shut them down if the container stops
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/30aa2a4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/30aa2a4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/30aa2a4d
Branch: refs/heads/master
Commit: 30aa2a4d7c86a151abd7ddcdeaff7ad1a2be0994
Parents: 60e7e57
Author: Mark Struberg <st...@apache.org>
Authored: Mon Jul 9 16:19:07 2018 +0200
Committer: Mark Struberg <st...@apache.org>
Committed: Mon Jul 9 16:19:07 2018 +0200
----------------------------------------------------------------------
.../executor/AbstractThreadPoolService.java | 159 +++++++++++++------
1 file changed, 107 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/30aa2a4d/jbatch/src/main/java/org/apache/batchee/container/services/executor/AbstractThreadPoolService.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/executor/AbstractThreadPoolService.java b/jbatch/src/main/java/org/apache/batchee/container/services/executor/AbstractThreadPoolService.java
index 1f92e2f..03c38bc 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/executor/AbstractThreadPoolService.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/executor/AbstractThreadPoolService.java
@@ -1,52 +1,107 @@
-/*
- * Copyright 2013 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.batchee.container.services.executor;
-
-import org.apache.batchee.container.exception.BatchContainerServiceException;
-import org.apache.batchee.spi.BatchThreadPoolService;
-
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-
-import static org.apache.batchee.container.util.ClassLoaderAwareHandler.runnableLoaderAware;
-
-public abstract class AbstractThreadPoolService implements BatchThreadPoolService {
- protected ExecutorService executorService;
-
- protected abstract ExecutorService newExecutorService(Properties batchConfig);
-
- @Override
- public void init(final Properties batchConfig) throws BatchContainerServiceException {
- executorService = newExecutorService(batchConfig);
- }
-
- @Override
- public void shutdown() throws BatchContainerServiceException {
- executorService.shutdownNow();
- executorService = null;
- }
-
- @Override
- public void executeTask(final Runnable work, final Object config) {
- executorService.execute(runnableLoaderAware(work));
- }
-
- @Override
- public String toString() {
- return getClass().getName();
- }
-}
+/*
+ * Copyright 2013 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.container.services.executor;
+
+import org.apache.batchee.container.exception.BatchContainerServiceException;
+import org.apache.batchee.container.util.BatchWorkUnit;
+import org.apache.batchee.spi.BatchThreadPoolService;
+
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static org.apache.batchee.container.util.ClassLoaderAwareHandler.runnableLoaderAware;
+
+/**
+ * Base class for {@link BatchThreadPoolService} implementations that delegate to an
+ * {@link ExecutorService} created by {@link #newExecutorService(Properties)}.
+ *
+ * Every submitted task is wrapped in an {@link ActiveWorkTracker} so that the
+ * {@link BatchWorkUnit}s currently being executed are known. On {@link #shutdown()} a stop
+ * is requested for each of them via the {@link JobOperator} before the executor is shut down.
+ */
+public abstract class AbstractThreadPoolService implements BatchThreadPoolService {
+    private static final Logger LOGGER = Logger.getLogger(AbstractThreadPoolService.class.getName());
+
+    protected ExecutorService executorService;
+
+    /** Set once {@link #shutdown()} has been called; new tasks are refused while it is {@code true}. */
+    volatile boolean shutdown = false;
+
+    /** Work units currently executing; added/removed by the worker threads themselves. */
+    private final Set<BatchWorkUnit> runningBatchWorkUnits = Collections.synchronizedSet(new HashSet<BatchWorkUnit>());
+
+    /**
+     * Creates the executor all tasks are submitted to.
+     *
+     * @param batchConfig the configuration handed to {@link #init(Properties)}
+     * @return the executor to use until {@link #shutdown()} is called
+     */
+    protected abstract ExecutorService newExecutorService(Properties batchConfig);
+
+    /**
+     * Creates the executor and (re-)enables task submission.
+     *
+     * @param batchConfig the batch configuration passed on to {@link #newExecutorService(Properties)}
+     */
+    @Override
+    public void init(final Properties batchConfig) throws BatchContainerServiceException {
+        executorService = newExecutorService(batchConfig);
+        // a freshly initialised service must accept work again, even after an earlier shutdown()
+        shutdown = false;
+    }
+
+    /**
+     * Refuses further tasks, requests a stop for every tracked running work unit and
+     * shuts the executor down. Calling it more than once, or before {@link #init(Properties)},
+     * is a no-op for the executor part.
+     */
+    @Override
+    public void shutdown() throws BatchContainerServiceException {
+        this.shutdown = true;
+
+        // Iterate over a snapshot: worker threads remove themselves from the set concurrently,
+        // and iterating a Collections.synchronizedSet without holding its monitor may throw
+        // ConcurrentModificationException. toArray() on the wrapper is synchronized.
+        final BatchWorkUnit[] stillRunning = runningBatchWorkUnits.toArray(new BatchWorkUnit[0]);
+        if (stillRunning.length > 0) {
+            final JobOperator jobOperator = BatchRuntime.getJobOperator();
+            for (final BatchWorkUnit batchWorkUnit : stillRunning) {
+                try {
+                    final long executionId = batchWorkUnit.getJobExecutionImpl().getExecutionId();
+                    if (executionId >= 0) {
+                        jobOperator.stop(executionId);
+                    }
+                } catch (final Exception e) {
+                    // best effort: one failing execution must not prevent stopping the others
+                    LOGGER.log(Level.SEVERE, "Failure while shutting down execution", e);
+                }
+            }
+        }
+
+        // Guard against a repeated shutdown() or a shutdown() before init().
+        final ExecutorService toStop = executorService;
+        executorService = null;
+        if (toStop != null) {
+            toStop.shutdownNow();
+        }
+    }
+
+    /**
+     * Submits the given work to the executor, wrapped so that running
+     * {@link BatchWorkUnit}s are tracked.
+     *
+     * @param work   the task to run
+     * @param config not used by this implementation
+     * @throws IllegalStateException if the service is shutting down or has no executor
+     */
+    @Override
+    public void executeTask(final Runnable work, final Object config) {
+        if (shutdown) {
+            throw new IllegalStateException("Refuse to start Batch Task due to shutdown being in progress!");
+        }
+        // Read the field once: shutdown() may null it between the check above and the submit.
+        final ExecutorService executor = executorService;
+        if (executor == null) {
+            throw new IllegalStateException("Refuse to start Batch Task as no executor is available (not initialised or already shut down)");
+        }
+        executor.execute(runnableLoaderAware(new ActiveWorkTracker(work)));
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getName();
+    }
+
+    /**
+     * Wraps a task and registers it in {@code runningBatchWorkUnits} for the duration
+     * of its execution if it is a {@link BatchWorkUnit}.
+     */
+    class ActiveWorkTracker implements Runnable {
+        private final Runnable work;
+
+        ActiveWorkTracker(Runnable work) {
+            this.work = work;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (work instanceof BatchWorkUnit) {
+                    runningBatchWorkUnits.add((BatchWorkUnit) work);
+                }
+                work.run();
+            } finally {
+                // always deregister, also when the work unit failed with an exception
+                if (work instanceof BatchWorkUnit) {
+                    runningBatchWorkUnits.remove(work);
+                }
+            }
+        }
+    }
+
+}