You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:39:21 UTC
[44/62] importing batchee from github - a fork from the IBM RI
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/main/java/org/apache/batchee/camel/CamelChainItemProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/camel/src/main/java/org/apache/batchee/camel/CamelChainItemProcessor.java b/extensions/camel/src/main/java/org/apache/batchee/camel/CamelChainItemProcessor.java
new file mode 100644
index 0000000..de9a642
--- /dev/null
+++ b/extensions/camel/src/main/java/org/apache/batchee/camel/CamelChainItemProcessor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.camel;
+
+import org.apache.batchee.extras.chain.Chain;
+import org.apache.batchee.extras.locator.BeanLocator;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.chunk.ItemProcessor;
+import javax.inject.Inject;
+
+/**
+ * Item processor that pushes each item through a chain of {@link ItemProcessor}s.
+ *
+ * When no locator is configured, chain elements are created by a {@link CamelLocator}
+ * built from the {@code templateLocator} batch property.
+ */
+public class CamelChainItemProcessor extends Chain<ItemProcessor> implements ItemProcessor {
+    @Inject
+    @BatchProperty
+    private String templateLocator;
+
+    // Locator exposed through getBeanLocator(); re-selected on every processItem() call.
+    private BeanLocator locatorInstance;
+
+    /**
+     * Selects the bean locator for this call, then runs the chain on the item.
+     *
+     * NOTE(review): {@code locator} is not declared in this class; presumably it is a
+     * field inherited from {@link Chain} - confirm.
+     *
+     * @param item the item to process
+     * @return whatever {@code runChain} returns for the item
+     * @throws Exception propagated from the chain
+     */
+    @Override
+    public Object processItem(final Object item) throws Exception {
+        locatorInstance = locator == null ? new CamelLocator(templateLocator) : super.getBeanLocator();
+        return super.runChain(item);
+    }
+
+    /**
+     * Applies one chain element to the current value.
+     *
+     * @param next holder of the processor to call
+     * @param current value handed to that processor
+     * @return the value returned by the processor
+     * @throws Exception propagated from the processor
+     */
+    @Override
+    protected Object invoke(final BeanLocator.LocatorInstance<ItemProcessor> next, final Object current) throws Exception {
+        final ItemProcessor delegate = next.getValue();
+        return delegate.processItem(current);
+    }
+
+    /**
+     * @return the locator chosen by the latest {@link #processItem(Object)} call,
+     *         or {@code null} if that method has not run yet
+     */
+    @Override
+    protected BeanLocator getBeanLocator() {
+        return locatorInstance;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/main/java/org/apache/batchee/camel/CamelItemProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/camel/src/main/java/org/apache/batchee/camel/CamelItemProcessor.java b/extensions/camel/src/main/java/org/apache/batchee/camel/CamelItemProcessor.java
new file mode 100644
index 0000000..43c8c45
--- /dev/null
+++ b/extensions/camel/src/main/java/org/apache/batchee/camel/CamelItemProcessor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.camel;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.chunk.ItemProcessor;
+import javax.inject.Inject;
+
+/**
+ * Item processor that delegates every item to {@code CamelBridge.process} using the
+ * configured endpoint and template locator.
+ */
+public class CamelItemProcessor implements ItemProcessor {
+    @Inject
+    @BatchProperty
+    private String endpoint;
+
+    @Inject
+    @BatchProperty
+    private String templateLocator;
+
+    /**
+     * Sets the template locator value, for use when the instance is built by hand
+     * rather than injected.
+     *
+     * @param locator value stored in {@code templateLocator}
+     */
+    public void setLocator(final String locator) {
+        this.templateLocator = locator;
+    }
+
+    /**
+     * Sets the endpoint value, for use when the instance is built by hand rather
+     * than injected.
+     *
+     * @param endpoint value stored in {@code endpoint}
+     */
+    public void setEndpoint(final String endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Hands the item to {@code CamelBridge.process}.
+     *
+     * @param item the item to process
+     * @return the value returned by {@code CamelBridge.process}
+     * @throws Exception propagated from the bridge
+     */
+    @Override
+    public Object processItem(final Object item) throws Exception {
+        final Object processed = CamelBridge.process(templateLocator, endpoint, item);
+        return processed;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/main/java/org/apache/batchee/camel/CamelItemReader.java
----------------------------------------------------------------------
diff --git a/extensions/camel/src/main/java/org/apache/batchee/camel/CamelItemReader.java b/extensions/camel/src/main/java/org/apache/batchee/camel/CamelItemReader.java
new file mode 100644
index 0000000..bf4d289
--- /dev/null
+++ b/extensions/camel/src/main/java/org/apache/batchee/camel/CamelItemReader.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.camel;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.chunk.ItemReader;
+import javax.inject.Inject;
+import java.io.Serializable;
+
+/**
+ * Item reader that obtains each item through {@code CamelBridge.receive}.
+ *
+ * Checkpointing is not implemented: {@link #open(Serializable)} ignores its argument
+ * and {@link #checkpointInfo()} returns {@code null}.
+ */
+public class CamelItemReader implements ItemReader {
+    // Timeout used when the "timeout" batch property is not set.
+    // NOTE(review): presumably milliseconds - confirm against CamelBridge.receive.
+    private static final long DEFAULT_TIMEOUT = 1000;
+
+    @Inject
+    @BatchProperty
+    private String endpoint;
+
+    @Inject
+    @BatchProperty(name = "timeout")
+    private String timeoutStr;
+
+    @Inject
+    @BatchProperty
+    private String expectedClass;
+
+    @Inject
+    @BatchProperty
+    private String templateLocator;
+
+    /**
+     * Receives one item from the configured endpoint.
+     *
+     * @return the value returned by {@code CamelBridge.receive}
+     * @throws ClassNotFoundException if {@code expectedClass} is set but cannot be loaded
+     * @throws NumberFormatException if the "timeout" property is set but is not a valid long
+     * @throws Exception propagated from the bridge
+     */
+    @Override
+    public Object readItem() throws Exception {
+        // Resolve in the same order as before so the first failing property is reported first.
+        final Class<?> expected = resolveExpectedClass();
+        final long timeout = resolveTimeout();
+        return CamelBridge.receive(templateLocator, endpoint, timeout, expected);
+    }
+
+    // Loads expectedClass through the thread context class loader; null when the property is unset.
+    private Class<?> resolveExpectedClass() throws ClassNotFoundException {
+        if (expectedClass == null) {
+            return null;
+        }
+        return Thread.currentThread().getContextClassLoader().loadClass(expectedClass);
+    }
+
+    // Parses the "timeout" property as a long (it used to be limited to the int range).
+    private long resolveTimeout() {
+        if (timeoutStr == null) {
+            return DEFAULT_TIMEOUT;
+        }
+        return Long.parseLong(timeoutStr);
+    }
+
+    @Override
+    public void close() throws Exception {
+        // nothing
+    }
+
+    @Override
+    public void open(final Serializable checkpoint) throws Exception {
+        // no-op: supportable?
+    }
+
+    @Override
+    public Serializable checkpointInfo() throws Exception {
+        return null; // supportable?
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/main/java/org/apache/batchee/camel/CamelItemWriter.java
----------------------------------------------------------------------
diff --git a/extensions/camel/src/main/java/org/apache/batchee/camel/CamelItemWriter.java b/extensions/camel/src/main/java/org/apache/batchee/camel/CamelItemWriter.java
new file mode 100644
index 0000000..77b9eb3
--- /dev/null
+++ b/extensions/camel/src/main/java/org/apache/batchee/camel/CamelItemWriter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.camel;
+
+import javax.batch.api.BatchProperty;
+import javax.batch.api.chunk.ItemWriter;
+import javax.inject.Inject;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Item writer that passes every item of a chunk, one at a time, to
+ * {@code CamelBridge.process}.
+ *
+ * Checkpointing is not implemented: {@link #open(Serializable)} ignores its argument
+ * and {@link #checkpointInfo()} returns {@code null}.
+ */
+public class CamelItemWriter implements ItemWriter {
+    @Inject
+    @BatchProperty
+    private String endpoint;
+
+    @Inject
+    @BatchProperty
+    private String templateLocator;
+
+    @Override
+    public void open(final Serializable checkpoint) throws Exception {
+        //no-op: supportable?
+    }
+
+    /**
+     * Sends the items to the endpoint in list order; the bridge's return values are discarded.
+     *
+     * @param items the chunk to write
+     * @throws Exception propagated from the bridge; items after the failing one are not sent
+     */
+    @Override
+    public void writeItems(final List<Object> items) throws Exception {
+        for (final Object current : items) {
+            send(current);
+        }
+    }
+
+    // Pushes a single item through the bridge.
+    private void send(final Object item) throws Exception {
+        CamelBridge.process(templateLocator, endpoint, item);
+    }
+
+    @Override
+    public Serializable checkpointInfo() throws Exception {
+        return null; // supportable?
+    }
+
+    @Override
+    public void close() throws Exception {
+        // no-op
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/main/java/org/apache/batchee/camel/CamelLocator.java
----------------------------------------------------------------------
diff --git a/extensions/camel/src/main/java/org/apache/batchee/camel/CamelLocator.java b/extensions/camel/src/main/java/org/apache/batchee/camel/CamelLocator.java
new file mode 100644
index 0000000..8974bbe
--- /dev/null
+++ b/extensions/camel/src/main/java/org/apache/batchee/camel/CamelLocator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.camel;
+
+import org.apache.batchee.extras.locator.BeanLocator;
+
+public class CamelLocator implements BeanLocator {
+ private final String locator;
+
+ public CamelLocator(final String templateLocator) {
+ this.locator = templateLocator;
+ }
+
+ @Override
+ public <T> LocatorInstance<T> newInstance(final Class<T> expected, final String batchId) {
+ final CamelItemProcessor camelItemProcessor = new CamelItemProcessor();
+ camelItemProcessor.setEndpoint(batchId);
+ camelItemProcessor.setLocator(locator);
+ return new LocatorInstance<T>((T) camelItemProcessor, null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/main/java/org/apache/batchee/camel/CamelTemplateLocator.java
----------------------------------------------------------------------
diff --git a/extensions/camel/src/main/java/org/apache/batchee/camel/CamelTemplateLocator.java b/extensions/camel/src/main/java/org/apache/batchee/camel/CamelTemplateLocator.java
new file mode 100644
index 0000000..5a24f19
--- /dev/null
+++ b/extensions/camel/src/main/java/org/apache/batchee/camel/CamelTemplateLocator.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.camel;
+
+import org.apache.camel.ConsumerTemplate;
+import org.apache.camel.ProducerTemplate;
+
+/**
+ * Lookup contract for the Camel templates used by this extension.
+ */
+public interface CamelTemplateLocator {
+    /**
+     * @return the {@link ConsumerTemplate} to use
+     */
+    ConsumerTemplate findConsumerTemplate();
+
+    /**
+     * @return the {@link ProducerTemplate} to use
+     */
+    ProducerTemplate findProducerTemplate();
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/main/java/org/apache/batchee/camel/component/JBatchComponent.java
----------------------------------------------------------------------
diff --git a/extensions/camel/src/main/java/org/apache/batchee/camel/component/JBatchComponent.java b/extensions/camel/src/main/java/org/apache/batchee/camel/component/JBatchComponent.java
new file mode 100644
index 0000000..01c5422
--- /dev/null
+++ b/extensions/camel/src/main/java/org/apache/batchee/camel/component/JBatchComponent.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.camel.component;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import java.util.Map;
+
+/**
+ * Camel component that creates {@link JBatchEndpoint}s, all sharing the
+ * {@link JobOperator} obtained from {@link BatchRuntime} when the component is built.
+ */
+public class JBatchComponent extends DefaultComponent {
+    // Looked up once per component instance.
+    private final JobOperator operator = BatchRuntime.getJobOperator();
+
+    /**
+     * Creates the endpoint for a URI.
+     *
+     * @param uri        the full endpoint URI
+     * @param remaining  passed to the endpoint, which uses it as its job name
+     * @param parameters not read by this method
+     * @return a new {@link JBatchEndpoint}
+     */
+    @Override
+    protected Endpoint createEndpoint(final String uri, final String remaining, final Map<String, Object> parameters) throws Exception {
+        final JBatchEndpoint endpoint = new JBatchEndpoint(uri, remaining, this, operator);
+        return endpoint;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/main/java/org/apache/batchee/camel/component/JBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/extensions/camel/src/main/java/org/apache/batchee/camel/component/JBatchEndpoint.java b/extensions/camel/src/main/java/org/apache/batchee/camel/component/JBatchEndpoint.java
new file mode 100644
index 0000000..27a0388
--- /dev/null
+++ b/extensions/camel/src/main/java/org/apache/batchee/camel/component/JBatchEndpoint.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.camel.component;
+
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+
+import javax.batch.operations.JobOperator;
+
+/**
+ * Producer-only Camel endpoint bound to one batch job name.
+ *
+ * The option fields are copied into each {@link JBatchProducer} at creation time.
+ * NOTE(review): the setters are presumably bound from URI query parameters by Camel - confirm.
+ */
+public class JBatchEndpoint extends DefaultEndpoint {
+    private final JobOperator operator;
+    private final String job;
+
+    // Options; all keep their Java defaults (0 / false) until a setter is called.
+    private int synchronous;
+    private boolean restart;
+    private boolean stop;
+    private boolean abandon;
+
+    /**
+     * @param uri             the endpoint URI
+     * @param remaining       stored as the job name
+     * @param jBatchComponent the owning component
+     * @param operator        operator handed to the producers
+     */
+    public JBatchEndpoint(final String uri, final String remaining, final Component jBatchComponent, final JobOperator operator) {
+        super(uri, jBatchComponent);
+        this.job = remaining;
+        this.operator = operator;
+    }
+
+    /**
+     * @return a new producer carrying a snapshot of the current option values
+     */
+    @Override
+    public Producer createProducer() throws Exception {
+        return new JBatchProducer(this, operator, job, synchronous, restart, stop, abandon);
+    }
+
+    /**
+     * Consumers are not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Consumer createConsumer(final Processor processor) throws Exception {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public void setSynchronous(final int synchronous) {
+        this.synchronous = synchronous;
+    }
+
+    public void setRestart(final boolean restart) {
+        this.restart = restart;
+    }
+
+    public void setStop(final boolean stop) {
+        this.stop = stop;
+    }
+
+    public void setAbandon(final boolean abandon) {
+        this.abandon = abandon;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/main/java/org/apache/batchee/camel/component/JBatchProducer.java
----------------------------------------------------------------------
diff --git a/extensions/camel/src/main/java/org/apache/batchee/camel/component/JBatchProducer.java b/extensions/camel/src/main/java/org/apache/batchee/camel/component/JBatchProducer.java
new file mode 100644
index 0000000..28a46d4
--- /dev/null
+++ b/extensions/camel/src/main/java/org/apache/batchee/camel/component/JBatchProducer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.camel.component;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultProducer;
+
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchStatus;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Camel producer that drives a {@link JobOperator}: depending on its flags it stops,
+ * abandons, restarts or starts a job.
+ *
+ * After a start or restart the execution id and the operator are stored in the
+ * {@link #JBATCH_EXECUTION_ID} and {@link #JBATCH_OPERATOR} headers of the in message.
+ */
+public class JBatchProducer extends DefaultProducer {
+    public static final String JBATCH_EXECUTION_ID = "JBatchExecutionId";
+    public static final String JBATCH_OPERATOR = "JBatchOperator";
+
+    // Statuses after which an execution no longer progresses. STOPPED and ABANDONED are
+    // included so the synchronous wait cannot poll forever on a stopped/abandoned job.
+    private static final Collection<BatchStatus> END_STATUSES = Arrays.asList(
+        BatchStatus.COMPLETED, BatchStatus.FAILED, BatchStatus.STOPPED, BatchStatus.ABANDONED);
+
+    private final JobOperator operator;
+    private final String jobName;
+    private final int synchronous;
+    private final boolean restart;
+    private final boolean stop;
+    private final boolean abandon;
+
+    /**
+     * @param jBatchEndpoint the owning endpoint
+     * @param operator       operator used for every job operation
+     * @param job            job name used when starting a job
+     * @param synchronous    when greater than zero, {@link #process(Exchange)} polls the
+     *                       execution status with this value passed to {@link Thread#sleep(long)}
+     * @param restart        restart the execution named by the header instead of starting the job
+     * @param stop           stop the execution named by the header; checked first
+     * @param abandon        abandon the execution named by the header; checked after {@code stop}
+     */
+    public JBatchProducer(final Endpoint jBatchEndpoint, final JobOperator operator, final String job, final int synchronous,
+                          final boolean restart, final boolean stop, final boolean abandon) {
+        super(jBatchEndpoint);
+        this.operator = operator;
+        this.jobName = job;
+        this.synchronous = synchronous;
+        this.restart = restart;
+        this.stop = stop;
+        this.abandon = abandon;
+    }
+
+    /**
+     * Runs the configured job operation for the exchange.
+     *
+     * Stop and abandon return right after the operator call and leave the headers untouched.
+     * Start and restart pass every in-message header as a job parameter.
+     *
+     * @param exchange the exchange whose in message supplies and receives the headers
+     * @throws IllegalArgumentException if stop, abandon or restart is requested and the
+     *                                  {@link #JBATCH_EXECUTION_ID} header is absent
+     * @throws Exception propagated from the operator
+     */
+    @Override
+    public void process(final Exchange exchange) throws Exception {
+        if (stop) {
+            operator.stop(executionId(exchange));
+            return;
+        }
+        if (abandon) {
+            operator.abandon(executionId(exchange));
+            return;
+        }
+
+        final long id;
+        if (restart) {
+            id = operator.restart(executionId(exchange), toProperties(exchange.getIn().getHeaders()));
+        } else {
+            id = operator.start(jobName, toProperties(exchange.getIn().getHeaders()));
+        }
+
+        exchange.getIn().setHeader(JBATCH_EXECUTION_ID, id);
+        exchange.getIn().setHeader(JBATCH_OPERATOR, operator);
+        if (synchronous > 0) {
+            awaitEnd(id);
+        }
+    }
+
+    // Sleeps, then checks the status, until the execution reaches one of END_STATUSES.
+    private void awaitEnd(final long id) {
+        do {
+            try {
+                Thread.sleep(synchronous);
+            } catch (final InterruptedException e) {
+                // Give up waiting, but keep the interrupt visible to the caller.
+                Thread.currentThread().interrupt();
+                return;
+            }
+        } while (!END_STATUSES.contains(operator.getJobExecution(id).getBatchStatus()));
+    }
+
+    // Reads the execution id header, failing with a clear message instead of an unboxing NPE.
+    private static long executionId(final Exchange exchange) {
+        final Long id = exchange.getIn().getHeader(JBATCH_EXECUTION_ID, Long.class);
+        if (id == null) {
+            throw new IllegalArgumentException("Missing '" + JBATCH_EXECUTION_ID + "' header");
+        }
+        return id;
+    }
+
+    // Copies the headers into job parameters; null values become empty strings.
+    private static Properties toProperties(final Map<String, Object> headers) {
+        final Properties parameters = new Properties();
+        for (final Map.Entry<String, Object> header : headers.entrySet()) {
+            final Object value = header.getValue();
+            parameters.setProperty(header.getKey(), value != null ? value.toString() : "");
+        }
+        return parameters;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/main/resources/META-INF/batchee.xml
----------------------------------------------------------------------
diff --git a/extensions/camel/src/main/resources/META-INF/batchee.xml b/extensions/camel/src/main/resources/META-INF/batchee.xml
new file mode 100644
index 0000000..278ffe7
--- /dev/null
+++ b/extensions/camel/src/main/resources/META-INF/batchee.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<batch-artifacts xmlns="http://xmlns.jcp.org/xml/ns/javaee">
+ <ref id="camelReader" class="org.apache.batchee.camel.CamelItemReader" />
+ <ref id="camelProcessor" class="org.apache.batchee.camel.CamelItemProcessor" />
+ <ref id="camelChainProcessor" class="org.apache.batchee.camel.CamelChainItemProcessor" />
+ <ref id="camelWriter" class="org.apache.batchee.camel.CamelItemWriter" />
+</batch-artifacts>
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/main/resources/META-INF/services/org/apache/camel/component/jbatch
----------------------------------------------------------------------
diff --git a/extensions/camel/src/main/resources/META-INF/services/org/apache/camel/component/jbatch b/extensions/camel/src/main/resources/META-INF/services/org/apache/camel/component/jbatch
new file mode 100644
index 0000000..7d08d99
--- /dev/null
+++ b/extensions/camel/src/main/resources/META-INF/services/org/apache/camel/component/jbatch
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+class=org.apache.batchee.camel.component.JBatchComponent
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/test/java/org/apache/batchee/camel/CamelChainProcessorTest.java
----------------------------------------------------------------------
diff --git a/extensions/camel/src/test/java/org/apache/batchee/camel/CamelChainProcessorTest.java b/extensions/camel/src/test/java/org/apache/batchee/camel/CamelChainProcessorTest.java
new file mode 100644
index 0000000..65b9a00
--- /dev/null
+++ b/extensions/camel/src/test/java/org/apache/batchee/camel/CamelChainProcessorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.camel;
+
+import org.apache.batchee.util.Batches;
+import org.testng.annotations.Test;
+
+import javax.batch.api.chunk.ItemReader;
+import javax.batch.api.chunk.ItemWriter;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Runs the "camel-chain-processor" job and checks the items that reached the writer.
+ */
+public class CamelChainProcessorTest {
+    /**
+     * Starts the job, waits for its end and verifies the two stored items.
+     */
+    @Test
+    public void chain() throws Exception {
+        final JobOperator jobOperator = BatchRuntime.getJobOperator();
+        Batches.waitForEnd(jobOperator, jobOperator.start("camel-chain-processor", new Properties()));
+        // TestNG's assertEquals takes (actual, expected); keep that order so failure messages read correctly.
+        assertEquals(StoreItems.ITEMS.size(), 2);
+        assertEquals(StoreItems.ITEMS.get(0), "1firstsecond");
+        assertEquals(StoreItems.ITEMS.get(1), "2firstsecond");
+    }
+
+    /**
+     * Writer that appends every written item to the shared static {@link #ITEMS} list.
+     */
+    public static class StoreItems implements ItemWriter {
+        public static final List<Object> ITEMS = new ArrayList<Object>(2);
+
+        @Override
+        public void open(final Serializable checkpoint) throws Exception {
+            // no-op
+        }
+
+        @Override
+        public void close() throws Exception {
+            // no-op
+        }
+
+        @Override
+        public void writeItems(final List<Object> items) throws Exception {
+            ITEMS.addAll(items);
+        }
+
+        @Override
+        public Serializable checkpointInfo() throws Exception {
+            return null;
+        }
+    }
+
+    /**
+     * Reader that returns "1", then "2", then {@code null}.
+     */
+    public static class TwoItemsReader implements ItemReader {
+        private int count = 0;
+
+        @Override
+        public void open(final Serializable checkpoint) throws Exception {
+            // no-op
+        }
+
+        @Override
+        public void close() throws Exception {
+            // no-op
+        }
+
+        @Override
+        public Object readItem() throws Exception {
+            // count is incremented before it is rendered, so the items are "1" and "2"
+            if (count++ < 2) {
+                return "" + count;
+            }
+            return null;
+        }
+
+        @Override
+        public Serializable checkpointInfo() throws Exception {
+            return null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/test/java/org/apache/batchee/camel/CamelProcessorTest.java
----------------------------------------------------------------------
diff --git a/extensions/camel/src/test/java/org/apache/batchee/camel/CamelProcessorTest.java b/extensions/camel/src/test/java/org/apache/batchee/camel/CamelProcessorTest.java
new file mode 100644
index 0000000..316035e
--- /dev/null
+++ b/extensions/camel/src/test/java/org/apache/batchee/camel/CamelProcessorTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.camel;
+
+import org.apache.batchee.util.Batches;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.testng.annotations.Test;
+
+import javax.batch.api.chunk.ItemReader;
+import javax.batch.api.chunk.ItemWriter;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Runs the "camel-processor" job and checks both the items that reached the writer
+ * and the exchanges seen on the "direct:processor" endpoint.
+ */
+public class CamelProcessorTest extends CamelBridge {
+    /**
+     * Registers a recording consumer, runs the job, and verifies two items and two exchanges.
+     */
+    @Test
+    public void process() throws Exception {
+        final List<Exchange> exchanges = new ArrayList<Exchange>(2);
+        final Consumer consumer = CONTEXT.getEndpoint("direct:processor").createConsumer(new Processor() {
+            @Override
+            public void process(final Exchange exchange) throws Exception {
+                exchanges.add(exchange);
+            }
+        });
+        consumer.start();
+
+        try {
+            final JobOperator jobOperator = BatchRuntime.getJobOperator();
+            Batches.waitForEnd(jobOperator, jobOperator.start("camel-processor", new Properties()));
+            // TestNG's assertEquals takes (actual, expected)
+            assertEquals(StoreItems.ITEMS.size(), 2);
+            assertEquals(exchanges.size(), 2);
+
+            for (int i = 1; i <= 2; i++) {
+                assertEquals(StoreItems.ITEMS.get(i - 1), "" + i);
+                assertEquals(exchanges.get(i - 1).getIn().getBody(), "" + i);
+            }
+        } finally {
+            // Stop the consumer even when an assertion fails.
+            consumer.stop();
+        }
+    }
+
+    /**
+     * Writer that appends every written item to the shared static {@link #ITEMS} list.
+     */
+    public static class StoreItems implements ItemWriter {
+        public static final List<Object> ITEMS = new ArrayList<Object>(2);
+
+        @Override
+        public void open(final Serializable checkpoint) throws Exception {
+            // no-op
+        }
+
+        @Override
+        public void close() throws Exception {
+            // no-op
+        }
+
+        @Override
+        public void writeItems(final List<Object> items) throws Exception {
+            ITEMS.addAll(items);
+        }
+
+        @Override
+        public Serializable checkpointInfo() throws Exception {
+            return null;
+        }
+    }
+
+    /**
+     * Reader that returns "1", then "2", then {@code null}.
+     */
+    public static class TwoItemsReader implements ItemReader {
+        private int count = 0;
+
+        @Override
+        public void open(final Serializable checkpoint) throws Exception {
+            // no-op
+        }
+
+        @Override
+        public void close() throws Exception {
+            // no-op
+        }
+
+        @Override
+        public Object readItem() throws Exception {
+            // count is incremented before it is rendered, so the items are "1" and "2"
+            if (count++ < 2) {
+                return "" + count;
+            }
+            return null;
+        }
+
+        @Override
+        public Serializable checkpointInfo() throws Exception {
+            return null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/test/java/org/apache/batchee/camel/CamelReaderTest.java
----------------------------------------------------------------------
diff --git a/extensions/camel/src/test/java/org/apache/batchee/camel/CamelReaderTest.java b/extensions/camel/src/test/java/org/apache/batchee/camel/CamelReaderTest.java
new file mode 100644
index 0000000..66f4d5f
--- /dev/null
+++ b/extensions/camel/src/test/java/org/apache/batchee/camel/CamelReaderTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.camel;
+
+import org.apache.batchee.util.Batches;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.direct.DirectEndpoint;
+import org.testng.annotations.Test;
+
+import javax.batch.api.chunk.ItemProcessor;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Starts the "camel-reader" job, feeds it through the {@code direct:reader} endpoint and
+ * checks which items reached the processor.
+ */
+public class CamelReaderTest extends CamelBridge {
+    /** Upper bound for the consumer wait, so a job that never subscribes fails the test instead of hanging it. */
+    private static final long CONSUMER_TIMEOUT_MS = 60000L;
+
+    @Test
+    public void read() throws Exception {
+        final ProducerTemplate tpl = CONTEXT.createProducerTemplate();
+
+        final JobOperator jobOperator = BatchRuntime.getJobOperator();
+
+        final long id = jobOperator.start("camel-reader", new Properties());
+
+        // messages can only be sent once the job has registered its consumer on the endpoint
+        awaitReaderConsumer();
+
+        tpl.sendBody("direct:reader", "input#1");
+        // a null body is sent last; the assertions below expect only the first item to be stored
+        tpl.sendBody("direct:reader", null);
+
+        Batches.waitForEnd(jobOperator, id);
+
+        // TestNG convention: assertEquals(actual, expected)
+        assertEquals(StoreItems.ITEMS.size(), 1);
+        assertEquals(StoreItems.ITEMS.get(0), "input#1");
+    }
+
+    /**
+     * Polls every 100ms until a consumer is attached to {@code direct:reader}.
+     *
+     * @throws IllegalStateException if no consumer shows up within {@link #CONSUMER_TIMEOUT_MS}
+     * @throws InterruptedException  if the polling sleep is interrupted
+     */
+    private void awaitReaderConsumer() throws InterruptedException {
+        final long deadline = System.currentTimeMillis() + CONSUMER_TIMEOUT_MS;
+        while (DirectEndpoint.class.cast(CONTEXT.getEndpoint("direct:reader")).getConsumer() == null) {
+            if (System.currentTimeMillis() > deadline) {
+                throw new IllegalStateException("no consumer on direct:reader after " + CONSUMER_TIMEOUT_MS + "ms");
+            }
+            Thread.sleep(100);
+        }
+    }
+
+    /** Pass-through processor recording every item it sees in {@link #ITEMS}. */
+    public static class StoreItems implements ItemProcessor {
+        public static final List<Object> ITEMS = new ArrayList<Object>(2);
+
+        @Override
+        public Object processItem(final Object item) throws Exception {
+            ITEMS.add(item);
+            return item;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/test/java/org/apache/batchee/camel/CamelWriterTest.java
----------------------------------------------------------------------
diff --git a/extensions/camel/src/test/java/org/apache/batchee/camel/CamelWriterTest.java b/extensions/camel/src/test/java/org/apache/batchee/camel/CamelWriterTest.java
new file mode 100644
index 0000000..46ae090
--- /dev/null
+++ b/extensions/camel/src/test/java/org/apache/batchee/camel/CamelWriterTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.camel;
+
+import org.apache.batchee.util.Batches;
+import org.apache.camel.ConsumerTemplate;
+import org.testng.annotations.Test;
+
+import javax.batch.api.chunk.ItemReader;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.BatchRuntime;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Runs the "camel-writer" job and checks that both items produced by {@link Reader}
+ * can be received from the {@code direct:writer} endpoint.
+ */
+public class CamelWriterTest extends CamelBridge {
+    @Test
+    public void write() throws Exception {
+        final ConsumerTemplate tpl = CONTEXT.createConsumerTemplate();
+        final Collection<Object> received = new ArrayList<Object>(2);
+        final ExecutorService thread = Executors.newFixedThreadPool(1);
+        thread.submit(new Runnable() {
+            @Override
+            public void run() {
+                // keep receiving until both expected bodies were collected
+                do {
+                    received.add(tpl.receiveBody("direct:writer"));
+                } while (received.size() < 2);
+            }
+        });
+        thread.shutdown(); // no further task; lets awaitTermination() return once the receiver finishes
+
+        final JobOperator jobOperator = BatchRuntime.getJobOperator();
+        Batches.waitForEnd(jobOperator, jobOperator.start("camel-writer", new Properties()));
+
+        // 'received' may only be read after the receiver ended: fail explicitly on timeout
+        // instead of inspecting a list that is still being written by the other thread
+        final boolean receiverDone = thread.awaitTermination(5, TimeUnit.MINUTES);
+        if (!receiverDone) {
+            thread.shutdownNow(); // try to interrupt the blocked receiver so it does not outlive the test
+        }
+        assertTrue(receiverDone, "receiver thread did not get 2 messages from direct:writer in time");
+
+        assertTrue(received.contains("1"), received.toString());
+        assertTrue(received.contains("2"), received.toString());
+    }
+
+    /** Reader returning "1" and "2" on its first two calls and {@code null} afterwards. */
+    public static class Reader implements ItemReader {
+        private int count = 0;
+
+        @Override
+        public void open(final Serializable checkpoint) throws Exception {
+            // no-op
+        }
+
+        @Override
+        public void close() throws Exception {
+            // no-op
+        }
+
+        @Override
+        public Object readItem() throws Exception {
+            if (count++ < 2) {
+                return "" + count;
+            }
+            return null;
+        }
+
+        @Override
+        public Serializable checkpointInfo() throws Exception {
+            return null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/test/java/org/apache/batchee/camel/component/JBatchComponentTest.java
----------------------------------------------------------------------
diff --git a/extensions/camel/src/test/java/org/apache/batchee/camel/component/JBatchComponentTest.java b/extensions/camel/src/test/java/org/apache/batchee/camel/component/JBatchComponentTest.java
new file mode 100644
index 0000000..15425fc
--- /dev/null
+++ b/extensions/camel/src/test/java/org/apache/batchee/camel/component/JBatchComponentTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.camel.component;
+
+import org.apache.batchee.util.Batches;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import javax.batch.api.Batchlet;
+import javax.batch.operations.JobOperator;
+import javax.batch.runtime.context.JobContext;
+import javax.inject.Inject;
+
+/**
+ * Exercises the {@code jbatch:} Camel component by starting the "component" job through two
+ * routes and checking the exit status set by {@link ABatchlet}.
+ */
+public class JBatchComponentTest extends CamelTestSupport {
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint resultEndpoint;
+
+    @Produce(uri = "direct:start")
+    protected ProducerTemplate template;
+
+    @Produce(uri = "direct:start-synch")
+    protected ProducerTemplate templateSynch;
+
+    @Test
+    public void checkJobWasExecuted() throws Exception {
+        final Object[] result = Object[].class.cast(template.requestBody(null));
+        final long id = Number.class.cast(result[0]).longValue();
+        final JobOperator operator = JobOperator.class.cast(result[1]);
+        // this route does not wait for the job, so wait here before reading the exit status
+        Batches.waitForEnd(operator, id);
+        assertEquals("JBatch-Camel", operator.getJobExecution(id).getExitStatus());
+    }
+
+    @Test
+    public void checkJobWasExecutedSynchonously() throws Exception {
+        final Object[] result = Object[].class.cast(templateSynch.requestBody(null));
+        final long id = Number.class.cast(result[0]).longValue();
+        final JobOperator operator = JobOperator.class.cast(result[1]);
+        // no explicit wait here: presumably the "synchronous" option makes the producer block
+        // until the job ended - TODO confirm against JBatchProducer
+        assertEquals("JBatch-Camel", operator.getJobExecution(id).getExitStatus());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start").to("jbatch:component")
+                    .process(executionHeadersAsBody())
+                    .to("mock:result");
+
+                from("direct:start-synch").to("jbatch:component?synchronous=100")
+                    .process(executionHeadersAsBody())
+                    .to("mock:result");
+            }
+        };
+    }
+
+    /**
+     * Builds the processor shared by both routes: it replaces the message body with a two element
+     * array holding the {@code JBATCH_EXECUTION_ID} and {@code JBATCH_OPERATOR} header values.
+     *
+     * @return a new processor instance on every call
+     */
+    private static Processor executionHeadersAsBody() {
+        return new Processor() {
+            @Override
+            public void process(final Exchange exchange) throws Exception {
+                exchange.getIn().setBody(new Object[]{
+                    exchange.getIn().getHeader(JBatchProducer.JBATCH_EXECUTION_ID),
+                    exchange.getIn().getHeader(JBatchProducer.JBATCH_OPERATOR)
+                });
+            }
+        };
+    }
+
+    /** Batchlet sleeping for one second before setting the exit status checked by the tests. */
+    public static class ABatchlet implements Batchlet {
+        @Inject
+        private JobContext jobContext;
+
+        @Override
+        public String process() throws Exception {
+            Thread.sleep(1000);
+            jobContext.setExitStatus("JBatch-Camel");
+            return null;
+        }
+
+        @Override
+        public void stop() throws Exception {
+            // no-op
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/test/java/org/apache/batchee/camel/util/TestComponent.java
----------------------------------------------------------------------
diff --git a/extensions/camel/src/test/java/org/apache/batchee/camel/util/TestComponent.java b/extensions/camel/src/test/java/org/apache/batchee/camel/util/TestComponent.java
new file mode 100644
index 0000000..d62943f
--- /dev/null
+++ b/extensions/camel/src/test/java/org/apache/batchee/camel/util/TestComponent.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.camel.util;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultProducer;
+
+import java.util.Map;
+
+/**
+ * Camel component for the tests. Its endpoints only support producers; a producer appends the
+ * {@code value} endpoint parameter to the String body of the incoming message.
+ */
+public class TestComponent extends DefaultComponent {
+    @Override
+    protected Endpoint createEndpoint(final String uri, final String remaining, final Map<String, Object> parameters) throws Exception {
+        // consume the parameter so it is no longer present in the parameter map
+        final String suffix = String.class.cast(parameters.remove("value"));
+
+        return new DefaultEndpoint() {
+            @Override
+            public boolean isSingleton() {
+                return true;
+            }
+
+            @Override
+            protected String createEndpointUri() {
+                return uri;
+            }
+
+            @Override
+            public Consumer createConsumer(final Processor processor) throws Exception {
+                // consuming from this endpoint is not supported
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public Producer createProducer() throws Exception {
+                return new DefaultProducer(this) {
+                    @Override
+                    public void process(final Exchange exchange) throws Exception {
+                        final String currentBody = exchange.getIn().getBody(String.class);
+                        exchange.getIn().setBody(currentBody + suffix);
+                    }
+                };
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-chain-processor.xml
----------------------------------------------------------------------
diff --git a/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-chain-processor.xml b/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-chain-processor.xml
new file mode 100644
index 0000000..f4b2977
--- /dev/null
+++ b/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-chain-processor.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ See the NOTICE file distributed with this work for additional information
+ regarding copyright ownership. Licensed under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- Chunk job: TwoItemsReader -> camelChainProcessor (chain of two test: endpoints) -> StoreItems. -->
+<job id="camel-chain-processor" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
+  <step id="step1">
+    <chunk>
+      <reader ref="org.apache.batchee.camel.CamelChainProcessorTest$TwoItemsReader" />
+      <processor ref="camelChainProcessor">
+        <properties>
+          <property name="chain" value="test:useless?value=first,test:useless?value=second"/>
+        </properties>
+      </processor>
+      <writer ref="org.apache.batchee.camel.CamelChainProcessorTest$StoreItems" />
+    </chunk>
+  </step>
+</job>
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-processor.xml
----------------------------------------------------------------------
diff --git a/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-processor.xml b/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-processor.xml
new file mode 100644
index 0000000..97d9040
--- /dev/null
+++ b/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-processor.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ See the NOTICE file distributed with this work for additional information
+ regarding copyright ownership. Licensed under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- Chunk job: TwoItemsReader -> camelProcessor (endpoint direct:processor) -> StoreItems. -->
+<job id="camel-processor" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
+  <step id="step1">
+    <chunk>
+      <reader ref="org.apache.batchee.camel.CamelProcessorTest$TwoItemsReader" />
+      <processor ref="camelProcessor">
+        <properties>
+          <property name="endpoint" value="direct:processor"/>
+        </properties>
+      </processor>
+      <writer ref="org.apache.batchee.camel.CamelProcessorTest$StoreItems" />
+    </chunk>
+  </step>
+</job>
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-reader.xml
----------------------------------------------------------------------
diff --git a/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-reader.xml b/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-reader.xml
new file mode 100644
index 0000000..28268fc
--- /dev/null
+++ b/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-reader.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ See the NOTICE file distributed with this work for additional information
+ regarding copyright ownership. Licensed under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- Chunk job: camelReader (endpoint direct:reader) -> CamelReaderTest$StoreItems -> NoopItemWriter. -->
+<job id="camel-reader" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
+  <step id="step1">
+    <chunk>
+      <reader ref="camelReader">
+        <properties>
+          <property name="endpoint" value="direct:reader"/>
+        </properties>
+      </reader>
+      <processor ref="org.apache.batchee.camel.CamelReaderTest$StoreItems" />
+      <writer ref="org.apache.batchee.extras.noop.NoopItemWriter" />
+    </chunk>
+  </step>
+</job>
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-writer.xml
----------------------------------------------------------------------
diff --git a/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-writer.xml b/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-writer.xml
new file mode 100644
index 0000000..3ce7aa8
--- /dev/null
+++ b/extensions/camel/src/test/resources/META-INF/batch-jobs/camel-writer.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ See the NOTICE file distributed with this work for additional information
+ regarding copyright ownership. Licensed under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- Chunk job: CamelWriterTest$Reader -> camelWriter (endpoint direct:writer). -->
+<job id="camel-writer" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
+  <step id="step1">
+    <chunk>
+      <reader ref="org.apache.batchee.camel.CamelWriterTest$Reader" />
+      <writer ref="camelWriter">
+        <properties>
+          <property name="endpoint" value="direct:writer"/>
+        </properties>
+      </writer>
+    </chunk>
+  </step>
+</job>
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/test/resources/META-INF/batch-jobs/component.xml
----------------------------------------------------------------------
diff --git a/extensions/camel/src/test/resources/META-INF/batch-jobs/component.xml b/extensions/camel/src/test/resources/META-INF/batch-jobs/component.xml
new file mode 100644
index 0000000..4cdd61d
--- /dev/null
+++ b/extensions/camel/src/test/resources/META-INF/batch-jobs/component.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ See the NOTICE file distributed with this work for additional information
+ regarding copyright ownership. Licensed under the Apache License,
+ Version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- Single batchlet job running JBatchComponentTest$ABatchlet. -->
+<job id="component" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
+  <step id="step1">
+    <batchlet ref="org.apache.batchee.camel.component.JBatchComponentTest$ABatchlet" />
+  </step>
+</job>
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/camel/src/test/resources/META-INF/services/org/apache/camel/component/test
----------------------------------------------------------------------
diff --git a/extensions/camel/src/test/resources/META-INF/services/org/apache/camel/component/test b/extensions/camel/src/test/resources/META-INF/services/org/apache/camel/component/test
new file mode 100644
index 0000000..f5fa1b5
--- /dev/null
+++ b/extensions/camel/src/test/resources/META-INF/services/org/apache/camel/component/test
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+class=org.apache.batchee.camel.util.TestComponent
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/cdi/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/cdi/pom.xml b/extensions/cdi/pom.xml
new file mode 100644
index 0000000..3b92ac8
--- /dev/null
+++ b/extensions/cdi/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>batchee-extensions</artifactId>
+ <groupId>org.apache.batchee</groupId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>batchee-cdi</artifactId>
+ <name>BatchEE :: Extensions :: CDI</name>
+ <description>This module implements some CDI scopes batch oriented.</description>
+
+ <dependencies>
+ <!-- CDI API; no version or scope is declared here, presumably both are managed by the parent POM (TODO confirm) -->
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jcdi_1.0_spec</artifactId>
+ </dependency>
+
+ <!-- test-scoped dependencies with explicitly pinned versions -->
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-servlet_3.0_spec</artifactId>
+ <version>1.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-interceptor_1.1_spec</artifactId>
+ <version>1.0</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- NOTE(review): SNAPSHOT version; the build needs a repository serving this snapshot -->
+ <dependency>
+ <groupId>org.apache.openwebbeans.test</groupId>
+ <artifactId>cditest-owb</artifactId>
+ <version>1.2.1-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java
new file mode 100644
index 0000000..e76ca88
--- /dev/null
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BaseContext.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.cdi.impl;
+
+import javax.enterprise.context.ContextNotActiveException;
+import javax.enterprise.context.spi.Context;
+import javax.enterprise.context.spi.Contextual;
+import javax.enterprise.context.spi.CreationalContext;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Base class of the batch CDI contexts: contextual instances are stored per key, the keys of
+ * the current execution being provided by {@link #currentKeys()}.
+ *
+ * @param <K> type of the key identifying one storage; it is used as a map key
+ */
+public abstract class BaseContext<K> implements Context {
+    private final ConcurrentMap<K, ConcurrentMap<Contextual<?>, Instance<?>>> storages = new ConcurrentHashMap<K, ConcurrentMap<Contextual<?>, Instance<?>>>();
+
+    /**
+     * @return current keys (we inherit contexts here) sorted by order (the last is the most specific)
+     */
+    protected abstract K[] currentKeys();
+
+    /**
+     * Returns the instance of {@code component} stored for the last current key, creating and
+     * storing it first when it does not exist yet.
+     *
+     * @throws ContextNotActiveException if there is no current key
+     */
+    @Override
+    @SuppressWarnings("unchecked") // an Instance is always stored under the Contextual that created it
+    public <T> T get(final Contextual<T> component, final CreationalContext<T> creationalContext) {
+        checkActive();
+
+        final ConcurrentMap<Contextual<?>, Instance<?>> storage = getOrCreateCurrentStorage();
+        Instance<T> instance = (Instance<T>) storage.get(component);
+        if (instance == null) {
+            // re-check under the lock so that a component is created once per storage
+            synchronized (this) {
+                instance = (Instance<T>) storage.get(component);
+                if (instance == null) {
+                    final T value = component.create(creationalContext);
+                    instance = new Instance<T>(value, creationalContext);
+                    storage.putIfAbsent(component, instance);
+                }
+            }
+        }
+
+        return instance.value;
+    }
+
+    /**
+     * Looks up an existing instance of {@code component} without creating one.
+     * All current keys are visited in the order returned by {@link #currentKeys()}; a storage
+     * which does not hold the component no longer ends the lookup, since the instance may have
+     * been created in the storage of a later (more specific) key.
+     *
+     * @return the first instance found or {@code null} if no current storage holds one
+     * @throws ContextNotActiveException if there is no current key
+     */
+    @Override
+    @SuppressWarnings("unchecked") // an Instance is always stored under the Contextual that created it
+    public <T> T get(final Contextual<T> component) {
+        checkActive();
+
+        for (final K key : currentKeys()) {
+            final ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.get(key);
+            if (storage == null) {
+                continue;
+            }
+
+            final Instance<?> instance = storage.get(component);
+            if (instance != null) {
+                return (T) instance.value;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public boolean isActive() {
+        final K[] ks = currentKeys();
+        return ks != null && ks.length != 0;
+    }
+
+    /**
+     * Removes the storage of the last current key and destroys every instance it holds.
+     * Does nothing when there is no current key or nothing was stored for it.
+     */
+    public void endContext() {
+        final K key = lastKey();
+        if (key == null) { // ConcurrentHashMap rejects null keys with a NullPointerException
+            return;
+        }
+
+        final ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.remove(key);
+        if (storage == null) {
+            return;
+        }
+
+        for (final Map.Entry<Contextual<?>, Instance<?>> entry : storage.entrySet()) {
+            final Instance<?> instance = entry.getValue();
+            Contextual.class.cast(entry.getKey()).destroy(instance.value, instance.context);
+        }
+        storage.clear();
+    }
+
+    /** @return the last element of {@link #currentKeys()} or {@code null} when there is none */
+    private K lastKey() {
+        final K[] keys = currentKeys();
+        if (keys == null || keys.length == 0) {
+            return null;
+        }
+        return keys[keys.length - 1];
+    }
+
+    private void checkActive() {
+        if (!isActive()) {
+            throw new ContextNotActiveException("CDI context with scope annotation @" + getScope().getName() + " is not active with respect to the current thread");
+        }
+    }
+
+    /** Returns the storage of the last current key, registering an empty one when missing. */
+    private ConcurrentMap<Contextual<?>, Instance<?>> getOrCreateCurrentStorage() {
+        final K key = lastKey();
+
+        ConcurrentMap<Contextual<?>, Instance<?>> storage = storages.get(key);
+        if (storage == null) {
+            storage = new ConcurrentHashMap<Contextual<?>, Instance<?>>();
+            // another thread may have registered a storage in the meantime: keep the winner
+            final ConcurrentMap<Contextual<?>, Instance<?>> existing = storages.putIfAbsent(key, storage);
+            if (existing != null) {
+                storage = existing;
+            }
+        }
+        return storage;
+    }
+
+    /** Pairs a contextual instance with the creational context needed to destroy it. */
+    private static class Instance<T> {
+        private final T value;
+        private final CreationalContext<T> context;
+
+        private Instance(final T value, final CreationalContext<T> context) {
+            this.value = value;
+            this.context = context;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java
new file mode 100644
index 0000000..ba63756
--- /dev/null
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/BatchEEScopeExtension.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.cdi.impl;
+
+import javax.enterprise.event.Observes;
+import javax.enterprise.inject.spi.AfterBeanDiscovery;
+import javax.enterprise.inject.spi.BeanManager;
+import javax.enterprise.inject.spi.Extension;
+import javax.enterprise.inject.spi.ProcessAnnotatedType;
+
+/**
+ * CDI extension registering the job and step contexts and vetoing the types located in
+ * (or below) its own package.
+ */
+public class BatchEEScopeExtension implements Extension {
+    /** Vetoes every discovered type whose class name starts with this extension's package name. */
+    void vetoInternalBeans(final @Observes ProcessAnnotatedType<?> pat) {
+        final String discoveredType = pat.getAnnotatedType().getJavaClass().getName();
+        final String internalPackage = BatchEEScopeExtension.class.getPackage().getName();
+        if (!discoveredType.startsWith(internalPackage)) {
+            return;
+        }
+        pat.veto();
+    }
+
+    /** Adds the two context singletons once bean discovery is done. */
+    void addBatchScopes(final @Observes AfterBeanDiscovery afterBeanDiscovery, final BeanManager bm) {
+        afterBeanDiscovery.addContext(JobContextImpl.INSTANCE);
+        afterBeanDiscovery.addContext(StepContextImpl.INSTANCE);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java
new file mode 100644
index 0000000..aab48e3
--- /dev/null
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/JobContextImpl.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.cdi.impl;
+
+import org.apache.batchee.cdi.scope.JobScoped;
+
+import java.lang.annotation.Annotation;
+
+import static org.apache.batchee.cdi.impl.LocationHolder.currentJob;
+
+/**
+ * Context serving the {@link JobScoped} scope; its single current key is built
+ * from the execution id of the job returned by {@code LocationHolder.currentJob()}.
+ */
+public class JobContextImpl extends BaseContext<JobContextImpl.JobKey> {
+    /** Only instance of this context; the constructor is private. */
+    public static final BaseContext<?> INSTANCE = new JobContextImpl();
+
+    private JobContextImpl() {
+        // instances are only reachable through INSTANCE
+    }
+
+    /**
+     * @return {@link JobScoped}, the scope annotation handled by this context
+     */
+    @Override
+    public Class<? extends Annotation> getScope() {
+        return JobScoped.class;
+    }
+
+    /**
+     * @return a one-element array holding the key of the current job execution
+     */
+    @Override
+    protected JobKey[] currentKeys() {
+        final JobKey current = new JobKey(currentJob().getExecutionId());
+        return new JobKey[] { current };
+    }
+
+    /**
+     * Key wrapping a job execution id; equality and hash code only depend on that id.
+     */
+    public static class JobKey {
+        private final long executionId;
+
+        // computed once in the constructor since the id never changes
+        private final int hashCode;
+
+        /**
+         * @param executionId the job execution id this key stands for
+         */
+        public JobKey(final long executionId) {
+            this.executionId = executionId;
+            this.hashCode = (int) (executionId ^ (executionId >>> 32));
+        }
+
+        /**
+         * @param o the object to compare with
+         * @return true when {@code o} is this key or a key of the same class with the same id
+         */
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final JobKey other = (JobKey) o;
+            return executionId == other.executionId;
+        }
+
+        /**
+         * @return the hash pre-computed from the execution id
+         */
+        @Override
+        public int hashCode() {
+            return hashCode;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/LocationHolder.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/LocationHolder.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/LocationHolder.java
new file mode 100644
index 0000000..92285dc
--- /dev/null
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/LocationHolder.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.cdi.impl;
+
+import javax.batch.runtime.context.JobContext;
+import javax.batch.runtime.context.StepContext;
+import java.util.LinkedList;
+
+/**
+ * Per-thread stacks of the job and step contexts that have been entered and
+ * not yet exited.
+ * <p>
+ * The enter methods push a context on the calling thread's stack; the exit
+ * methods call {@code endContext()} on the supplied {@link BaseContext} and
+ * pop the most recent entry.
+ */
+public abstract class LocationHolder {
+    private static final StashThreadLocal<JobContext> JOB = new StashThreadLocal<JobContext>();
+    private static final StashThreadLocal<StepContext> STEP = new StashThreadLocal<StepContext>();
+
+    /**
+     * Pushes {@code jc} on the calling thread's job stack.
+     *
+     * @param jc the job context being entered
+     */
+    protected static void enterJob(final JobContext jc) {
+        JOB.get().add(jc);
+    }
+
+    /**
+     * Pushes {@code sc} on the calling thread's step stack.
+     *
+     * @param sc the step context being entered
+     */
+    protected static void enterStep(final StepContext sc) {
+        STEP.get().add(sc);
+    }
+
+    /**
+     * Ends {@code context} and pops the most recent step of the calling thread.
+     *
+     * @param context the context whose {@code endContext()} is invoked before popping
+     */
+    protected static void exitStep(final BaseContext<?> context) {
+        cleanUp(context, STEP);
+    }
+
+    /**
+     * Ends {@code context} and pops the most recent job of the calling thread.
+     *
+     * @param context the context whose {@code endContext()} is invoked before popping
+     */
+    protected static void exitJob(final BaseContext<?> context) {
+        cleanUp(context, JOB);
+    }
+
+    /**
+     * @return the job context most recently entered on the calling thread
+     * @throws IllegalStateException if no job context is registered on this thread
+     */
+    public static JobContext currentJob() {
+        final LinkedList<JobContext> jobContexts = JOB.get();
+        if (jobContexts.isEmpty()) {
+            throw new IllegalStateException("No job registered, did you set the job listener?");
+        }
+        return jobContexts.getLast();
+    }
+
+    /**
+     * @return the calling thread's step stack itself (not a copy), oldest entry first
+     */
+    public static LinkedList<StepContext> currentSteps() {
+        return STEP.get();
+    }
+
+    /**
+     * Ends {@code context}, then pops the last entry of {@code stash} and drops the
+     * thread-local value once the stack is empty.
+     * <p>
+     * The pop happens in a {@code finally} block so that a failing
+     * {@code endContext()} cannot leave a stale entry on the thread. The
+     * emptiness guard keeps an unbalanced exit from raising
+     * {@link java.util.NoSuchElementException}, which would otherwise mask the
+     * exception thrown by {@code endContext()}.
+     */
+    private static <T, K> void cleanUp(final BaseContext<K> context, final StashThreadLocal<T> stash) {
+        try {
+            // must run before the pop: the entry is still on the stack at this point
+            context.endContext();
+        } finally {
+            final LinkedList<T> contexts = stash.get();
+            if (!contexts.isEmpty()) {
+                contexts.removeLast();
+            }
+            if (contexts.isEmpty()) {
+                stash.remove();
+            }
+        }
+    }
+
+    /** Thread-local whose initial value is a fresh, empty {@link LinkedList}. */
+    private static class StashThreadLocal<T> extends ThreadLocal<LinkedList<T>> {
+        @Override
+        public LinkedList<T> initialValue() {
+            return new LinkedList<T>();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java
new file mode 100644
index 0000000..3c85d55
--- /dev/null
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/impl/StepContextImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.cdi.impl;
+
+import org.apache.batchee.cdi.scope.StepScoped;
+
+import javax.batch.runtime.context.StepContext;
+import java.lang.annotation.Annotation;
+import java.util.List;
+
+import static org.apache.batchee.cdi.impl.LocationHolder.currentSteps;
+
+/**
+ * Context serving the {@link StepScoped} scope; its current keys are built from
+ * the step execution ids of the steps returned by {@code LocationHolder.currentSteps()}.
+ */
+public class StepContextImpl extends BaseContext<StepContextImpl.StepKey> {
+    /** Only instance of this context; the constructor is private. */
+    public static final BaseContext<?> INSTANCE = new StepContextImpl();
+
+    private StepContextImpl() {
+        // instances are only reachable through INSTANCE
+    }
+
+    /**
+     * @return {@link StepScoped}, the scope annotation handled by this context
+     */
+    @Override
+    public Class<? extends Annotation> getScope() {
+        return StepScoped.class;
+    }
+
+    /**
+     * @return one key per step currently registered, in the order of the step list
+     */
+    @Override
+    protected StepKey[] currentKeys() {
+        final List<StepContext> activeSteps = currentSteps();
+        final StepKey[] result = new StepKey[activeSteps.size()];
+
+        int index = 0;
+        for (final StepContext active : activeSteps) {
+            result[index] = new StepKey(active.getStepExecutionId());
+            index++;
+        }
+        return result;
+    }
+
+    /**
+     * Key wrapping a step execution id; equality and hash code only depend on that id.
+     */
+    public static class StepKey {
+        private final long stepExecutionId;
+
+        // computed once in the constructor since the id never changes
+        private final int hashCode;
+
+        /**
+         * @param stepExecutionId the step execution id this key stands for
+         */
+        public StepKey(final long stepExecutionId) {
+            this.stepExecutionId = stepExecutionId;
+            this.hashCode = (int) (stepExecutionId ^ (stepExecutionId >>> 32));
+        }
+
+        /**
+         * @param o the object to compare with
+         * @return true when {@code o} is this key or a key of the same class with the same id
+         */
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final StepKey other = (StepKey) o;
+            return stepExecutionId == other.stepExecutionId;
+        }
+
+        /**
+         * @return the hash pre-computed from the step execution id
+         */
+        @Override
+        public int hashCode() {
+            return hashCode;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterJobScopeListener.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterJobScopeListener.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterJobScopeListener.java
new file mode 100644
index 0000000..041a0e7
--- /dev/null
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterJobScopeListener.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.cdi.listener;
+
+import org.apache.batchee.cdi.impl.JobContextImpl;
+import org.apache.batchee.cdi.impl.LocationHolder;
+
+import javax.batch.api.listener.JobListener;
+import javax.inject.Named;
+
+/**
+ * Job listener that exits the job location with the {@link JobContextImpl}
+ * singleton once the job is over; it does nothing before the job.
+ */
+@Named
+public class AfterJobScopeListener extends LocationHolder implements JobListener {
+    /** Does nothing. */
+    @Override
+    public void beforeJob() throws Exception {
+        // nothing to do before the job
+    }
+
+    /** Calls {@code exitJob} with the job context singleton. */
+    @Override
+    public void afterJob() throws Exception {
+        LocationHolder.exitJob(JobContextImpl.INSTANCE);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java
new file mode 100644
index 0000000..aa1fb21
--- /dev/null
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/AfterStepScopeListener.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.cdi.listener;
+
+import org.apache.batchee.cdi.impl.LocationHolder;
+import org.apache.batchee.cdi.impl.StepContextImpl;
+
+import javax.batch.api.listener.StepListener;
+import javax.inject.Named;
+
+/**
+ * Step listener that exits the step location with the {@link StepContextImpl}
+ * singleton once the step is over; it does nothing before the step.
+ */
+@Named
+public class AfterStepScopeListener extends LocationHolder implements StepListener {
+    /** Does nothing. */
+    @Override
+    public void beforeStep() throws Exception {
+        // nothing to do before the step
+    }
+
+    /** Calls {@code exitStep} with the step context singleton. */
+    @Override
+    public void afterStep() throws Exception {
+        LocationHolder.exitStep(StepContextImpl.INSTANCE);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java
new file mode 100644
index 0000000..4b54729
--- /dev/null
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeJobScopeListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.cdi.listener;
+
+import org.apache.batchee.cdi.impl.LocationHolder;
+
+import javax.batch.api.listener.JobListener;
+import javax.batch.runtime.context.JobContext;
+import javax.inject.Inject;
+import javax.inject.Named;
+
+/**
+ * Job listener that registers the injected {@link JobContext} through
+ * {@code enterJob} before the job; it does nothing after the job.
+ */
+@Named
+public class BeforeJobScopeListener extends LocationHolder implements JobListener {
+    /** Job context handed to {@code enterJob}; set by injection. */
+    @Inject
+    private JobContext jobContext;
+
+    /** Calls {@code enterJob} with the injected job context. */
+    @Override
+    public void beforeJob() throws Exception {
+        LocationHolder.enterJob(this.jobContext);
+    }
+
+    /** Does nothing. */
+    @Override
+    public void afterJob() throws Exception {
+        // nothing to do after the job
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java
new file mode 100644
index 0000000..fefc074
--- /dev/null
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/listener/BeforeStepScopeListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.cdi.listener;
+
+import org.apache.batchee.cdi.impl.LocationHolder;
+
+import javax.batch.api.listener.StepListener;
+import javax.batch.runtime.context.StepContext;
+import javax.inject.Inject;
+import javax.inject.Named;
+
+/**
+ * Step listener that registers the injected {@link StepContext} through
+ * {@code enterStep} before the step; it does nothing after the step.
+ */
+@Named
+public class BeforeStepScopeListener extends LocationHolder implements StepListener {
+    /** Step context handed to {@code enterStep}; set by injection. */
+    @Inject
+    private StepContext stepContext;
+
+    /** Calls {@code enterStep} with the injected step context. */
+    @Override
+    public void beforeStep() throws Exception {
+        LocationHolder.enterStep(this.stepContext);
+    }
+
+    /** Does nothing. */
+    @Override
+    public void afterStep() throws Exception {
+        // nothing to do after the step
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/cdi/src/main/java/org/apache/batchee/cdi/scope/JobScoped.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/scope/JobScoped.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/scope/JobScoped.java
new file mode 100644
index 0000000..7d4d4a1
--- /dev/null
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/scope/JobScoped.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.cdi.scope;
+
+import javax.enterprise.context.NormalScope;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+import static java.lang.annotation.ElementType.TYPE;
+
+/**
+ * Normal scope annotation for the batch job scope.
+ * <p>
+ * Targets types, methods and fields, the target set the CDI specification
+ * prescribes for scope types, so the scope can also be declared on producer
+ * methods and producer fields. {@code METHOD} and {@code FIELD} are written
+ * fully qualified because only {@code TYPE} is statically imported.
+ */
+@Target({ TYPE, java.lang.annotation.ElementType.METHOD, java.lang.annotation.ElementType.FIELD })
+@Retention(RUNTIME)
+@NormalScope
+public @interface JobScoped {
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/cdi/src/main/java/org/apache/batchee/cdi/scope/StepScoped.java
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/java/org/apache/batchee/cdi/scope/StepScoped.java b/extensions/cdi/src/main/java/org/apache/batchee/cdi/scope/StepScoped.java
new file mode 100644
index 0000000..aaba852
--- /dev/null
+++ b/extensions/cdi/src/main/java/org/apache/batchee/cdi/scope/StepScoped.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.batchee.cdi.scope;
+
+import javax.enterprise.context.NormalScope;
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.TYPE;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Normal scope annotation for the batch step scope.
+ * <p>
+ * Targets types, methods and fields, the target set the CDI specification
+ * prescribes for scope types, so the scope can also be declared on producer
+ * methods and producer fields. {@code METHOD} and {@code FIELD} are written
+ * fully qualified because only {@code TYPE} is statically imported.
+ */
+@Target({ TYPE, java.lang.annotation.ElementType.METHOD, java.lang.annotation.ElementType.FIELD })
+@Retention(RUNTIME)
+@NormalScope
+public @interface StepScoped {
+}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/extensions/cdi/src/main/resources/META-INF/beans.xml
----------------------------------------------------------------------
diff --git a/extensions/cdi/src/main/resources/META-INF/beans.xml b/extensions/cdi/src/main/resources/META-INF/beans.xml
new file mode 100644
index 0000000..c913a0e
--- /dev/null
+++ b/extensions/cdi/src/main/resources/META-INF/beans.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<beans xmlns="http://java.sun.com/xml/ns/javaee"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/beans_1_0.xsd"/>