You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rs...@apache.org on 2016/02/15 16:00:50 UTC

incubator-batchee git commit: BATCHEE-95 stop subJobs only if they are running

Repository: incubator-batchee
Updated Branches:
  refs/heads/master 1044c365b -> 24097670c


BATCHEE-95 stop subJobs only if they are running


Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/24097670
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/24097670
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/24097670

Branch: refs/heads/master
Commit: 24097670c0723dc010c8c5a6813aa0f1edbfedc0
Parents: 1044c36
Author: Reinhard Sandtner <rs...@apache.org>
Authored: Mon Feb 15 15:59:12 2016 +0100
Committer: Reinhard Sandtner <rs...@apache.org>
Committed: Mon Feb 15 15:59:12 2016 +0100

----------------------------------------------------------------------
 .../controller/PartitionedStepController.java   |   8 +-
 .../test/partitioned/PartitionedBatchTest.java  | 141 +++++++++++++++++++
 .../META-INF/batch-jobs/partition-stop.xml      |  30 ++++
 3 files changed, 178 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
index f98420e..e072a73 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
@@ -103,7 +103,13 @@ public class PartitionedStepController extends BaseStepController {
             if (parallelBatchWorkUnits != null) {
                 for (BatchWorkUnit subJob : parallelBatchWorkUnits) {
                     try {
-                        kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+
+                        // only try to stop the sub-jobs if they are running
+                        if (subJob.getJobExecutionImpl().getBatchStatus() == BatchStatus.STARTING ||
+                            subJob.getJobExecutionImpl().getBatchStatus() == BatchStatus.STARTED) {
+
+                            kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+                        }
                     } catch (Exception e) {
                         // TODO - Is this what we want to know.
                         // Blow up if it happens to force the issue.

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
----------------------------------------------------------------------
diff --git a/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
new file mode 100644
index 0000000..2599b34
--- /dev/null
+++ b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.test.partitioned;
+
+import org.apache.batchee.test.tck.lifecycle.ContainerLifecycle;
+import org.apache.batchee.util.Batches;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Listeners;
+import org.testng.annotations.Test;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.chunk.AbstractItemReader;
+import javax.batch.api.chunk.AbstractItemWriter;
+import javax.batch.api.partition.PartitionMapper;
+import javax.batch.api.partition.PartitionPlan;
+import javax.batch.api.partition.PartitionPlanImpl;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import javax.batch.runtime.BatchStatus;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@Listeners(ContainerLifecycle.class)
+public class PartitionedBatchTest {
+
+    private static final Logger log = LoggerFactory.getLogger(PartitionedBatchTest.class);
+
+
+    @Test
+    public void testStopPartitionedBatch() throws Exception {
+
+        JobOperator jobOperator = BatchRuntime.getJobOperator();
+        long executionId = jobOperator.start("partition-stop", new Properties());
+
+        do {
+            log.info("Waiting til batch is started");
+            Thread.sleep(50);
+        }
+        while (jobOperator.getJobExecution(executionId).getBatchStatus() != BatchStatus.STARTED);
+
+        Thread.sleep(100);
+
+        jobOperator.stop(executionId);
+
+        BatchStatus status = Batches.waitFor(jobOperator, executionId);
+        Assert.assertEquals(status, BatchStatus.STOPPED);
+    }
+
+
+    public static class StopReader extends AbstractItemReader {
+
+        private static final int MAX_INVOCATIONS = 2;
+
+
+        @Inject
+        @BatchProperty
+        private Integer idx;
+
+        private int invocations;
+
+
+        @Override
+        public Object readItem() throws Exception {
+            if (invocations++ < MAX_INVOCATIONS) {
+
+                Thread.sleep(5);
+                return invocations;
+            }
+
+            log.info("{} invoked {} times", idx, invocations);
+            return null;
+        }
+    }
+
+    public static class StopWriter extends AbstractItemWriter {
+
+        private static final Map<Integer, List<Object>> STORAGE = new HashMap<Integer, List<Object>>(2);
+
+
+        @Inject
+        @BatchProperty
+        private Integer idx;
+
+        @Override
+        public void writeItems(List<Object> items) throws Exception {
+
+            List<Object> objects = STORAGE.get(idx);
+            if (objects == null) {
+                objects = new ArrayList<Object>();
+                STORAGE.put(idx, objects);
+            }
+
+            objects.addAll(items);
+        }
+    }
+
+    public static class StopMapper implements PartitionMapper {
+
+        private static final int NUMBER_OF_PARTITIONS = 50;
+        private static final int NUMBER_OF_THREADS = 5;
+
+        @Override
+        public PartitionPlan mapPartitions() throws Exception {
+
+            Properties[] props = new Properties[NUMBER_OF_PARTITIONS];
+            for (int i = 0; i < NUMBER_OF_PARTITIONS; i++) {
+                Properties properties = new Properties();
+                properties.setProperty("idx", String.valueOf(i + 1));
+
+                props[i] = properties;
+            }
+
+            PartitionPlanImpl plan = new PartitionPlanImpl();
+            plan.setPartitions(NUMBER_OF_PARTITIONS);
+            plan.setThreads(NUMBER_OF_THREADS);
+            plan.setPartitionProperties(props);
+
+            return plan;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
----------------------------------------------------------------------
diff --git a/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
new file mode 100644
index 0000000..341ab8c
--- /dev/null
+++ b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  See the NOTICE file distributed with this work for additional information
+  regarding copyright ownership. Licensed under the Apache License,
+  Version 2.0 (the "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<job id="partition-stop" version="1.0" xmlns="http://xmlns.jcp.org/xml/ns/javaee">
+    <step id="the-step">
+        <chunk item-count="10">
+            <reader ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopReader">
+                <properties>
+                    <property name="idx" value="#{partitionPlan['idx']}" />
+                </properties>
+            </reader>
+            <writer ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopWriter" />
+        </chunk>
+        <partition>
+            <mapper ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopMapper" />
+        </partition>
+    </step>
+</job>


Re: incubator-batchee git commit: BATCHEE-95 stop subJobs only if they are running

Posted by re...@gmail.com.
Ups ;-) 

Will fix it this evening! So many files and i pick the wrong! Thx

Lg reini

> Am 15.02.2016 um 16:13 schrieb Romain Manni-Bucau <rm...@gmail.com>:
> 
> guess it is a copy paste but we don't need IBM in the header for our own
> code ;)
> 
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
> LinkedIn <https://www.linkedin.com/in/rmannibucau> | Tomitriber
> <http://www.tomitribe.com>
> 
> ---------- Forwarded message ----------
> From: <rs...@apache.org>
> Date: 2016-02-15 16:00 GMT+01:00
> Subject: incubator-batchee git commit: BATCHEE-95 stop subJobs only if they
> are running
> To: commits@batchee.incubator.apache.org
> 
> 
> Repository: incubator-batchee
> Updated Branches:
>  refs/heads/master 1044c365b -> 24097670c
> 
> 
> BATCHEE-95 stop subJobs only if they are running
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/24097670
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/24097670
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/24097670
> 
> Branch: refs/heads/master
> Commit: 24097670c0723dc010c8c5a6813aa0f1edbfedc0
> Parents: 1044c36
> Author: Reinhard Sandtner <rs...@apache.org>
> Authored: Mon Feb 15 15:59:12 2016 +0100
> Committer: Reinhard Sandtner <rs...@apache.org>
> Committed: Mon Feb 15 15:59:12 2016 +0100
> 
> ----------------------------------------------------------------------
> .../controller/PartitionedStepController.java   |   8 +-
> .../test/partitioned/PartitionedBatchTest.java  | 141 +++++++++++++++++++
> .../META-INF/batch-jobs/partition-stop.xml      |  30 ++++
> 3 files changed, 178 insertions(+), 1 deletion(-)
> ----------------------------------------------------------------------
> 
> 
> http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
> ----------------------------------------------------------------------
> diff --git
> a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
> b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
> index f98420e..e072a73 100755
> ---
> a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
> +++
> b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
> @@ -103,7 +103,13 @@ public class PartitionedStepController extends
> BaseStepController {
>             if (parallelBatchWorkUnits != null) {
>                 for (BatchWorkUnit subJob : parallelBatchWorkUnits) {
>                     try {
> -
> kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
> +
> +                        // only try to stop the sub-jobs if they are
> running
> +                        if (subJob.getJobExecutionImpl().getBatchStatus()
> == BatchStatus.STARTING ||
> +                            subJob.getJobExecutionImpl().getBatchStatus()
> == BatchStatus.STARTED) {
> +
> +
> kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
> +                        }
>                     } catch (Exception e) {
>                         // TODO - Is this what we want to know.
>                         // Blow up if it happens to force the issue.
> 
> http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
> ----------------------------------------------------------------------
> diff --git
> a/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
> b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
> new file mode 100644
> index 0000000..2599b34
> --- /dev/null
> +++
> b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
> @@ -0,0 +1,141 @@
> +/*
> + * Copyright 2012 International Business Machines Corp.
> + *
> + * See the NOTICE file distributed with this work for additional
> information
> + * regarding copyright ownership. Licensed under the Apache License,
> + * Version 2.0 (the "License"); you may not use this file except in
> compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> +*/
> +package org.apache.batchee.test.partitioned;
> +
> +import org.apache.batchee.test.tck.lifecycle.ContainerLifecycle;
> +import org.apache.batchee.util.Batches;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +import org.testng.Assert;
> +import org.testng.annotations.Listeners;
> +import org.testng.annotations.Test;
> +
> +import javax.batch.api.BatchProperty;
> +import javax.batch.api.chunk.AbstractItemReader;
> +import javax.batch.api.chunk.AbstractItemWriter;
> +import javax.batch.api.partition.PartitionMapper;
> +import javax.batch.api.partition.PartitionPlan;
> +import javax.batch.api.partition.PartitionPlanImpl;
> +import javax.batch.operations.JobOperator;
> +import javax.batch.runtime.BatchRuntime;
> +import javax.batch.runtime.BatchStatus;
> +import javax.inject.Inject;
> +import java.util.ArrayList;
> +import java.util.HashMap;
> +import java.util.List;
> +import java.util.Map;
> +import java.util.Properties;
> +
> +@Listeners(ContainerLifecycle.class)
> +public class PartitionedBatchTest {
> +
> +    private static final Logger log =
> LoggerFactory.getLogger(PartitionedBatchTest.class);
> +
> +
> +    @Test
> +    public void testStopPartitionedBatch() throws Exception {
> +
> +        JobOperator jobOperator = BatchRuntime.getJobOperator();
> +        long executionId = jobOperator.start("partition-stop", new
> Properties());
> +
> +        do {
> +            log.info("Waiting til batch is started");
> +            Thread.sleep(50);
> +        }
> +        while (jobOperator.getJobExecution(executionId).getBatchStatus()
> != BatchStatus.STARTED);
> +
> +        Thread.sleep(100);
> +
> +        jobOperator.stop(executionId);
> +
> +        BatchStatus status = Batches.waitFor(jobOperator, executionId);
> +        Assert.assertEquals(status, BatchStatus.STOPPED);
> +    }
> +
> +
> +    public static class StopReader extends AbstractItemReader {
> +
> +        private static final int MAX_INVOCATIONS = 2;
> +
> +
> +        @Inject
> +        @BatchProperty
> +        private Integer idx;
> +
> +        private int invocations;
> +
> +
> +        @Override
> +        public Object readItem() throws Exception {
> +            if (invocations++ < MAX_INVOCATIONS) {
> +
> +                Thread.sleep(5);
> +                return invocations;
> +            }
> +
> +            log.info("{} invoked {} times", idx, invocations);
> +            return null;
> +        }
> +    }
> +
> +    public static class StopWriter extends AbstractItemWriter {
> +
> +        private static final Map<Integer, List<Object>> STORAGE = new
> HashMap<Integer, List<Object>>(2);
> +
> +
> +        @Inject
> +        @BatchProperty
> +        private Integer idx;
> +
> +        @Override
> +        public void writeItems(List<Object> items) throws Exception {
> +
> +            List<Object> objects = STORAGE.get(idx);
> +            if (objects == null) {
> +                objects = new ArrayList<Object>();
> +                STORAGE.put(idx, objects);
> +            }
> +
> +            objects.addAll(items);
> +        }
> +    }
> +
> +    public static class StopMapper implements PartitionMapper {
> +
> +        private static final int NUMBER_OF_PARTITIONS = 50;
> +        private static final int NUMBER_OF_THREADS = 5;
> +
> +        @Override
> +        public PartitionPlan mapPartitions() throws Exception {
> +
> +            Properties[] props = new Properties[NUMBER_OF_PARTITIONS];
> +            for (int i = 0; i < NUMBER_OF_PARTITIONS; i++) {
> +                Properties properties = new Properties();
> +                properties.setProperty("idx", String.valueOf(i + 1));
> +
> +                props[i] = properties;
> +            }
> +
> +            PartitionPlanImpl plan = new PartitionPlanImpl();
> +            plan.setPartitions(NUMBER_OF_PARTITIONS);
> +            plan.setThreads(NUMBER_OF_THREADS);
> +            plan.setPartitionProperties(props);
> +
> +            return plan;
> +        }
> +    }
> +}
> 
> http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
> ----------------------------------------------------------------------
> diff --git
> a/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
> b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
> new file mode 100644
> index 0000000..341ab8c
> --- /dev/null
> +++ b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
> @@ -0,0 +1,30 @@
> +<?xml version="1.0" encoding="UTF-8"?>
> +<!--
> +  See the NOTICE file distributed with this work for additional information
> +  regarding copyright ownership. Licensed under the Apache License,
> +  Version 2.0 (the "License"); you may not use this file except in
> compliance
> +  with the License. You may obtain a copy of the License at
> +
> +    http://www.apache.org/licenses/LICENSE-2.0
> +
> +  Unless required by applicable law or agreed to in writing, software
> +  distributed under the License is distributed on an "AS IS" BASIS,
> +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> +  See the License for the specific language governing permissions and
> +  limitations under the License.
> +-->
> +<job id="partition-stop" version="1.0" xmlns="
> http://xmlns.jcp.org/xml/ns/javaee">
> +    <step id="the-step">
> +        <chunk item-count="10">
> +            <reader
> ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopReader">
> +                <properties>
> +                    <property name="idx" value="#{partitionPlan['idx']}" />
> +                </properties>
> +            </reader>
> +            <writer
> ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopWriter" />
> +        </chunk>
> +        <partition>
> +            <mapper
> ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopMapper" />
> +        </partition>
> +    </step>
> +</job>

Fwd: incubator-batchee git commit: BATCHEE-95 stop subJobs only if they are running

Posted by Romain Manni-Bucau <rm...@gmail.com>.
guess it is a copy paste but we don't need IBM in the header for our own
code ;)

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Tomitriber
<http://www.tomitribe.com>

---------- Forwarded message ----------
From: <rs...@apache.org>
Date: 2016-02-15 16:00 GMT+01:00
Subject: incubator-batchee git commit: BATCHEE-95 stop subJobs only if they
are running
To: commits@batchee.incubator.apache.org


Repository: incubator-batchee
Updated Branches:
  refs/heads/master 1044c365b -> 24097670c


BATCHEE-95 stop subJobs only if they are running


Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit:
http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/24097670
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/24097670
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/24097670

Branch: refs/heads/master
Commit: 24097670c0723dc010c8c5a6813aa0f1edbfedc0
Parents: 1044c36
Author: Reinhard Sandtner <rs...@apache.org>
Authored: Mon Feb 15 15:59:12 2016 +0100
Committer: Reinhard Sandtner <rs...@apache.org>
Committed: Mon Feb 15 15:59:12 2016 +0100

----------------------------------------------------------------------
 .../controller/PartitionedStepController.java   |   8 +-
 .../test/partitioned/PartitionedBatchTest.java  | 141 +++++++++++++++++++
 .../META-INF/batch-jobs/partition-stop.xml      |  30 ++++
 3 files changed, 178 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
----------------------------------------------------------------------
diff --git
a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
index f98420e..e072a73 100755
---
a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
+++
b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
@@ -103,7 +103,13 @@ public class PartitionedStepController extends
BaseStepController {
             if (parallelBatchWorkUnits != null) {
                 for (BatchWorkUnit subJob : parallelBatchWorkUnits) {
                     try {
-
kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+
+                        // only try to stop the sub-jobs if they are
running
+                        if (subJob.getJobExecutionImpl().getBatchStatus()
== BatchStatus.STARTING ||
+                            subJob.getJobExecutionImpl().getBatchStatus()
== BatchStatus.STARTED) {
+
+
kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+                        }
                     } catch (Exception e) {
                         // TODO - Is this what we want to know.
                         // Blow up if it happens to force the issue.

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
----------------------------------------------------------------------
diff --git
a/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
new file mode 100644
index 0000000..2599b34
--- /dev/null
+++
b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional
information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in
compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.test.partitioned;
+
+import org.apache.batchee.test.tck.lifecycle.ContainerLifecycle;
+import org.apache.batchee.util.Batches;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Listeners;
+import org.testng.annotations.Test;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.chunk.AbstractItemReader;
+import javax.batch.api.chunk.AbstractItemWriter;
+import javax.batch.api.partition.PartitionMapper;
+import javax.batch.api.partition.PartitionPlan;
+import javax.batch.api.partition.PartitionPlanImpl;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import javax.batch.runtime.BatchStatus;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@Listeners(ContainerLifecycle.class)
+public class PartitionedBatchTest {
+
+    private static final Logger log =
LoggerFactory.getLogger(PartitionedBatchTest.class);
+
+
+    @Test
+    public void testStopPartitionedBatch() throws Exception {
+
+        JobOperator jobOperator = BatchRuntime.getJobOperator();
+        long executionId = jobOperator.start("partition-stop", new
Properties());
+
+        do {
+            log.info("Waiting til batch is started");
+            Thread.sleep(50);
+        }
+        while (jobOperator.getJobExecution(executionId).getBatchStatus()
!= BatchStatus.STARTED);
+
+        Thread.sleep(100);
+
+        jobOperator.stop(executionId);
+
+        BatchStatus status = Batches.waitFor(jobOperator, executionId);
+        Assert.assertEquals(status, BatchStatus.STOPPED);
+    }
+
+
+    public static class StopReader extends AbstractItemReader {
+
+        private static final int MAX_INVOCATIONS = 2;
+
+
+        @Inject
+        @BatchProperty
+        private Integer idx;
+
+        private int invocations;
+
+
+        @Override
+        public Object readItem() throws Exception {
+            if (invocations++ < MAX_INVOCATIONS) {
+
+                Thread.sleep(5);
+                return invocations;
+            }
+
+            log.info("{} invoked {} times", idx, invocations);
+            return null;
+        }
+    }
+
+    public static class StopWriter extends AbstractItemWriter {
+
+        private static final Map<Integer, List<Object>> STORAGE = new
HashMap<Integer, List<Object>>(2);
+
+
+        @Inject
+        @BatchProperty
+        private Integer idx;
+
+        @Override
+        public void writeItems(List<Object> items) throws Exception {
+
+            List<Object> objects = STORAGE.get(idx);
+            if (objects == null) {
+                objects = new ArrayList<Object>();
+                STORAGE.put(idx, objects);
+            }
+
+            objects.addAll(items);
+        }
+    }
+
+    public static class StopMapper implements PartitionMapper {
+
+        private static final int NUMBER_OF_PARTITIONS = 50;
+        private static final int NUMBER_OF_THREADS = 5;
+
+        @Override
+        public PartitionPlan mapPartitions() throws Exception {
+
+            Properties[] props = new Properties[NUMBER_OF_PARTITIONS];
+            for (int i = 0; i < NUMBER_OF_PARTITIONS; i++) {
+                Properties properties = new Properties();
+                properties.setProperty("idx", String.valueOf(i + 1));
+
+                props[i] = properties;
+            }
+
+            PartitionPlanImpl plan = new PartitionPlanImpl();
+            plan.setPartitions(NUMBER_OF_PARTITIONS);
+            plan.setThreads(NUMBER_OF_THREADS);
+            plan.setPartitionProperties(props);
+
+            return plan;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
----------------------------------------------------------------------
diff --git
a/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
new file mode 100644
index 0000000..341ab8c
--- /dev/null
+++ b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  See the NOTICE file distributed with this work for additional information
+  regarding copyright ownership. Licensed under the Apache License,
+  Version 2.0 (the "License"); you may not use this file except in
compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<job id="partition-stop" version="1.0" xmlns="
http://xmlns.jcp.org/xml/ns/javaee">
+    <step id="the-step">
+        <chunk item-count="10">
+            <reader
ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopReader">
+                <properties>
+                    <property name="idx" value="#{partitionPlan['idx']}" />
+                </properties>
+            </reader>
+            <writer
ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopWriter" />
+        </chunk>
+        <partition>
+            <mapper
ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopMapper" />
+        </partition>
+    </step>
+</job>