You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2014/03/02 18:34:26 UTC
git commit: BATCHEE-18 async processor
Repository: incubator-batchee
Updated Branches:
refs/heads/master 19114f5d3 -> 1e5e7dd82
BATCHEE-18 async processor
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/1e5e7dd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/1e5e7dd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/1e5e7dd8
Branch: refs/heads/master
Commit: 1e5e7dd8228b6801faaa92714edc23f7dde677f8
Parents: 19114f5
Author: Romain Manni-Bucau <rm...@gmail.com>
Authored: Sun Mar 2 18:34:07 2014 +0100
Committer: Romain Manni-Bucau <rm...@gmail.com>
Committed: Sun Mar 2 18:34:07 2014 +0100
----------------------------------------------------------------------
.../extras/async/AsynchronousItemProcessor.java | 106 +++++++++++++++++++
.../src/main/resources/META-INF/batchee.xml | 1 +
.../batchee/extras/AsyncProcessorTest.java | 74 +++++++++++++
.../META-INF/batch-jobs/async-processor.xml | 29 +++++
4 files changed, 210 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1e5e7dd8/extensions/extras/src/main/java/org/apache/batchee/extras/async/AsynchronousItemProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/extras/src/main/java/org/apache/batchee/extras/async/AsynchronousItemProcessor.java b/extensions/extras/src/main/java/org/apache/batchee/extras/async/AsynchronousItemProcessor.java
new file mode 100644
index 0000000..aae2d87
--- /dev/null
+++ b/extensions/extras/src/main/java/org/apache/batchee/extras/async/AsynchronousItemProcessor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.extras.async;
+
+import org.apache.batchee.extras.locator.BeanLocator;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.chunk.ItemProcessor;
+import javax.inject.Inject;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link ItemProcessor} running a delegate on a thread pool: {@link #processItem(Object)} returns the {@code Future}
+ * of the delegate call, not the processed value. Batch properties: {@code poolSize} (blank or not positive selects a
+ * cached pool, else a fixed pool of that size), {@code locator} and {@code delegateRef} (locate the delegate).
+ */
+public class AsynchronousItemProcessor<T> implements ItemProcessor {
+    protected ExecutorService es = null;
+    protected ItemProcessor delegate = null;
+
+    @Inject
+    @BatchProperty
+    private String poolSize;
+
+    @Inject
+    @BatchProperty
+    private String locator;
+
+    @Inject
+    @BatchProperty
+    private String delegateRef;
+
+    /** Lazily creates the pool described by {@code poolSize}; its workers come from {@link DaemonThreadFactory}. */
+    protected ExecutorService getExecutor() {
+        if (es == null) {
+            // parse the trimmed value: the blank check ignores whitespace, so " 2 " must not fail in parseInt
+            final String trimmed = poolSize == null ? "" : poolSize.trim();
+            final int size = trimmed.isEmpty() ? 0 : Integer.parseInt(trimmed);
+            final ThreadFactory factory = new DaemonThreadFactory();
+            es = size <= 0 ? Executors.newCachedThreadPool(factory) : Executors.newFixedThreadPool(size, factory);
+        }
+        return es;
+    }
+
+    /** Lazily looks the delegate up through the locator; the lazy init is not synchronized. */
+    protected ItemProcessor getDelegate() { // note with cdi delegate scope shouldn't need cleanup
+        if (delegate == null) {
+            delegate = BeanLocator.Finder.get(locator).newInstance(ItemProcessor.class, delegateRef).getValue();
+        }
+        return delegate;
+    }
+
+    /** Submits the delegate call for {@code o} to the pool and returns the resulting {@code Future}. */
+    @Override
+    public Object processItem(final Object o) throws Exception {
+        // resolve the delegate on the calling thread: inside the task, pool threads would race on the lazy lookup
+        final ItemProcessor processor = getDelegate();
+        return getExecutor().submit(new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                return processor.processItem(o);
+            }
+        });
+    }
+
+    /** Creates daemon threads of normal priority, named after the factory class plus a shared counter. */
+    public static class DaemonThreadFactory implements ThreadFactory {
+        private static final AtomicInteger ids = new AtomicInteger(0);
+        private final ThreadGroup group;
+
+        public DaemonThreadFactory() {
+            final SecurityManager securityManager = System.getSecurityManager();
+            group = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
+        }
+
+        @Override
+        public Thread newThread(final Runnable runnable) {
+            final Thread thread = new Thread(group, runnable, getClass().getSimpleName() + " - " + ids.incrementAndGet());
+            if (!thread.isDaemon()) {
+                thread.setDaemon(true);
+            }
+            if (thread.getPriority() != Thread.NORM_PRIORITY) {
+                thread.setPriority(Thread.NORM_PRIORITY);
+            }
+            return thread;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1e5e7dd8/extensions/extras/src/main/resources/META-INF/batchee.xml
----------------------------------------------------------------------
diff --git a/extensions/extras/src/main/resources/META-INF/batchee.xml b/extensions/extras/src/main/resources/META-INF/batchee.xml
index 2d31b7e..d5bb1b8 100644
--- a/extensions/extras/src/main/resources/META-INF/batchee.xml
+++ b/extensions/extras/src/main/resources/META-INF/batchee.xml
@@ -28,4 +28,5 @@
<ref id="staxReader" class="org.apache.batchee.extras.stax.StaxItemReader" />
<ref id="staxWriter" class="org.apache.batchee.extras.stax.StaxItemWriter" />
<ref id="beanValidationProcessor" class="org.apache.batchee.extras.validation.BeanValidationItemProcessor" />
+ <ref id="asyncProcessor" class="org.apache.batchee.extras.async.AsynchronousItemProcessor" />
</batch-artifacts>
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1e5e7dd8/extensions/extras/src/test/java/org/apache/batchee/extras/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/extensions/extras/src/test/java/org/apache/batchee/extras/AsyncProcessorTest.java b/extensions/extras/src/test/java/org/apache/batchee/extras/AsyncProcessorTest.java
new file mode 100644
index 0000000..5fc03a9
--- /dev/null
+++ b/extensions/extras/src/test/java/org/apache/batchee/extras/AsyncProcessorTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.extras;
+
+import org.apache.batchee.util.Batches;
+import org.testng.annotations.Test;
+
+import javax.batch.api.chunk.AbstractItemReader;
+import javax.batch.api.chunk.AbstractItemWriter;
+import javax.batch.api.chunk.ItemProcessor;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import javax.batch.runtime.BatchStatus;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.testng.Assert.assertEquals;
+
+/** Runs the {@code async-processor} job; its writer checks that every item is a future resolving to "line". */
+public class AsyncProcessorTest {
+    @Test
+    public void async() throws Exception {
+        final JobOperator jobOperator = BatchRuntime.getJobOperator();
+        final long id = jobOperator.start("async-processor", null);
+        Batches.waitForEnd(jobOperator, id);
+        // TestNG's assertEquals takes (actual, expected)
+        assertEquals(jobOperator.getJobExecution(id).getBatchStatus(), BatchStatus.COMPLETED);
+    }
+
+    /** Returns "line 1" and "line 2", then {@code null} for every later call. */
+    public static class TwoItemsReader extends AbstractItemReader {
+        private int count = 0;
+
+        @Override
+        public Object readItem() throws Exception {
+            return count++ < 2 ? "line " + count : null;
+        }
+    }
+
+    /** Maps every item to the constant "line". */
+    public static class Delegate implements ItemProcessor {
+        @Override
+        public Object processItem(final Object o) throws Exception {
+            return "line";
+        }
+    }
+
+    /** Asserts that each written item is a {@link Future} whose value prints as "line". */
+    public static class Writer extends AbstractItemWriter {
+        @Override
+        public void writeItems(final List<Object> objects) throws Exception {
+            for (final Object o : objects) {
+                assertThat(o, instanceOf(Future.class));
+                assertEquals(Future.class.cast(o).get().toString(), "line");
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1e5e7dd8/extensions/extras/src/test/resources/META-INF/batch-jobs/async-processor.xml
----------------------------------------------------------------------
diff --git a/extensions/extras/src/test/resources/META-INF/batch-jobs/async-processor.xml b/extensions/extras/src/test/resources/META-INF/batch-jobs/async-processor.xml
new file mode 100644
index 0000000..47b9d4e
--- /dev/null
+++ b/extensions/extras/src/test/resources/META-INF/batch-jobs/async-processor.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ See the NOTICE file distributed with this work for additional information
+ regarding copyright ownership. Licensed under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<job id="async-proc" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
+ <step id="step1">
+ <chunk>
+ <reader ref="org.apache.batchee.extras.AsyncProcessorTest$TwoItemsReader" />
+ <processor ref="org.apache.batchee.extras.async.AsynchronousItemProcessor"> <!-- submits each item to a thread pool and passes the resulting future on -->
+ <properties>
+ <property name="poolSize" value="2" /> <!-- positive value: fixed pool of two threads -->
+ <property name="delegateRef" value="org.apache.batchee.extras.AsyncProcessorTest$Delegate" /> <!-- processor doing the actual work -->
+ </properties>
+ </processor>
+ <writer ref="org.apache.batchee.extras.AsyncProcessorTest$Writer" /> <!-- asserts on the futures it receives -->
+ </chunk>
+ </step>
+</job>