You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2014/03/02 18:34:26 UTC

git commit: BATCHEE-18 async processor

Repository: incubator-batchee
Updated Branches:
  refs/heads/master 19114f5d3 -> 1e5e7dd82


BATCHEE-18 async processor


Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/1e5e7dd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/1e5e7dd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/1e5e7dd8

Branch: refs/heads/master
Commit: 1e5e7dd8228b6801faaa92714edc23f7dde677f8
Parents: 19114f5
Author: Romain Manni-Bucau <rm...@gmail.com>
Authored: Sun Mar 2 18:34:07 2014 +0100
Committer: Romain Manni-Bucau <rm...@gmail.com>
Committed: Sun Mar 2 18:34:07 2014 +0100

----------------------------------------------------------------------
 .../extras/async/AsynchronousItemProcessor.java | 106 +++++++++++++++++++
 .../src/main/resources/META-INF/batchee.xml     |   1 +
 .../batchee/extras/AsyncProcessorTest.java      |  74 +++++++++++++
 .../META-INF/batch-jobs/async-processor.xml     |  29 +++++
 4 files changed, 210 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1e5e7dd8/extensions/extras/src/main/java/org/apache/batchee/extras/async/AsynchronousItemProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/extras/src/main/java/org/apache/batchee/extras/async/AsynchronousItemProcessor.java b/extensions/extras/src/main/java/org/apache/batchee/extras/async/AsynchronousItemProcessor.java
new file mode 100644
index 0000000..aae2d87
--- /dev/null
+++ b/extensions/extras/src/main/java/org/apache/batchee/extras/async/AsynchronousItemProcessor.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.extras.async;
+
+import org.apache.batchee.extras.locator.BeanLocator;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.chunk.ItemProcessor;
+import javax.inject.Inject;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Item processor that hands each item to a delegate {@link ItemProcessor} running on a thread pool.
+ * <p>
+ * {@link #processItem(Object)} does not wait for the delegate: it returns the
+ * {@link java.util.concurrent.Future} of the submitted task, so downstream code has to call
+ * {@code get()} on it to obtain the processed value.
+ * <p>
+ * Batch properties:
+ * <ul>
+ * <li>{@code poolSize}: number of threads; missing, blank or &lt;= 0 selects a cached thread pool</li>
+ * <li>{@code locator}: passed to {@code BeanLocator.Finder.get} to select the bean locator</li>
+ * <li>{@code delegateRef}: reference of the delegate processor resolved through that locator</li>
+ * </ul>
+ *
+ * @param <T> not used by this class
+ */
+public class AsynchronousItemProcessor<T> implements ItemProcessor {
+    protected ExecutorService es = null;
+    protected ItemProcessor delegate = null;
+
+    @Inject
+    @BatchProperty
+    private String poolSize;
+
+    @Inject
+    @BatchProperty
+    private String locator;
+
+    @Inject
+    @BatchProperty
+    private String delegateRef;
+
+    /**
+     * Lazily creates the executor that runs the delegate.
+     * Its threads are daemon threads; this class never shuts the executor down.
+     *
+     * @return the executor, created on first call
+     * @throws NumberFormatException if {@code poolSize} is set but is not an integer
+     */
+    protected ExecutorService getExecutor() {
+        if (es == null) {
+            // trim before parsing: " 2 " passes the blank check but Integer.parseInt rejects whitespace
+            final String configured = poolSize == null ? "" : poolSize.trim();
+            final int size = configured.isEmpty() ? 0 : Integer.parseInt(configured);
+            final DaemonThreadFactory threadFactory = new DaemonThreadFactory();
+            if (size <= 0) {
+                es = Executors.newCachedThreadPool(threadFactory);
+            } else {
+                es = Executors.newFixedThreadPool(size, threadFactory);
+            }
+        }
+        return es;
+    }
+
+    /**
+     * Lazily looks up the delegate processor.
+     * Synchronized because it is invoked from the pool threads: without it concurrent tasks
+     * could each see {@code null} and create their own delegate instance.
+     *
+     * @return the delegate, resolved on first call
+     */
+    protected synchronized ItemProcessor getDelegate() { // note with cdi delegate scope shouldn't need cleanup
+        if (delegate == null) {
+            delegate = BeanLocator.Finder.get(locator).newInstance(ItemProcessor.class, delegateRef).getValue();
+        }
+        return delegate;
+    }
+
+    /**
+     * Submits the item to the executor.
+     *
+     * @param o the item to process
+     * @return the {@link java.util.concurrent.Future} of the delegate result, not the result itself
+     * @throws Exception declared by {@link ItemProcessor#processItem(Object)}
+     */
+    @Override
+    public Object processItem(final Object o) throws Exception {
+        return getExecutor().submit(new Callable<Object>() {
+            @Override
+            public Object call() throws Exception {
+                return getDelegate().processItem(o);
+            }
+        });
+    }
+
+    /**
+     * Thread factory creating daemon threads of normal priority, named after the factory's
+     * simple class name followed by a counter shared by all instances.
+     */
+    public static class DaemonThreadFactory implements ThreadFactory {
+        private static final AtomicInteger ids = new AtomicInteger(0);
+
+        private final ThreadGroup group;
+
+        public DaemonThreadFactory() {
+            // use the security manager's thread group when one is installed, else the creating thread's
+            final SecurityManager securityManager = System.getSecurityManager();
+            if (securityManager != null) {
+                group = securityManager.getThreadGroup();
+            } else {
+                group = Thread.currentThread().getThreadGroup();
+            }
+        }
+
+        @Override
+        public Thread newThread(final Runnable runnable) {
+            final Thread thread = new Thread(group, runnable, getClass().getSimpleName() + " - " + ids.incrementAndGet());
+            if (!thread.isDaemon()) {
+                thread.setDaemon(true);
+            }
+            if (thread.getPriority() != Thread.NORM_PRIORITY) {
+                thread.setPriority(Thread.NORM_PRIORITY);
+            }
+            return thread;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1e5e7dd8/extensions/extras/src/main/resources/META-INF/batchee.xml
----------------------------------------------------------------------
diff --git a/extensions/extras/src/main/resources/META-INF/batchee.xml b/extensions/extras/src/main/resources/META-INF/batchee.xml
index 2d31b7e..d5bb1b8 100644
--- a/extensions/extras/src/main/resources/META-INF/batchee.xml
+++ b/extensions/extras/src/main/resources/META-INF/batchee.xml
@@ -28,4 +28,5 @@
   <ref id="staxReader" class="org.apache.batchee.extras.stax.StaxItemReader" />
   <ref id="staxWriter" class="org.apache.batchee.extras.stax.StaxItemWriter" />
   <ref id="beanValidationProcessor" class="org.apache.batchee.extras.validation.BeanValidationItemProcessor" />
+  <ref id="asyncProcessor" class="org.apache.batchee.extras.async.AsynchronousItemProcessor" />
 </batch-artifacts>

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1e5e7dd8/extensions/extras/src/test/java/org/apache/batchee/extras/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/extensions/extras/src/test/java/org/apache/batchee/extras/AsyncProcessorTest.java b/extensions/extras/src/test/java/org/apache/batchee/extras/AsyncProcessorTest.java
new file mode 100644
index 0000000..5fc03a9
--- /dev/null
+++ b/extensions/extras/src/test/java/org/apache/batchee/extras/AsyncProcessorTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.extras;
+
+import org.apache.batchee.util.Batches;
+import org.testng.annotations.Test;
+
+import javax.batch.api.chunk.AbstractItemReader;
+import javax.batch.api.chunk.AbstractItemWriter;
+import javax.batch.api.chunk.ItemProcessor;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import javax.batch.runtime.BatchStatus;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Runs the {@code async-processor} job and checks that it completes; the job's writer checks
+ * that each processed item reaches it as a {@link Future} of the delegate result.
+ */
+public class AsyncProcessorTest {
+    @Test
+    public void async() throws Exception {
+        final JobOperator jobOperator = BatchRuntime.getJobOperator();
+        final long id = jobOperator.start("async-processor", null);
+        Batches.waitForEnd(jobOperator, id);
+        // TestNG's assertEquals takes (actual, expected)
+        assertEquals(jobOperator.getJobExecution(id).getBatchStatus(), BatchStatus.COMPLETED);
+    }
+
+    /** Reader returning "line 1", then "line 2", then {@code null}. */
+    public static class TwoItemsReader extends AbstractItemReader {
+        private int count = 0;
+
+        @Override
+        public Object readItem() throws Exception {
+            if (count++ < 2) {
+                return "line " + count;
+            }
+            return null;
+        }
+    }
+
+    /** Delegate processor mapping every item to the constant "line". */
+    public static class Delegate implements ItemProcessor {
+        @Override
+        public Object processItem(final Object o) throws Exception {
+            return "line";
+        }
+    }
+
+    /** Writer asserting that every item is a {@link Future} whose value prints as "line". */
+    public static class Writer extends AbstractItemWriter {
+        @Override
+        public void writeItems(final List<Object> objects) throws Exception {
+            for (final Object o : objects) {
+                assertThat(o, instanceOf(Future.class));
+                assertEquals(Future.class.cast(o).get().toString(), "line");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/1e5e7dd8/extensions/extras/src/test/resources/META-INF/batch-jobs/async-processor.xml
----------------------------------------------------------------------
diff --git a/extensions/extras/src/test/resources/META-INF/batch-jobs/async-processor.xml b/extensions/extras/src/test/resources/META-INF/batch-jobs/async-processor.xml
new file mode 100644
index 0000000..47b9d4e
--- /dev/null
+++ b/extensions/extras/src/test/resources/META-INF/batch-jobs/async-processor.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  See the NOTICE file distributed with this work for additional information
+  regarding copyright ownership. Licensed under the Apache License,
+  Version 2.0 (the "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<job id="async-proc" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <!-- job used by AsyncProcessorTest, which starts it under the name "async-processor" -->
+  <step id="step1">
+    <chunk> <!-- single chunk step: reader, asynchronous processor, writer -->
+      <reader ref="org.apache.batchee.extras.AsyncProcessorTest$TwoItemsReader" />
+      <processor ref="org.apache.batchee.extras.async.AsynchronousItemProcessor"> <!-- submits each item to the delegate configured below -->
+        <properties>
+          <property name="poolSize" value="2" /> <!-- positive value: fixed pool of two threads -->
+          <property name="delegateRef" value="org.apache.batchee.extras.AsyncProcessorTest$Delegate" /> <!-- processor actually invoked on the pool -->
+        </properties>
+      </processor>
+      <writer ref="org.apache.batchee.extras.AsyncProcessorTest$Writer" /> <!-- asserts on the futures returned by the processor -->
+    </chunk>
+  </step>
+</job>