You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rs...@apache.org on 2016/02/15 16:00:50 UTC
incubator-batchee git commit: BATCHEE-95 stop subJobs only if they
are running
Repository: incubator-batchee
Updated Branches:
refs/heads/master 1044c365b -> 24097670c
BATCHEE-95 stop subJobs only if they are running
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/24097670
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/24097670
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/24097670
Branch: refs/heads/master
Commit: 24097670c0723dc010c8c5a6813aa0f1edbfedc0
Parents: 1044c36
Author: Reinhard Sandtner <rs...@apache.org>
Authored: Mon Feb 15 15:59:12 2016 +0100
Committer: Reinhard Sandtner <rs...@apache.org>
Committed: Mon Feb 15 15:59:12 2016 +0100
----------------------------------------------------------------------
.../controller/PartitionedStepController.java | 8 +-
.../test/partitioned/PartitionedBatchTest.java | 141 +++++++++++++++++++
.../META-INF/batch-jobs/partition-stop.xml | 30 ++++
3 files changed, 178 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
index f98420e..e072a73 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
@@ -103,7 +103,13 @@ public class PartitionedStepController extends BaseStepController {
if (parallelBatchWorkUnits != null) {
for (BatchWorkUnit subJob : parallelBatchWorkUnits) {
try {
- kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+
+ // only try to stop the sub-jobs if they are running
+ if (subJob.getJobExecutionImpl().getBatchStatus() == BatchStatus.STARTING ||
+ subJob.getJobExecutionImpl().getBatchStatus() == BatchStatus.STARTED) {
+
+ kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+ }
} catch (Exception e) {
// TODO - Is this what we want to know.
// Blow up if it happens to force the issue.
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
----------------------------------------------------------------------
diff --git a/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
new file mode 100644
index 0000000..2599b34
--- /dev/null
+++ b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.test.partitioned;
+
+import org.apache.batchee.test.tck.lifecycle.ContainerLifecycle;
+import org.apache.batchee.util.Batches;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Listeners;
+import org.testng.annotations.Test;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.chunk.AbstractItemReader;
+import javax.batch.api.chunk.AbstractItemWriter;
+import javax.batch.api.partition.PartitionMapper;
+import javax.batch.api.partition.PartitionPlan;
+import javax.batch.api.partition.PartitionPlanImpl;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import javax.batch.runtime.BatchStatus;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@Listeners(ContainerLifecycle.class)
+public class PartitionedBatchTest {
+
+ private static final Logger log = LoggerFactory.getLogger(PartitionedBatchTest.class);
+
+
+ @Test
+ public void testStopPartitionedBatch() throws Exception {
+
+ JobOperator jobOperator = BatchRuntime.getJobOperator();
+ long executionId = jobOperator.start("partition-stop", new Properties());
+
+ do {
+ log.info("Waiting til batch is started");
+ Thread.sleep(50);
+ }
+ while (jobOperator.getJobExecution(executionId).getBatchStatus() != BatchStatus.STARTED);
+
+ Thread.sleep(100);
+
+ jobOperator.stop(executionId);
+
+ BatchStatus status = Batches.waitFor(jobOperator, executionId);
+ Assert.assertEquals(status, BatchStatus.STOPPED);
+ }
+
+
+ public static class StopReader extends AbstractItemReader {
+
+ private static final int MAX_INVOCATIONS = 2;
+
+
+ @Inject
+ @BatchProperty
+ private Integer idx;
+
+ private int invocations;
+
+
+ @Override
+ public Object readItem() throws Exception {
+ if (invocations++ < MAX_INVOCATIONS) {
+
+ Thread.sleep(5);
+ return invocations;
+ }
+
+ log.info("{} invoked {} times", idx, invocations);
+ return null;
+ }
+ }
+
+ public static class StopWriter extends AbstractItemWriter {
+
+ private static final Map<Integer, List<Object>> STORAGE = new HashMap<Integer, List<Object>>(2);
+
+
+ @Inject
+ @BatchProperty
+ private Integer idx;
+
+ @Override
+ public void writeItems(List<Object> items) throws Exception {
+
+ List<Object> objects = STORAGE.get(idx);
+ if (objects == null) {
+ objects = new ArrayList<Object>();
+ STORAGE.put(idx, objects);
+ }
+
+ objects.addAll(items);
+ }
+ }
+
+ public static class StopMapper implements PartitionMapper {
+
+ private static final int NUMBER_OF_PARTITIONS = 50;
+ private static final int NUMBER_OF_THREADS = 5;
+
+ @Override
+ public PartitionPlan mapPartitions() throws Exception {
+
+ Properties[] props = new Properties[NUMBER_OF_PARTITIONS];
+ for (int i = 0; i < NUMBER_OF_PARTITIONS; i++) {
+ Properties properties = new Properties();
+ properties.setProperty("idx", String.valueOf(i + 1));
+
+ props[i] = properties;
+ }
+
+ PartitionPlanImpl plan = new PartitionPlanImpl();
+ plan.setPartitions(NUMBER_OF_PARTITIONS);
+ plan.setThreads(NUMBER_OF_THREADS);
+ plan.setPartitionProperties(props);
+
+ return plan;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
----------------------------------------------------------------------
diff --git a/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
new file mode 100644
index 0000000..341ab8c
--- /dev/null
+++ b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ See the NOTICE file distributed with this work for additional information
+ regarding copyright ownership. Licensed under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<job id="partition-stop" version="1.0" xmlns="http://xmlns.jcp.org/xml/ns/javaee">
+ <step id="the-step">
+ <chunk item-count="10">
+ <reader ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopReader">
+ <properties>
+ <property name="idx" value="#{partitionPlan['idx']}" />
+ </properties>
+ </reader>
+ <writer ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopWriter" />
+ </chunk>
+ <partition>
+ <mapper ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopMapper" />
+ </partition>
+ </step>
+</job>
Re: incubator-batchee git commit: BATCHEE-95 stop subJobs only if they are running
Posted by re...@gmail.com.
Oops ;-)
Will fix it this evening! So many files, and I picked the wrong one! Thx
Lg reini
> Am 15.02.2016 um 16:13 schrieb Romain Manni-Bucau <rm...@gmail.com>:
>
> I guess it is a copy-paste, but we don't need the IBM copyright line in the
> license header for our own code ;)
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> | Blog
> <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
> LinkedIn <https://www.linkedin.com/in/rmannibucau> | Tomitriber
> <http://www.tomitribe.com>
>
> ---------- Forwarded message ----------
> From: <rs...@apache.org>
> Date: 2016-02-15 16:00 GMT+01:00
> Subject: incubator-batchee git commit: BATCHEE-95 stop subJobs only if they
> are running
> To: commits@batchee.incubator.apache.org
>
>
> Repository: incubator-batchee
> Updated Branches:
> refs/heads/master 1044c365b -> 24097670c
>
>
> BATCHEE-95 stop subJobs only if they are running
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/24097670
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/24097670
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/24097670
>
> Branch: refs/heads/master
> Commit: 24097670c0723dc010c8c5a6813aa0f1edbfedc0
> Parents: 1044c36
> Author: Reinhard Sandtner <rs...@apache.org>
> Authored: Mon Feb 15 15:59:12 2016 +0100
> Committer: Reinhard Sandtner <rs...@apache.org>
> Committed: Mon Feb 15 15:59:12 2016 +0100
>
> ----------------------------------------------------------------------
> .../controller/PartitionedStepController.java | 8 +-
> .../test/partitioned/PartitionedBatchTest.java | 141 +++++++++++++++++++
> .../META-INF/batch-jobs/partition-stop.xml | 30 ++++
> 3 files changed, 178 insertions(+), 1 deletion(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
> ----------------------------------------------------------------------
> diff --git
> a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
> b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
> index f98420e..e072a73 100755
> ---
> a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
> +++
> b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
> @@ -103,7 +103,13 @@ public class PartitionedStepController extends
> BaseStepController {
> if (parallelBatchWorkUnits != null) {
> for (BatchWorkUnit subJob : parallelBatchWorkUnits) {
> try {
> -
> kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
> +
> + // only try to stop the sub-jobs if they are
> running
> + if (subJob.getJobExecutionImpl().getBatchStatus()
> == BatchStatus.STARTING ||
> + subJob.getJobExecutionImpl().getBatchStatus()
> == BatchStatus.STARTED) {
> +
> +
> kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
> + }
> } catch (Exception e) {
> // TODO - Is this what we want to know.
> // Blow up if it happens to force the issue.
>
> http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
> ----------------------------------------------------------------------
> diff --git
> a/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
> b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
> new file mode 100644
> index 0000000..2599b34
> --- /dev/null
> +++
> b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
> @@ -0,0 +1,141 @@
> +/*
> + * Copyright 2012 International Business Machines Corp.
> + *
> + * See the NOTICE file distributed with this work for additional
> information
> + * regarding copyright ownership. Licensed under the Apache License,
> + * Version 2.0 (the "License"); you may not use this file except in
> compliance
> + * with the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> +*/
> +package org.apache.batchee.test.partitioned;
> +
> +import org.apache.batchee.test.tck.lifecycle.ContainerLifecycle;
> +import org.apache.batchee.util.Batches;
> +import org.slf4j.Logger;
> +import org.slf4j.LoggerFactory;
> +import org.testng.Assert;
> +import org.testng.annotations.Listeners;
> +import org.testng.annotations.Test;
> +
> +import javax.batch.api.BatchProperty;
> +import javax.batch.api.chunk.AbstractItemReader;
> +import javax.batch.api.chunk.AbstractItemWriter;
> +import javax.batch.api.partition.PartitionMapper;
> +import javax.batch.api.partition.PartitionPlan;
> +import javax.batch.api.partition.PartitionPlanImpl;
> +import javax.batch.operations.JobOperator;
> +import javax.batch.runtime.BatchRuntime;
> +import javax.batch.runtime.BatchStatus;
> +import javax.inject.Inject;
> +import java.util.ArrayList;
> +import java.util.HashMap;
> +import java.util.List;
> +import java.util.Map;
> +import java.util.Properties;
> +
> +@Listeners(ContainerLifecycle.class)
> +public class PartitionedBatchTest {
> +
> + private static final Logger log =
> LoggerFactory.getLogger(PartitionedBatchTest.class);
> +
> +
> + @Test
> + public void testStopPartitionedBatch() throws Exception {
> +
> + JobOperator jobOperator = BatchRuntime.getJobOperator();
> + long executionId = jobOperator.start("partition-stop", new
> Properties());
> +
> + do {
> + log.info("Waiting til batch is started");
> + Thread.sleep(50);
> + }
> + while (jobOperator.getJobExecution(executionId).getBatchStatus()
> != BatchStatus.STARTED);
> +
> + Thread.sleep(100);
> +
> + jobOperator.stop(executionId);
> +
> + BatchStatus status = Batches.waitFor(jobOperator, executionId);
> + Assert.assertEquals(status, BatchStatus.STOPPED);
> + }
> +
> +
> + public static class StopReader extends AbstractItemReader {
> +
> + private static final int MAX_INVOCATIONS = 2;
> +
> +
> + @Inject
> + @BatchProperty
> + private Integer idx;
> +
> + private int invocations;
> +
> +
> + @Override
> + public Object readItem() throws Exception {
> + if (invocations++ < MAX_INVOCATIONS) {
> +
> + Thread.sleep(5);
> + return invocations;
> + }
> +
> + log.info("{} invoked {} times", idx, invocations);
> + return null;
> + }
> + }
> +
> + public static class StopWriter extends AbstractItemWriter {
> +
> + private static final Map<Integer, List<Object>> STORAGE = new
> HashMap<Integer, List<Object>>(2);
> +
> +
> + @Inject
> + @BatchProperty
> + private Integer idx;
> +
> + @Override
> + public void writeItems(List<Object> items) throws Exception {
> +
> + List<Object> objects = STORAGE.get(idx);
> + if (objects == null) {
> + objects = new ArrayList<Object>();
> + STORAGE.put(idx, objects);
> + }
> +
> + objects.addAll(items);
> + }
> + }
> +
> + public static class StopMapper implements PartitionMapper {
> +
> + private static final int NUMBER_OF_PARTITIONS = 50;
> + private static final int NUMBER_OF_THREADS = 5;
> +
> + @Override
> + public PartitionPlan mapPartitions() throws Exception {
> +
> + Properties[] props = new Properties[NUMBER_OF_PARTITIONS];
> + for (int i = 0; i < NUMBER_OF_PARTITIONS; i++) {
> + Properties properties = new Properties();
> + properties.setProperty("idx", String.valueOf(i + 1));
> +
> + props[i] = properties;
> + }
> +
> + PartitionPlanImpl plan = new PartitionPlanImpl();
> + plan.setPartitions(NUMBER_OF_PARTITIONS);
> + plan.setThreads(NUMBER_OF_THREADS);
> + plan.setPartitionProperties(props);
> +
> + return plan;
> + }
> + }
> +}
>
> http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
> ----------------------------------------------------------------------
> diff --git
> a/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
> b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
> new file mode 100644
> index 0000000..341ab8c
> --- /dev/null
> +++ b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
> @@ -0,0 +1,30 @@
> +<?xml version="1.0" encoding="UTF-8"?>
> +<!--
> + See the NOTICE file distributed with this work for additional information
> + regarding copyright ownership. Licensed under the Apache License,
> + Version 2.0 (the "License"); you may not use this file except in
> compliance
> + with the License. You may obtain a copy of the License at
> +
> + http://www.apache.org/licenses/LICENSE-2.0
> +
> + Unless required by applicable law or agreed to in writing, software
> + distributed under the License is distributed on an "AS IS" BASIS,
> + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + See the License for the specific language governing permissions and
> + limitations under the License.
> +-->
> +<job id="partition-stop" version="1.0" xmlns="
> http://xmlns.jcp.org/xml/ns/javaee">
> + <step id="the-step">
> + <chunk item-count="10">
> + <reader
> ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopReader">
> + <properties>
> + <property name="idx" value="#{partitionPlan['idx']}" />
> + </properties>
> + </reader>
> + <writer
> ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopWriter" />
> + </chunk>
> + <partition>
> + <mapper
> ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopMapper" />
> + </partition>
> + </step>
> +</job>
Fwd: incubator-batchee git commit: BATCHEE-95 stop subJobs only if
they are running
Posted by Romain Manni-Bucau <rm...@gmail.com>.
I guess it is a copy-paste, but we don't need the IBM copyright line in the
license header for our own code ;)
Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> | Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Tomitriber
<http://www.tomitribe.com>
---------- Forwarded message ----------
From: <rs...@apache.org>
Date: 2016-02-15 16:00 GMT+01:00
Subject: incubator-batchee git commit: BATCHEE-95 stop subJobs only if they
are running
To: commits@batchee.incubator.apache.org
Repository: incubator-batchee
Updated Branches:
refs/heads/master 1044c365b -> 24097670c
BATCHEE-95 stop subJobs only if they are running
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit:
http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/24097670
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/24097670
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/24097670
Branch: refs/heads/master
Commit: 24097670c0723dc010c8c5a6813aa0f1edbfedc0
Parents: 1044c36
Author: Reinhard Sandtner <rs...@apache.org>
Authored: Mon Feb 15 15:59:12 2016 +0100
Committer: Reinhard Sandtner <rs...@apache.org>
Committed: Mon Feb 15 15:59:12 2016 +0100
----------------------------------------------------------------------
.../controller/PartitionedStepController.java | 8 +-
.../test/partitioned/PartitionedBatchTest.java | 141 +++++++++++++++++++
.../META-INF/batch-jobs/partition-stop.xml | 30 ++++
3 files changed, 178 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
----------------------------------------------------------------------
diff --git
a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
index f98420e..e072a73 100755
---
a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
+++
b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/PartitionedStepController.java
@@ -103,7 +103,13 @@ public class PartitionedStepController extends
BaseStepController {
if (parallelBatchWorkUnits != null) {
for (BatchWorkUnit subJob : parallelBatchWorkUnits) {
try {
-
kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+
+ // only try to stop the sub-jobs if they are
running
+ if (subJob.getJobExecutionImpl().getBatchStatus()
== BatchStatus.STARTING ||
+ subJob.getJobExecutionImpl().getBatchStatus()
== BatchStatus.STARTED) {
+
+
kernelService.stopJob(subJob.getJobExecutionImpl().getExecutionId());
+ }
} catch (Exception e) {
// TODO - Is this what we want to know.
// Blow up if it happens to force the issue.
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
----------------------------------------------------------------------
diff --git
a/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
new file mode 100644
index 0000000..2599b34
--- /dev/null
+++
b/jbatch/src/test/java/org/apache/batchee/test/partitioned/PartitionedBatchTest.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional
information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in
compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.batchee.test.partitioned;
+
+import org.apache.batchee.test.tck.lifecycle.ContainerLifecycle;
+import org.apache.batchee.util.Batches;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Listeners;
+import org.testng.annotations.Test;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.chunk.AbstractItemReader;
+import javax.batch.api.chunk.AbstractItemWriter;
+import javax.batch.api.partition.PartitionMapper;
+import javax.batch.api.partition.PartitionPlan;
+import javax.batch.api.partition.PartitionPlanImpl;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import javax.batch.runtime.BatchStatus;
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@Listeners(ContainerLifecycle.class)
+public class PartitionedBatchTest {
+
+ private static final Logger log =
LoggerFactory.getLogger(PartitionedBatchTest.class);
+
+
+ @Test
+ public void testStopPartitionedBatch() throws Exception {
+
+ JobOperator jobOperator = BatchRuntime.getJobOperator();
+ long executionId = jobOperator.start("partition-stop", new
Properties());
+
+ do {
+ log.info("Waiting til batch is started");
+ Thread.sleep(50);
+ }
+ while (jobOperator.getJobExecution(executionId).getBatchStatus()
!= BatchStatus.STARTED);
+
+ Thread.sleep(100);
+
+ jobOperator.stop(executionId);
+
+ BatchStatus status = Batches.waitFor(jobOperator, executionId);
+ Assert.assertEquals(status, BatchStatus.STOPPED);
+ }
+
+
+ public static class StopReader extends AbstractItemReader {
+
+ private static final int MAX_INVOCATIONS = 2;
+
+
+ @Inject
+ @BatchProperty
+ private Integer idx;
+
+ private int invocations;
+
+
+ @Override
+ public Object readItem() throws Exception {
+ if (invocations++ < MAX_INVOCATIONS) {
+
+ Thread.sleep(5);
+ return invocations;
+ }
+
+ log.info("{} invoked {} times", idx, invocations);
+ return null;
+ }
+ }
+
+ public static class StopWriter extends AbstractItemWriter {
+
+ private static final Map<Integer, List<Object>> STORAGE = new
HashMap<Integer, List<Object>>(2);
+
+
+ @Inject
+ @BatchProperty
+ private Integer idx;
+
+ @Override
+ public void writeItems(List<Object> items) throws Exception {
+
+ List<Object> objects = STORAGE.get(idx);
+ if (objects == null) {
+ objects = new ArrayList<Object>();
+ STORAGE.put(idx, objects);
+ }
+
+ objects.addAll(items);
+ }
+ }
+
+ public static class StopMapper implements PartitionMapper {
+
+ private static final int NUMBER_OF_PARTITIONS = 50;
+ private static final int NUMBER_OF_THREADS = 5;
+
+ @Override
+ public PartitionPlan mapPartitions() throws Exception {
+
+ Properties[] props = new Properties[NUMBER_OF_PARTITIONS];
+ for (int i = 0; i < NUMBER_OF_PARTITIONS; i++) {
+ Properties properties = new Properties();
+ properties.setProperty("idx", String.valueOf(i + 1));
+
+ props[i] = properties;
+ }
+
+ PartitionPlanImpl plan = new PartitionPlanImpl();
+ plan.setPartitions(NUMBER_OF_PARTITIONS);
+ plan.setThreads(NUMBER_OF_THREADS);
+ plan.setPartitionProperties(props);
+
+ return plan;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/24097670/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
----------------------------------------------------------------------
diff --git
a/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
new file mode 100644
index 0000000..341ab8c
--- /dev/null
+++ b/jbatch/src/test/resources/META-INF/batch-jobs/partition-stop.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ See the NOTICE file distributed with this work for additional information
+ regarding copyright ownership. Licensed under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in
compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<job id="partition-stop" version="1.0" xmlns="
http://xmlns.jcp.org/xml/ns/javaee">
+ <step id="the-step">
+ <chunk item-count="10">
+ <reader
ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopReader">
+ <properties>
+ <property name="idx" value="#{partitionPlan['idx']}" />
+ </properties>
+ </reader>
+ <writer
ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopWriter" />
+ </chunk>
+ <partition>
+ <mapper
ref="org.apache.batchee.test.partitioned.PartitionedBatchTest$StopMapper" />
+ </partition>
+ </step>
+</job>