You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rs...@apache.org on 2016/12/20 23:11:58 UTC

incubator-batchee git commit: BATCHEE-113 added test for improved scope handling

Repository: incubator-batchee
Updated Branches:
  refs/heads/master ce60c30e8 -> 1a6634f10


BATCHEE-113 added test for improved scope handling


Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/1a6634f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/1a6634f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/1a6634f1

Branch: refs/heads/master
Commit: 1a6634f10952f2aee626741c327410878f6ffbe5
Parents: ce60c30
Author: Reinhard Sandtner <rs...@apache.org>
Authored: Tue Dec 20 21:37:37 2016 +0100
Committer: Reinhard Sandtner <rs...@apache.org>
Committed: Tue Dec 20 23:56:00 2016 +0100

----------------------------------------------------------------------
 .../org/apache/batchee/cdi/BatchScopesTest.java | 41 +++++++++-
 .../batchee/cdi/component/JobScopedBean.java    |  4 +
 .../partitioned/PartitionedJobScopedReader.java | 86 ++++++++++++++++++++
 .../partitioned/PartitionedJobScopedWriter.java | 46 +++++++++++
 .../batch-jobs/partitioned-job-scoped.xml       | 51 ++++++++++++
 5 files changed, 225 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1a6634f1/extensions/cdi/src/test/java/org/apache/batchee/cdi/BatchScopesTest.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/test/java/org/apache/batchee/cdi/BatchScopesTest.java b/extensions/cdi/src/test/java/org/apache/batchee/cdi/BatchScopesTest.java
index 38c9cde..380c111 100644
--- a/extensions/cdi/src/test/java/org/apache/batchee/cdi/BatchScopesTest.java
+++ b/extensions/cdi/src/test/java/org/apache/batchee/cdi/BatchScopesTest.java
@@ -19,20 +19,29 @@ package org.apache.batchee.cdi;
 import org.apache.batchee.cdi.component.Holder;
 import org.apache.batchee.cdi.component.JobScopedBean;
 import org.apache.batchee.cdi.component.StepScopedBean;
+import org.apache.batchee.cdi.partitioned.PartitionedJobScopedReader;
 import org.apache.batchee.cdi.testng.CdiContainerLifecycle;
 import org.apache.batchee.util.Batches;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Listeners;
 import org.testng.annotations.Test;
 
 import javax.batch.operations.JobOperator;
 import javax.batch.runtime.BatchRuntime;
+import javax.batch.runtime.BatchStatus;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotSame;
-import static org.testng.Assert.assertTrue;
+import java.util.Properties;
+
+import static org.testng.Assert.*;
 
 @Listeners(CdiContainerLifecycle.class)
 public class BatchScopesTest {
+
+    @BeforeMethod
+    public void resetJobScopedBean() {
+        JobScopedBean.reset();
+    }
+
     @Test
     public void test() {
         final JobOperator jobOperator = BatchRuntime.getJobOperator();
@@ -47,4 +56,30 @@ public class BatchScopesTest {
         assertTrue(JobScopedBean.isDestroyed());
         assertTrue(StepScopedBean.isDestroyed());
     }
+
+    @Test
+    public void testPartitionedJobScoped() throws Exception {
+
+        JobOperator jobOperator = BatchRuntime.getJobOperator();
+
+        long executionId = jobOperator.start("partitioned-job-scoped", new Properties());
+
+        Thread.sleep(100);
+
+        assertEquals(PartitionedJobScopedReader.currentBeanId(), PartitionedJobScopedReader.originalBeanId());
+
+        PartitionedJobScopedReader.stop();
+
+        Thread.sleep(100);
+
+        assertEquals(PartitionedJobScopedReader.currentBeanId(), PartitionedJobScopedReader.originalBeanId());
+        assertFalse(JobScopedBean.isDestroyed(), "JobScopedBean must not be destroyed -> partition 2 is still running :(");
+
+        PartitionedJobScopedReader.stopPartition2();
+
+        assertEquals(Batches.waitFor(executionId), BatchStatus.COMPLETED);
+
+        assertEquals(PartitionedJobScopedReader.currentBeanId(), PartitionedJobScopedReader.originalBeanId());
+        assertTrue(JobScopedBean.isDestroyed());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1a6634f1/extensions/cdi/src/test/java/org/apache/batchee/cdi/component/JobScopedBean.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/test/java/org/apache/batchee/cdi/component/JobScopedBean.java b/extensions/cdi/src/test/java/org/apache/batchee/cdi/component/JobScopedBean.java
index 7b66b35..902eabf 100644
--- a/extensions/cdi/src/test/java/org/apache/batchee/cdi/component/JobScopedBean.java
+++ b/extensions/cdi/src/test/java/org/apache/batchee/cdi/component/JobScopedBean.java
@@ -41,4 +41,8 @@ public class JobScopedBean {
     void destroy() {
         destroyed = true;
     }
+
+    public static void reset() {
+        destroyed = false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1a6634f1/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedReader.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedReader.java b/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedReader.java
new file mode 100644
index 0000000..3b9aedc
--- /dev/null
+++ b/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedReader.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.cdi.partitioned;
+
+import org.apache.batchee.cdi.component.JobScopedBean;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.chunk.AbstractItemReader;
+import javax.enterprise.context.Dependent;
+import javax.inject.Inject;
+import javax.inject.Named;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Named
+@Dependent
+public class PartitionedJobScopedReader extends AbstractItemReader {
+
+    private static final Logger LOG = Logger.getLogger(PartitionedJobScopedReader.class.getName());
+
+    private static volatile boolean stop;
+    private static volatile boolean stopPartition2;
+
+    private static long originalBeanId;
+    private static long currentBeanId;
+
+    private boolean firstRun = true;
+
+    @Inject
+    @BatchProperty
+    private Integer partition;
+
+    @Inject
+    private JobScopedBean jobScopedBean;
+
+
+    @Override
+    public Object readItem() throws Exception {
+
+        if (firstRun) {
+            originalBeanId = jobScopedBean.getId();
+        }
+
+        currentBeanId = jobScopedBean.getId();
+
+        if (partition == 2 && stopPartition2 || partition != 2 && stop) {
+            LOG.log(Level.INFO, "Finished partition "+  partition);
+            return null;
+        }
+
+        Thread.sleep(10);
+        return "Partition " + partition + ": JobScopedBean destroyed? " + JobScopedBean.isDestroyed();
+    }
+
+
+    public static void stop() {
+        LOG.log(Level.INFO, "Stopping all partitions except Partition 2");
+        stop = true;
+    }
+
+    public static void stopPartition2() {
+        stopPartition2 = true;
+    }
+
+    public static long originalBeanId() {
+        return originalBeanId;
+    }
+
+    public static long currentBeanId() {
+        return currentBeanId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1a6634f1/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedWriter.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedWriter.java b/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedWriter.java
new file mode 100644
index 0000000..48c51aa
--- /dev/null
+++ b/extensions/cdi/src/test/java/org/apache/batchee/cdi/partitioned/PartitionedJobScopedWriter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.cdi.partitioned;
+
+import org.apache.batchee.cdi.component.JobScopedBean;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.chunk.AbstractItemWriter;
+import javax.enterprise.context.Dependent;
+import javax.inject.Inject;
+import javax.inject.Named;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Named
+@Dependent
+public class PartitionedJobScopedWriter extends AbstractItemWriter {
+
+    private static final Logger LOG = Logger.getLogger(PartitionedJobScopedWriter.class.getName());
+
+    @Inject
+    @BatchProperty
+    private Integer partition;
+
+
+    @Override
+    public void writeItems(List<Object> items) throws Exception {
+        LOG.log(Level.INFO, "Writing {0} items from partition {1}", new Object[]{items.size(), partition});
+        LOG.log(Level.INFO, "JobScopedBean destroyed? {0}", JobScopedBean.isDestroyed());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1a6634f1/extensions/cdi/src/test/resources/META-INF/batch-jobs/partitioned-job-scoped.xml
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/test/resources/META-INF/batch-jobs/partitioned-job-scoped.xml b/extensions/cdi/src/test/resources/META-INF/batch-jobs/partitioned-job-scoped.xml
new file mode 100644
index 0000000..58f9fd5
--- /dev/null
+++ b/extensions/cdi/src/test/resources/META-INF/batch-jobs/partitioned-job-scoped.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  See the NOTICE file distributed with this work for additional information
+  regarding copyright ownership. Licensed under the Apache License,
+  Version 2.0 (the "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<job id="partitioned-job-scoped-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
+
+  <step id="partitioned-step">
+
+    <chunk item-count="100">
+      <reader ref="partitionedJobScopedReader">
+        <properties>
+          <property name="partition" value="#{partitionPlan['index']}" />
+        </properties>
+      </reader>
+      <writer ref="partitionedJobScopedWriter">
+        <properties>
+          <property name="partition" value="#{partitionPlan['index']}" />
+        </properties>
+      </writer>
+    </chunk>
+
+    <partition>
+
+      <plan partitions="2" threads="2">
+
+        <properties partition="0">
+          <property name="index" value="1" />
+        </properties>
+
+        <properties partition="1">
+          <property name="index" value="2" />
+        </properties>
+
+      </plan>
+
+    </partition>
+
+  </step>
+
+</job>