You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by st...@apache.org on 2014/04/24 14:58:27 UTC
[4/4] git commit: BATCHEE-29 delete more 'old' proxies
BATCHEE-29 delete more 'old' proxies
and replace them with 'real' dynamic proxies
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/cb289b9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/cb289b9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/cb289b9e
Branch: refs/heads/master
Commit: cb289b9e2fd3d368814469a761e74a64960dd527
Parents: 914cd64
Author: Mark Struberg <st...@apache.org>
Authored: Thu Apr 24 14:56:53 2014 +0200
Committer: Mark Struberg <st...@apache.org>
Committed: Thu Apr 24 14:56:53 2014 +0200
----------------------------------------------------------------------
.../controller/chunk/CheckpointManager.java | 6 +-
.../controller/chunk/ChunkStepController.java | 138 ++++++++++++-------
.../proxy/ItemProcessListenerProxy.java | 58 --------
.../container/proxy/ItemProcessorProxy.java | 35 -----
.../container/proxy/ItemReadListenerProxy.java | 58 --------
.../container/proxy/ItemWriteListenerProxy.java | 59 --------
.../container/proxy/ItemWriterProxy.java | 70 ----------
.../container/proxy/ListenerFactory.java | 21 ++-
8 files changed, 100 insertions(+), 345 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
index 48390e1..b5b6a3c 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
@@ -18,11 +18,11 @@ package org.apache.batchee.container.impl.controller.chunk;
import org.apache.batchee.container.exception.BatchContainerRuntimeException;
import org.apache.batchee.container.exception.BatchContainerServiceException;
-import org.apache.batchee.container.proxy.ItemWriterProxy;
import org.apache.batchee.spi.PersistenceManagerService;
import javax.batch.api.chunk.CheckpointAlgorithm;
import javax.batch.api.chunk.ItemReader;
+import javax.batch.api.chunk.ItemWriter;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
@@ -30,13 +30,13 @@ import java.io.ObjectOutputStream;
public class CheckpointManager {
private final PersistenceManagerService persistenceManagerService;
private final ItemReader readerProxy;
- private final ItemWriterProxy writerProxy;
+ private final ItemWriter writerProxy;
private final CheckpointAlgorithm checkpointAlgorithm;
private final String stepId;
private final long jobInstanceID;
- public CheckpointManager(final ItemReader reader, final ItemWriterProxy writer,
+ public CheckpointManager(final ItemReader reader, final ItemWriter writer,
final CheckpointAlgorithm chkptAlg,
final long jobInstanceID, final String stepId,
final PersistenceManagerService persistenceManagerService) {
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
index ff91ebd..2a551da 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
@@ -25,11 +25,6 @@ import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
import org.apache.batchee.container.proxy.CheckpointAlgorithmProxy;
import org.apache.batchee.container.proxy.ChunkListenerProxy;
import org.apache.batchee.container.proxy.InjectionReferences;
-import org.apache.batchee.container.proxy.ItemProcessListenerProxy;
-import org.apache.batchee.container.proxy.ItemProcessorProxy;
-import org.apache.batchee.container.proxy.ItemReadListenerProxy;
-import org.apache.batchee.container.proxy.ItemWriteListenerProxy;
-import org.apache.batchee.container.proxy.ItemWriterProxy;
import org.apache.batchee.container.proxy.ProxyFactory;
import org.apache.batchee.container.proxy.RetryProcessListenerProxy;
import org.apache.batchee.container.proxy.RetryReadListenerProxy;
@@ -41,15 +36,18 @@ import org.apache.batchee.container.services.ServicesManager;
import org.apache.batchee.container.util.PartitionDataWrapper;
import org.apache.batchee.container.util.TCCLObjectInputStream;
import org.apache.batchee.jaxb.Chunk;
-import org.apache.batchee.jaxb.ItemProcessor;
-import org.apache.batchee.jaxb.ItemWriter;
import org.apache.batchee.jaxb.Property;
import org.apache.batchee.jaxb.Step;
import org.apache.batchee.spi.BatchArtifactFactory;
import org.apache.batchee.spi.PersistenceManagerService;
import javax.batch.api.chunk.CheckpointAlgorithm;
+import javax.batch.api.chunk.ItemProcessor;
import javax.batch.api.chunk.ItemReader;
+import javax.batch.api.chunk.ItemWriter;
+import javax.batch.api.chunk.listener.ItemProcessListener;
+import javax.batch.api.chunk.listener.ItemReadListener;
+import javax.batch.api.chunk.listener.ItemWriteListener;
import javax.batch.runtime.BatchStatus;
import java.io.ByteArrayInputStream;
import java.io.Serializable;
@@ -68,8 +66,8 @@ public class ChunkStepController extends SingleThreadedStepController {
private Chunk chunk = null;
private ItemReader readerProxy = null;
- private ItemProcessorProxy processorProxy = null;
- private ItemWriterProxy writerProxy = null;
+ private ItemProcessor processorProxy = null;
+ private ItemWriter writerProxy = null;
private CheckpointAlgorithmProxy checkpointProxy = null;
private CheckpointAlgorithm chkptAlg = null;
private CheckpointManager checkpointManager;
@@ -77,9 +75,9 @@ public class ChunkStepController extends SingleThreadedStepController {
private CheckpointDataKey readerChkptDK = null;
private CheckpointDataKey writerChkptDK = null;
private List<ChunkListenerProxy> chunkListeners = null;
- private List<ItemReadListenerProxy> itemReadListeners = null;
- private List<ItemProcessListenerProxy> itemProcessListeners = null;
- private List<ItemWriteListenerProxy> itemWriteListeners = null;
+ private List<ItemReadListener> itemReadListeners = null;
+ private List<ItemProcessListener> itemProcessListeners = null;
+ private List<ItemWriteListener> itemWriteListeners = null;
private RetryHandler retryHandler;
private boolean rollbackRetry = false;
@@ -232,13 +230,13 @@ public class ChunkStepController extends SingleThreadedStepController {
try {
// call read listeners before and after the actual read
- for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+ for (ItemReadListener readListenerProxy : itemReadListeners) {
readListenerProxy.beforeRead();
}
itemRead = readerProxy.readItem();
- for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+ for (ItemReadListener readListenerProxy : itemReadListeners) {
readListenerProxy.afterRead(itemRead);
}
@@ -250,13 +248,21 @@ public class ChunkStepController extends SingleThreadedStepController {
}
} catch (Exception e) {
stepContext.setException(e);
- for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
- readListenerProxy.onReadError(e);
+ for (ItemReadListener readListenerProxy : itemReadListeners) {
+ try {
+ readListenerProxy.onReadError(e);
+ } catch (Exception e1) {
+ handleBatchException(e1);
+ }
}
if (!rollbackRetry) {
if (retryReadException(e)) {
- for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
- readListenerProxy.onReadError(e);
+ for (ItemReadListener readListenerProxy : itemReadListeners) {
+ try {
+ readListenerProxy.onReadError(e);
+ } catch (Exception e1) {
+ handleBatchException(e1);
+ }
}
// if not a rollback exception, just retry the current item
if (!retryHandler.isRollbackException(e)) {
@@ -300,6 +306,14 @@ public class ChunkStepController extends SingleThreadedStepController {
return itemRead;
}
+ private void handleBatchException(Exception e) {
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ } else {
+ throw new BatchContainerRuntimeException(e);
+ }
+ }
+
/**
* Process an item previously read by the reader
*
@@ -318,7 +332,7 @@ public class ChunkStepController extends SingleThreadedStepController {
try {
// call process listeners before and after the actual process call
- for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+ for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
processListenerProxy.beforeProcess(itemRead);
}
@@ -330,20 +344,28 @@ public class ChunkStepController extends SingleThreadedStepController {
status.setFiltered(true);
}
- for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+ for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
processListenerProxy.afterProcess(itemRead, processedItem);
}
} catch (final Exception e) {
- for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
- processListenerProxy.onProcessError(processedItem, e);
+ for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
+ try {
+ processListenerProxy.onProcessError(processedItem, e);
+ } catch (Exception e1) {
+ handleBatchException(e1);
+ }
}
if (!rollbackRetry) {
if (retryProcessException(e, itemRead)) {
if (!retryHandler.isRollbackException(e)) {
// call process listeners before and after the actual
// process call
- for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
- processListenerProxy.beforeProcess(itemRead);
+ for (ItemProcessListener processListenerProxy : itemProcessListeners) {
+ try {
+ processListenerProxy.beforeProcess(itemRead);
+ } catch (Exception e1) {
+ handleBatchException(e1);
+ }
}
processedItem = processItem(itemRead, status);
if (processedItem == null) {
@@ -352,8 +374,12 @@ public class ChunkStepController extends SingleThreadedStepController {
status.setFiltered(true);
}
- for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
- processListenerProxy.afterProcess(itemRead, processedItem);
+ for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
+ try {
+ processListenerProxy.afterProcess(itemRead, processedItem);
+ } catch (Exception e1) {
+ handleBatchException(e1);
+ }
}
} else {
status.setRollback(true);
@@ -375,8 +401,12 @@ public class ChunkStepController extends SingleThreadedStepController {
if (!retryHandler.isRollbackException(e)) {
// call process listeners before and after the actual
// process call
- for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
- processListenerProxy.beforeProcess(itemRead);
+ for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
+ try {
+ processListenerProxy.beforeProcess(itemRead);
+ } catch (Exception e1) {
+ handleBatchException(e1);
+ }
}
processedItem = processItem(itemRead, status);
if (processedItem == null) {
@@ -385,8 +415,12 @@ public class ChunkStepController extends SingleThreadedStepController {
status.setFiltered(true);
}
- for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
- processListenerProxy.afterProcess(itemRead, processedItem);
+ for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
+ try {
+ processListenerProxy.afterProcess(itemRead, processedItem);
+ } catch (Exception e1) {
+ handleBatchException(e1);
+ }
}
} else {
status.setRollback(true);
@@ -416,20 +450,24 @@ public class ChunkStepController extends SingleThreadedStepController {
try {
// call read listeners before and after the actual read
- for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+ for (ItemWriteListener writeListenerProxy : itemWriteListeners) {
writeListenerProxy.beforeWrite(theChunk);
}
writerProxy.writeItems(theChunk);
- for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+ for (ItemWriteListener writeListenerProxy : itemWriteListeners) {
writeListenerProxy.afterWrite(theChunk);
}
stepContext.getMetric(MetricImpl.MetricType.WRITE_COUNT).incValueBy(theChunk.size());
} catch (Exception e) {
this.stepContext.setException(e);
- for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
- writeListenerProxy.onWriteError(theChunk, e);
+ for (ItemWriteListener writeListenerProxy : itemWriteListeners) {
+ try {
+ writeListenerProxy.onWriteError(theChunk, e);
+ } catch (Exception e1) {
+ handleBatchException(e1);
+ }
}
if (!rollbackRetry) {
if (retryWriteException(e, theChunk)) {
@@ -603,10 +641,10 @@ public class ChunkStepController extends SingleThreadedStepController {
transactionManager.setRollbackOnly();
try {
readerProxy.close();
+ writerProxy.close();
} catch (Exception e) {
// ignore, we blow up anyway
}
- writerProxy.close();
transactionManager.rollback();
throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", t);
}
@@ -647,7 +685,7 @@ public class ChunkStepController extends SingleThreadedStepController {
}
{
- final ItemProcessor itemProcessor = chunk.getProcessor();
+ final org.apache.batchee.jaxb.ItemProcessor itemProcessor = chunk.getProcessor();
if (itemProcessor != null) {
final List<Property> itemProcessorProps = itemProcessor.getProperties() == null ? null : itemProcessor.getProperties().getPropertyList();
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemProcessorProps);
@@ -656,7 +694,7 @@ public class ChunkStepController extends SingleThreadedStepController {
}
{
- final ItemWriter itemWriter = chunk.getWriter();
+ final org.apache.batchee.jaxb.ItemWriter itemWriter = chunk.getWriter();
final List<Property> itemWriterProps = itemWriter.getProperties() == null ? null : itemWriter.getProperties().getPropertyList();
final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemWriterProps);
writerProxy = ProxyFactory.createItemWriterProxy(artifactFactory, itemWriter.getRef(), injectionRef, stepContext, jobExecutionImpl);
@@ -733,7 +771,7 @@ public class ChunkStepController extends SingleThreadedStepController {
readerOIS.close();
} catch (final Exception ex) {
// is this what I should be throwing here?
- throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+ throw new BatchContainerServiceException("Cannot read the checkpoint data for [" + step.getId() + "]", ex);
}
} else {
// no chkpt data exists in the backing store
@@ -742,7 +780,7 @@ public class ChunkStepController extends SingleThreadedStepController {
readerProxy.open(null);
} catch (final Exception ex) {
// is this what I should be throwing here?
- throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+ throw new BatchContainerServiceException("Exception while opening step [" + step.getId() + "]", ex);
}
}
} catch (final ClassCastException e) {
@@ -762,24 +800,20 @@ public class ChunkStepController extends SingleThreadedStepController {
writerProxy.open((Serializable) writerOIS.readObject());
writerOIS.close();
} catch (final Exception ex) {
- // is this what I should be throwing here?
throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
}
} else {
// no chkpt data exists in the backing store
writerChkptData = null;
- writerProxy.open(null);
+ try {
+ writerProxy.open(null);
+ } catch (Exception e) {
+ handleBatchException(e);
+ }
}
} catch (final ClassCastException e) {
throw new IllegalStateException("Expected Checkpoint but found" + writerChkptData);
}
-
- // set up metrics
- // stepContext.addMetric(MetricImpl.Counter.valueOf("READ_COUNT"), 0);
- // stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_COUNT"), 0);
- // stepContext.addMetric(MetricImpl.Counter.valueOf("READ_SKIP_COUNT"), 0);
- // stepContext.addMetric(MetricImpl.Counter.valueOf("PROCESS_SKIP_COUNT"), 0);
- // stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_SKIP_COUNT"), 0);
}
@Override
@@ -897,12 +931,16 @@ public class ChunkStepController extends SingleThreadedStepController {
writerOIS.close();
} catch (Exception ex) {
// is this what I should be throwing here?
- throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+ throw new BatchContainerServiceException("Cannot read the checkpoint data for [" + step.getId() + "]", ex);
}
} else {
// no chkpt data exists in the backing store
writerData = null;
- writerProxy.open(null);
+ try {
+ writerProxy.open(null);
+ } catch (Exception ex) {
+ throw new BatchContainerServiceException("Cannot open the step [" + step.getId() + "]", ex);
+ }
}
} catch (ClassCastException e) {
throw new IllegalStateException("Expected CheckpointData but found" + writerData);
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessListenerProxy.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessListenerProxy.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessListenerProxy.java
deleted file mode 100755
index 0298e1b..0000000
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessListenerProxy.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.batchee.container.proxy;
-
-import org.apache.batchee.container.exception.BatchContainerRuntimeException;
-
-import javax.batch.api.chunk.listener.ItemProcessListener;
-
-public class ItemProcessListenerProxy extends AbstractProxy<ItemProcessListener> implements ItemProcessListener {
- ItemProcessListenerProxy(final ItemProcessListener delegate) {
- super(delegate);
-
- }
-
- @Override
- public void afterProcess(Object item, Object result) {
- try {
- this.delegate.afterProcess(item, result);
- } catch (final Exception e) {
- this.stepContext.setException(e);
- throw new BatchContainerRuntimeException(e);
- }
- }
-
- @Override
- public void beforeProcess(final Object item) {
- try {
- this.delegate.beforeProcess(item);
- } catch (final Exception e) {
- this.stepContext.setException(e);
- throw new BatchContainerRuntimeException(e);
- }
- }
-
- @Override
- public void onProcessError(Object item, Exception ex) {
- try {
- this.delegate.onProcessError(item, ex);
- } catch (Exception e) {
- this.stepContext.setException(e);
- throw new BatchContainerRuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessorProxy.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessorProxy.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessorProxy.java
deleted file mode 100755
index 0e4e965..0000000
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessorProxy.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.batchee.container.proxy;
-
-import javax.batch.api.chunk.ItemProcessor;
-
-public class ItemProcessorProxy extends AbstractProxy<ItemProcessor> implements ItemProcessor {
- ItemProcessorProxy(final ItemProcessor delegate) {
- super(delegate);
- }
-
- /*
- * In order to provide skip/retry logic, these exceptions
- * are thrown as-is rather than beeing wrapped.
- * @see javax.batch.api.ItemReader#readItem()
- */
- @Override
- public Object processItem(final Object item) throws Exception {
- return this.delegate.processItem(item);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemReadListenerProxy.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemReadListenerProxy.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemReadListenerProxy.java
deleted file mode 100755
index 27fd947..0000000
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemReadListenerProxy.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.batchee.container.proxy;
-
-import org.apache.batchee.container.exception.BatchContainerRuntimeException;
-
-import javax.batch.api.chunk.listener.ItemReadListener;
-
-public class ItemReadListenerProxy extends AbstractProxy<ItemReadListener> implements ItemReadListener {
- ItemReadListenerProxy(final ItemReadListener delegate) {
- super(delegate);
- }
-
- @Override
- public void afterRead(final Object item) {
- try {
- this.delegate.afterRead(item);
- } catch (final Exception e) {
- this.stepContext.setException(e);
- throw new BatchContainerRuntimeException(e);
- }
- }
-
- @Override
- public void beforeRead() {
- try {
- this.delegate.beforeRead();
- } catch (final Exception e) {
- this.stepContext.setException(e);
- throw new BatchContainerRuntimeException(e);
- }
- }
-
- @Override
- public void onReadError(final Exception ex) {
- try {
- this.delegate.onReadError(ex);
- } catch (final Exception e) {
- this.stepContext.setException(e);
- throw new BatchContainerRuntimeException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriteListenerProxy.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriteListenerProxy.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriteListenerProxy.java
deleted file mode 100755
index 9691281..0000000
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriteListenerProxy.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.batchee.container.proxy;
-
-import org.apache.batchee.container.exception.BatchContainerRuntimeException;
-
-import javax.batch.api.chunk.listener.ItemWriteListener;
-import java.util.List;
-
-public class ItemWriteListenerProxy extends AbstractProxy<ItemWriteListener> implements ItemWriteListener {
- ItemWriteListenerProxy(final ItemWriteListener delegate) {
- super(delegate);
- }
-
- @Override
- public void afterWrite(final List<Object> items) {
- try {
- this.delegate.afterWrite(items);
- } catch (final Exception e) {
- this.stepContext.setException(e);
- throw new BatchContainerRuntimeException(e);
- }
- }
-
- @Override
- public void beforeWrite(final List<Object> items) {
-
- try {
- this.delegate.beforeWrite(items);
- } catch (final Exception e) {
- this.stepContext.setException(e);
- throw new BatchContainerRuntimeException(e);
- }
- }
-
- @Override
- public void onWriteError(final List<Object> items, final Exception ex) {
- try {
- this.delegate.onWriteError(items, ex);
- } catch (Exception e) {
- this.stepContext.setException(e);
- throw new BatchContainerRuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriterProxy.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriterProxy.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriterProxy.java
deleted file mode 100755
index 2dc6e1a..0000000
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriterProxy.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.batchee.container.proxy;
-
-import org.apache.batchee.container.exception.BatchContainerRuntimeException;
-
-import javax.batch.api.chunk.ItemWriter;
-import java.io.Serializable;
-import java.util.List;
-
-public class ItemWriterProxy extends AbstractProxy<ItemWriter> implements ItemWriter {
- ItemWriterProxy(final ItemWriter delegate) {
- super(delegate);
- }
-
- @Override
- public Serializable checkpointInfo() {
- try {
- return this.delegate.checkpointInfo();
- } catch (final Exception e) {
- this.stepContext.setException(e);
- throw new BatchContainerRuntimeException(e);
- }
- }
-
- @Override
- public void close() {
- try {
- this.delegate.close();
- } catch (final Exception e) {
- this.stepContext.setException(e);
- throw new BatchContainerRuntimeException(e);
- }
- }
-
- @Override
- public void open(final Serializable checkpoint) {
- try {
- this.delegate.open(checkpoint);
- } catch (final Exception e) {
- this.stepContext.setException(e);
- throw new BatchContainerRuntimeException(e);
- }
- }
-
-
- /*
- * In order to provide skip/retry logic, these exceptions
- * are thrown as-is rather than beeing wrapped.
- * @see javax.batch.api.ItemReader#readItem()
- */
- @Override
- public void writeItems(final List<Object> items) throws Exception {
- this.delegate.writeItems(items);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
index eb4e60a..eb84b2f 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
@@ -159,14 +159,13 @@ public class ListenerFactory {
return retVal;
}
- public List<ItemProcessListenerProxy> getItemProcessListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext,
+ public List<ItemProcessListener> getItemProcessListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext,
final RuntimeJobExecution execution) {
final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
- final List<ItemProcessListenerProxy> retVal = new ArrayList<ItemProcessListenerProxy>();
+ final List<ItemProcessListener> retVal = new ArrayList<ItemProcessListener>();
for (final ListenerInfo li : stepListenerInfo) {
if (li.isItemProcessListener()) {
- ItemProcessListenerProxy proxy = new ItemProcessListenerProxy((ItemProcessListener) li.getArtifact());
- proxy.setStepContext(stepContext);
+ ItemProcessListener proxy = ProxyFactory.createProxy((ItemProcessListener) li.getArtifact(), stepContext);
retVal.add(proxy);
}
}
@@ -174,14 +173,13 @@ public class ListenerFactory {
return retVal;
}
- public List<ItemReadListenerProxy> getItemReadListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext,
+ public List<ItemReadListener> getItemReadListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext,
final RuntimeJobExecution execution) {
final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
- final List<ItemReadListenerProxy> retVal = new ArrayList<ItemReadListenerProxy>();
+ final List<ItemReadListener> retVal = new ArrayList<ItemReadListener>();
for (final ListenerInfo li : stepListenerInfo) {
if (li.isItemReadListener()) {
- final ItemReadListenerProxy proxy = new ItemReadListenerProxy((ItemReadListener) li.getArtifact());
- proxy.setStepContext(stepContext);
+ final ItemReadListener proxy = ProxyFactory.createProxy((ItemReadListener) li.getArtifact(), stepContext);
retVal.add(proxy);
}
}
@@ -189,14 +187,13 @@ public class ListenerFactory {
return retVal;
}
- public List<ItemWriteListenerProxy> getItemWriteListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext,
+ public List<ItemWriteListener> getItemWriteListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext,
final RuntimeJobExecution execution) {
final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
- final List<ItemWriteListenerProxy> retVal = new ArrayList<ItemWriteListenerProxy>();
+ final List<ItemWriteListener> retVal = new ArrayList<ItemWriteListener>();
for (final ListenerInfo li : stepListenerInfo) {
if (li.isItemWriteListener()) {
- final ItemWriteListenerProxy proxy = new ItemWriteListenerProxy((ItemWriteListener) li.getArtifact());
- proxy.setStepContext(stepContext);
+ final ItemWriteListener proxy = ProxyFactory.createProxy((ItemWriteListener) li.getArtifact(), stepContext);
retVal.add(proxy);
}
}