You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by st...@apache.org on 2014/04/24 14:58:27 UTC

[4/4] git commit: BATCHEE-29 delete more 'old' proxies

BATCHEE-29 delete more 'old' proxies

and replace them with 'real' dynamic proxies


Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/cb289b9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/cb289b9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/cb289b9e

Branch: refs/heads/master
Commit: cb289b9e2fd3d368814469a761e74a64960dd527
Parents: 914cd64
Author: Mark Struberg <st...@apache.org>
Authored: Thu Apr 24 14:56:53 2014 +0200
Committer: Mark Struberg <st...@apache.org>
Committed: Thu Apr 24 14:56:53 2014 +0200

----------------------------------------------------------------------
 .../controller/chunk/CheckpointManager.java     |   6 +-
 .../controller/chunk/ChunkStepController.java   | 138 ++++++++++++-------
 .../proxy/ItemProcessListenerProxy.java         |  58 --------
 .../container/proxy/ItemProcessorProxy.java     |  35 -----
 .../container/proxy/ItemReadListenerProxy.java  |  58 --------
 .../container/proxy/ItemWriteListenerProxy.java |  59 --------
 .../container/proxy/ItemWriterProxy.java        |  70 ----------
 .../container/proxy/ListenerFactory.java        |  21 ++-
 8 files changed, 100 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
index 48390e1..b5b6a3c 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/CheckpointManager.java
@@ -18,11 +18,11 @@ package org.apache.batchee.container.impl.controller.chunk;
 
 import org.apache.batchee.container.exception.BatchContainerRuntimeException;
 import org.apache.batchee.container.exception.BatchContainerServiceException;
-import org.apache.batchee.container.proxy.ItemWriterProxy;
 import org.apache.batchee.spi.PersistenceManagerService;
 
 import javax.batch.api.chunk.CheckpointAlgorithm;
 import javax.batch.api.chunk.ItemReader;
+import javax.batch.api.chunk.ItemWriter;
 
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectOutputStream;
@@ -30,13 +30,13 @@ import java.io.ObjectOutputStream;
 public class CheckpointManager {
     private final PersistenceManagerService persistenceManagerService;
     private final ItemReader readerProxy;
-    private final ItemWriterProxy writerProxy;
+    private final ItemWriter writerProxy;
     private final CheckpointAlgorithm checkpointAlgorithm;
     private final String stepId;
     private final long jobInstanceID;
 
 
-    public CheckpointManager(final ItemReader reader, final ItemWriterProxy writer,
+    public CheckpointManager(final ItemReader reader, final ItemWriter writer,
                              final CheckpointAlgorithm chkptAlg,
                              final long jobInstanceID, final String stepId,
                              final PersistenceManagerService persistenceManagerService) {

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
index ff91ebd..2a551da 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/impl/controller/chunk/ChunkStepController.java
@@ -25,11 +25,6 @@ import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
 import org.apache.batchee.container.proxy.CheckpointAlgorithmProxy;
 import org.apache.batchee.container.proxy.ChunkListenerProxy;
 import org.apache.batchee.container.proxy.InjectionReferences;
-import org.apache.batchee.container.proxy.ItemProcessListenerProxy;
-import org.apache.batchee.container.proxy.ItemProcessorProxy;
-import org.apache.batchee.container.proxy.ItemReadListenerProxy;
-import org.apache.batchee.container.proxy.ItemWriteListenerProxy;
-import org.apache.batchee.container.proxy.ItemWriterProxy;
 import org.apache.batchee.container.proxy.ProxyFactory;
 import org.apache.batchee.container.proxy.RetryProcessListenerProxy;
 import org.apache.batchee.container.proxy.RetryReadListenerProxy;
@@ -41,15 +36,18 @@ import org.apache.batchee.container.services.ServicesManager;
 import org.apache.batchee.container.util.PartitionDataWrapper;
 import org.apache.batchee.container.util.TCCLObjectInputStream;
 import org.apache.batchee.jaxb.Chunk;
-import org.apache.batchee.jaxb.ItemProcessor;
-import org.apache.batchee.jaxb.ItemWriter;
 import org.apache.batchee.jaxb.Property;
 import org.apache.batchee.jaxb.Step;
 import org.apache.batchee.spi.BatchArtifactFactory;
 import org.apache.batchee.spi.PersistenceManagerService;
 
 import javax.batch.api.chunk.CheckpointAlgorithm;
+import javax.batch.api.chunk.ItemProcessor;
 import javax.batch.api.chunk.ItemReader;
+import javax.batch.api.chunk.ItemWriter;
+import javax.batch.api.chunk.listener.ItemProcessListener;
+import javax.batch.api.chunk.listener.ItemReadListener;
+import javax.batch.api.chunk.listener.ItemWriteListener;
 import javax.batch.runtime.BatchStatus;
 import java.io.ByteArrayInputStream;
 import java.io.Serializable;
@@ -68,8 +66,8 @@ public class ChunkStepController extends SingleThreadedStepController {
 
     private Chunk chunk = null;
     private ItemReader readerProxy = null;
-    private ItemProcessorProxy processorProxy = null;
-    private ItemWriterProxy writerProxy = null;
+    private ItemProcessor processorProxy = null;
+    private ItemWriter writerProxy = null;
     private CheckpointAlgorithmProxy checkpointProxy = null;
     private CheckpointAlgorithm chkptAlg = null;
     private CheckpointManager checkpointManager;
@@ -77,9 +75,9 @@ public class ChunkStepController extends SingleThreadedStepController {
     private CheckpointDataKey readerChkptDK = null;
     private CheckpointDataKey writerChkptDK = null;
     private List<ChunkListenerProxy> chunkListeners = null;
-    private List<ItemReadListenerProxy> itemReadListeners = null;
-    private List<ItemProcessListenerProxy> itemProcessListeners = null;
-    private List<ItemWriteListenerProxy> itemWriteListeners = null;
+    private List<ItemReadListener> itemReadListeners = null;
+    private List<ItemProcessListener> itemProcessListeners = null;
+    private List<ItemWriteListener> itemWriteListeners = null;
     private RetryHandler retryHandler;
 
     private boolean rollbackRetry = false;
@@ -232,13 +230,13 @@ public class ChunkStepController extends SingleThreadedStepController {
 
         try {
             // call read listeners before and after the actual read
-            for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+            for (ItemReadListener readListenerProxy : itemReadListeners) {
                 readListenerProxy.beforeRead();
             }
 
             itemRead = readerProxy.readItem();
 
-            for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
+            for (ItemReadListener readListenerProxy : itemReadListeners) {
                 readListenerProxy.afterRead(itemRead);
             }
 
@@ -250,13 +248,21 @@ public class ChunkStepController extends SingleThreadedStepController {
             }
         } catch (Exception e) {
             stepContext.setException(e);
-            for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
-                readListenerProxy.onReadError(e);
+            for (ItemReadListener readListenerProxy : itemReadListeners) {
+                try {
+                    readListenerProxy.onReadError(e);
+                } catch (Exception e1) {
+                    handleBatchException(e1);
+                }
             }
             if (!rollbackRetry) {
                 if (retryReadException(e)) {
-                    for (ItemReadListenerProxy readListenerProxy : itemReadListeners) {
-                        readListenerProxy.onReadError(e);
+                    for (ItemReadListener readListenerProxy : itemReadListeners) {
+                        try {
+                            readListenerProxy.onReadError(e);
+                        } catch (Exception e1) {
+                            handleBatchException(e1);
+                        }
                     }
                     // if not a rollback exception, just retry the current item
                     if (!retryHandler.isRollbackException(e)) {
@@ -300,6 +306,14 @@ public class ChunkStepController extends SingleThreadedStepController {
         return itemRead;
     }
 
+    private void handleBatchException(Exception e) {
+        if (e instanceof RuntimeException) {
+            throw (RuntimeException) e;
+        } else {
+            throw new BatchContainerRuntimeException(e);
+        }
+    }
+
     /**
      * Process an item previously read by the reader
      *
@@ -318,7 +332,7 @@ public class ChunkStepController extends SingleThreadedStepController {
         try {
 
             // call process listeners before and after the actual process call
-            for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+            for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
                 processListenerProxy.beforeProcess(itemRead);
             }
 
@@ -330,20 +344,28 @@ public class ChunkStepController extends SingleThreadedStepController {
                 status.setFiltered(true);
             }
 
-            for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
+            for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
                 processListenerProxy.afterProcess(itemRead, processedItem);
             }
         } catch (final Exception e) {
-            for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
-                processListenerProxy.onProcessError(processedItem, e);
+            for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
+                try {
+                    processListenerProxy.onProcessError(processedItem, e);
+                } catch (Exception e1) {
+                    handleBatchException(e1);
+                }
             }
             if (!rollbackRetry) {
                 if (retryProcessException(e, itemRead)) {
                     if (!retryHandler.isRollbackException(e)) {
                         // call process listeners before and after the actual
                         // process call
-                        for (ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
-                            processListenerProxy.beforeProcess(itemRead);
+                        for (ItemProcessListener processListenerProxy : itemProcessListeners) {
+                            try {
+                                processListenerProxy.beforeProcess(itemRead);
+                            } catch (Exception e1) {
+                                handleBatchException(e1);
+                            }
                         }
                         processedItem = processItem(itemRead, status);
                         if (processedItem == null) {
@@ -352,8 +374,12 @@ public class ChunkStepController extends SingleThreadedStepController {
                             status.setFiltered(true);
                         }
 
-                        for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
-                            processListenerProxy.afterProcess(itemRead, processedItem);
+                        for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
+                            try {
+                                processListenerProxy.afterProcess(itemRead, processedItem);
+                            } catch (Exception e1) {
+                                handleBatchException(e1);
+                            }
                         }
                     } else {
                         status.setRollback(true);
@@ -375,8 +401,12 @@ public class ChunkStepController extends SingleThreadedStepController {
                     if (!retryHandler.isRollbackException(e)) {
                         // call process listeners before and after the actual
                         // process call
-                        for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
-                            processListenerProxy.beforeProcess(itemRead);
+                        for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
+                            try {
+                                processListenerProxy.beforeProcess(itemRead);
+                            } catch (Exception e1) {
+                                handleBatchException(e1);
+                            }
                         }
                         processedItem = processItem(itemRead, status);
                         if (processedItem == null) {
@@ -385,8 +415,12 @@ public class ChunkStepController extends SingleThreadedStepController {
                             status.setFiltered(true);
                         }
 
-                        for (final ItemProcessListenerProxy processListenerProxy : itemProcessListeners) {
-                            processListenerProxy.afterProcess(itemRead, processedItem);
+                        for (final ItemProcessListener processListenerProxy : itemProcessListeners) {
+                            try {
+                                processListenerProxy.afterProcess(itemRead, processedItem);
+                            } catch (Exception e1) {
+                                handleBatchException(e1);
+                            }
                         }
                     } else {
                         status.setRollback(true);
@@ -416,20 +450,24 @@ public class ChunkStepController extends SingleThreadedStepController {
             try {
 
                 // call read listeners before and after the actual read
-                for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+                for (ItemWriteListener writeListenerProxy : itemWriteListeners) {
                     writeListenerProxy.beforeWrite(theChunk);
                 }
 
                 writerProxy.writeItems(theChunk);
 
-                for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
+                for (ItemWriteListener writeListenerProxy : itemWriteListeners) {
                     writeListenerProxy.afterWrite(theChunk);
                 }
                 stepContext.getMetric(MetricImpl.MetricType.WRITE_COUNT).incValueBy(theChunk.size());
             } catch (Exception e) {
                 this.stepContext.setException(e);
-                for (ItemWriteListenerProxy writeListenerProxy : itemWriteListeners) {
-                    writeListenerProxy.onWriteError(theChunk, e);
+                for (ItemWriteListener writeListenerProxy : itemWriteListeners) {
+                    try {
+                        writeListenerProxy.onWriteError(theChunk, e);
+                    } catch (Exception e1) {
+                        handleBatchException(e1);
+                    }
                 }
                 if (!rollbackRetry) {
                     if (retryWriteException(e, theChunk)) {
@@ -603,10 +641,10 @@ public class ChunkStepController extends SingleThreadedStepController {
         transactionManager.setRollbackOnly();
         try {
             readerProxy.close();
+            writerProxy.close();
         } catch (Exception e) {
             // ignore, we blow up anyway
         }
-        writerProxy.close();
         transactionManager.rollback();
         throw new BatchContainerRuntimeException("Failure in Read-Process-Write Loop", t);
     }
@@ -647,7 +685,7 @@ public class ChunkStepController extends SingleThreadedStepController {
         }
 
         {
-            final ItemProcessor itemProcessor = chunk.getProcessor();
+            final org.apache.batchee.jaxb.ItemProcessor itemProcessor = chunk.getProcessor();
             if (itemProcessor != null) {
                 final List<Property> itemProcessorProps = itemProcessor.getProperties() == null ? null : itemProcessor.getProperties().getPropertyList();
                 final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemProcessorProps);
@@ -656,7 +694,7 @@ public class ChunkStepController extends SingleThreadedStepController {
         }
 
         {
-            final ItemWriter itemWriter = chunk.getWriter();
+            final org.apache.batchee.jaxb.ItemWriter itemWriter = chunk.getWriter();
             final List<Property> itemWriterProps = itemWriter.getProperties() == null ? null : itemWriter.getProperties().getPropertyList();
             final InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext, itemWriterProps);
             writerProxy = ProxyFactory.createItemWriterProxy(artifactFactory, itemWriter.getRef(), injectionRef, stepContext, jobExecutionImpl);
@@ -733,7 +771,7 @@ public class ChunkStepController extends SingleThreadedStepController {
                     readerOIS.close();
                 } catch (final Exception ex) {
                     // is this what I should be throwing here?
-                    throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+                    throw new BatchContainerServiceException("Cannot read the checkpoint data for [" + step.getId() + "]", ex);
                 }
             } else {
                 // no chkpt data exists in the backing store
@@ -742,7 +780,7 @@ public class ChunkStepController extends SingleThreadedStepController {
                     readerProxy.open(null);
                 } catch (final Exception ex) {
                     // is this what I should be throwing here?
-                    throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+                    throw new BatchContainerServiceException("Exception while opening step [" + step.getId() + "]", ex);
                 }
             }
         } catch (final ClassCastException e) {
@@ -762,24 +800,20 @@ public class ChunkStepController extends SingleThreadedStepController {
                     writerProxy.open((Serializable) writerOIS.readObject());
                     writerOIS.close();
                 } catch (final Exception ex) {
-                    // is this what I should be throwing here?
                     throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
                 }
             } else {
                 // no chkpt data exists in the backing store
                 writerChkptData = null;
-                writerProxy.open(null);
+                try {
+                    writerProxy.open(null);
+                } catch (Exception e) {
+                    handleBatchException(e);
+                }
             }
         } catch (final ClassCastException e) {
             throw new IllegalStateException("Expected Checkpoint but found" + writerChkptData);
         }
-
-        // set up metrics
-        // stepContext.addMetric(MetricImpl.Counter.valueOf("READ_COUNT"), 0);
-        // stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_COUNT"), 0);
-        // stepContext.addMetric(MetricImpl.Counter.valueOf("READ_SKIP_COUNT"), 0);
-        // stepContext.addMetric(MetricImpl.Counter.valueOf("PROCESS_SKIP_COUNT"), 0);
-        // stepContext.addMetric(MetricImpl.Counter.valueOf("WRITE_SKIP_COUNT"), 0);
     }
 
     @Override
@@ -897,12 +931,16 @@ public class ChunkStepController extends SingleThreadedStepController {
                     writerOIS.close();
                 } catch (Exception ex) {
                     // is this what I should be throwing here?
-                    throw new BatchContainerServiceException("Cannot persist the checkpoint data for [" + step.getId() + "]", ex);
+                    throw new BatchContainerServiceException("Cannot read the checkpoint data for [" + step.getId() + "]", ex);
                 }
             } else {
                 // no chkpt data exists in the backing store
                 writerData = null;
-                writerProxy.open(null);
+                try {
+                    writerProxy.open(null);
+                } catch (Exception ex) {
+                    throw new BatchContainerServiceException("Cannot open the step [" + step.getId() + "]", ex);
+                }
             }
         } catch (ClassCastException e) {
             throw new IllegalStateException("Expected CheckpointData but found" + writerData);

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessListenerProxy.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessListenerProxy.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessListenerProxy.java
deleted file mode 100755
index 0298e1b..0000000
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessListenerProxy.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.batchee.container.proxy;
-
-import org.apache.batchee.container.exception.BatchContainerRuntimeException;
-
-import javax.batch.api.chunk.listener.ItemProcessListener;
-
-public class ItemProcessListenerProxy extends AbstractProxy<ItemProcessListener> implements ItemProcessListener {
-    ItemProcessListenerProxy(final ItemProcessListener delegate) {
-        super(delegate);
-
-    }
-
-    @Override
-    public void afterProcess(Object item, Object result) {
-        try {
-            this.delegate.afterProcess(item, result);
-        } catch (final Exception e) {
-            this.stepContext.setException(e);
-            throw new BatchContainerRuntimeException(e);
-        }
-    }
-
-    @Override
-    public void beforeProcess(final Object item) {
-        try {
-            this.delegate.beforeProcess(item);
-        } catch (final Exception e) {
-            this.stepContext.setException(e);
-            throw new BatchContainerRuntimeException(e);
-        }
-    }
-
-    @Override
-    public void onProcessError(Object item, Exception ex) {
-        try {
-            this.delegate.onProcessError(item, ex);
-        } catch (Exception e) {
-            this.stepContext.setException(e);
-            throw new BatchContainerRuntimeException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessorProxy.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessorProxy.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessorProxy.java
deleted file mode 100755
index 0e4e965..0000000
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemProcessorProxy.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.batchee.container.proxy;
-
-import javax.batch.api.chunk.ItemProcessor;
-
-public class ItemProcessorProxy extends AbstractProxy<ItemProcessor> implements ItemProcessor {
-    ItemProcessorProxy(final ItemProcessor delegate) {
-        super(delegate);
-    }
-
-    /*
-     * In order to provide skip/retry logic, these exceptions
-     * are thrown as-is rather than beeing wrapped.
-     * @see javax.batch.api.ItemReader#readItem()
-     */
-    @Override
-    public Object processItem(final Object item) throws Exception {
-        return this.delegate.processItem(item);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemReadListenerProxy.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemReadListenerProxy.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemReadListenerProxy.java
deleted file mode 100755
index 27fd947..0000000
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemReadListenerProxy.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.batchee.container.proxy;
-
-import org.apache.batchee.container.exception.BatchContainerRuntimeException;
-
-import javax.batch.api.chunk.listener.ItemReadListener;
-
-public class ItemReadListenerProxy extends AbstractProxy<ItemReadListener> implements ItemReadListener {
-    ItemReadListenerProxy(final ItemReadListener delegate) {
-        super(delegate);
-    }
-
-    @Override
-    public void afterRead(final Object item) {
-        try {
-            this.delegate.afterRead(item);
-        } catch (final Exception e) {
-            this.stepContext.setException(e);
-            throw new BatchContainerRuntimeException(e);
-        }
-    }
-
-    @Override
-    public void beforeRead() {
-        try {
-            this.delegate.beforeRead();
-        } catch (final Exception e) {
-            this.stepContext.setException(e);
-            throw new BatchContainerRuntimeException(e);
-        }
-    }
-
-    @Override
-    public void onReadError(final Exception ex) {
-        try {
-            this.delegate.onReadError(ex);
-        } catch (final Exception e) {
-            this.stepContext.setException(e);
-            throw new BatchContainerRuntimeException(e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriteListenerProxy.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriteListenerProxy.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriteListenerProxy.java
deleted file mode 100755
index 9691281..0000000
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriteListenerProxy.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.batchee.container.proxy;
-
-import org.apache.batchee.container.exception.BatchContainerRuntimeException;
-
-import javax.batch.api.chunk.listener.ItemWriteListener;
-import java.util.List;
-
-public class ItemWriteListenerProxy extends AbstractProxy<ItemWriteListener> implements ItemWriteListener {
-    ItemWriteListenerProxy(final ItemWriteListener delegate) {
-        super(delegate);
-    }
-
-    @Override
-    public void afterWrite(final List<Object> items) {
-        try {
-            this.delegate.afterWrite(items);
-        } catch (final Exception e) {
-            this.stepContext.setException(e);
-            throw new BatchContainerRuntimeException(e);
-        }
-    }
-
-    @Override
-    public void beforeWrite(final List<Object> items) {
-
-        try {
-            this.delegate.beforeWrite(items);
-        } catch (final Exception e) {
-            this.stepContext.setException(e);
-            throw new BatchContainerRuntimeException(e);
-        }
-    }
-
-    @Override
-    public void onWriteError(final List<Object> items, final Exception ex) {
-        try {
-            this.delegate.onWriteError(items, ex);
-        } catch (Exception e) {
-            this.stepContext.setException(e);
-            throw new BatchContainerRuntimeException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriterProxy.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriterProxy.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriterProxy.java
deleted file mode 100755
index 2dc6e1a..0000000
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ItemWriterProxy.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- * 
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License, 
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.batchee.container.proxy;
-
-import org.apache.batchee.container.exception.BatchContainerRuntimeException;
-
-import javax.batch.api.chunk.ItemWriter;
-import java.io.Serializable;
-import java.util.List;
-
-public class ItemWriterProxy extends AbstractProxy<ItemWriter> implements ItemWriter {
-    ItemWriterProxy(final ItemWriter delegate) {
-        super(delegate);
-    }
-
-    @Override
-    public Serializable checkpointInfo() {
-        try {
-            return this.delegate.checkpointInfo();
-        } catch (final Exception e) {
-            this.stepContext.setException(e);
-            throw new BatchContainerRuntimeException(e);
-        }
-    }
-
-    @Override
-    public void close() {
-        try {
-            this.delegate.close();
-        } catch (final Exception e) {
-            this.stepContext.setException(e);
-            throw new BatchContainerRuntimeException(e);
-        }
-    }
-
-    @Override
-    public void open(final Serializable checkpoint) {
-        try {
-            this.delegate.open(checkpoint);
-        } catch (final Exception e) {
-            this.stepContext.setException(e);
-            throw new BatchContainerRuntimeException(e);
-        }
-    }
-
-
-    /*
-     * In order to provide skip/retry logic, these exceptions
-     * are thrown as-is rather than beeing wrapped.
-     * @see javax.batch.api.ItemReader#readItem()
-     */
-    @Override
-    public void writeItems(final List<Object> items) throws Exception {
-        this.delegate.writeItems(items);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/cb289b9e/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java b/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
index eb4e60a..eb84b2f 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/proxy/ListenerFactory.java
@@ -159,14 +159,13 @@ public class ListenerFactory {
         return retVal;
     }
 
-    public List<ItemProcessListenerProxy> getItemProcessListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext,
+    public List<ItemProcessListener> getItemProcessListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext,
                                                                   final RuntimeJobExecution execution) {
         final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
-        final List<ItemProcessListenerProxy> retVal = new ArrayList<ItemProcessListenerProxy>();
+        final List<ItemProcessListener> retVal = new ArrayList<ItemProcessListener>();
         for (final ListenerInfo li : stepListenerInfo) {
             if (li.isItemProcessListener()) {
-                ItemProcessListenerProxy proxy = new ItemProcessListenerProxy((ItemProcessListener) li.getArtifact());
-                proxy.setStepContext(stepContext);
+                ItemProcessListener proxy = ProxyFactory.createProxy((ItemProcessListener) li.getArtifact(), stepContext);
                 retVal.add(proxy);
             }
         }
@@ -174,14 +173,13 @@ public class ListenerFactory {
         return retVal;
     }
 
-    public List<ItemReadListenerProxy> getItemReadListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext,
+    public List<ItemReadListener> getItemReadListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext,
                                                             final RuntimeJobExecution execution) {
         final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
-        final List<ItemReadListenerProxy> retVal = new ArrayList<ItemReadListenerProxy>();
+        final List<ItemReadListener> retVal = new ArrayList<ItemReadListener>();
         for (final ListenerInfo li : stepListenerInfo) {
             if (li.isItemReadListener()) {
-                final ItemReadListenerProxy proxy = new ItemReadListenerProxy((ItemReadListener) li.getArtifact());
-                proxy.setStepContext(stepContext);
+                final ItemReadListener proxy = ProxyFactory.createProxy((ItemReadListener) li.getArtifact(), stepContext);
                 retVal.add(proxy);
             }
         }
@@ -189,14 +187,13 @@ public class ListenerFactory {
         return retVal;
     }
 
-    public List<ItemWriteListenerProxy> getItemWriteListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext,
+    public List<ItemWriteListener> getItemWriteListeners(final Step step, final InjectionReferences injectionRefs, final StepContextImpl stepContext,
                                                               final RuntimeJobExecution execution) {
         final List<ListenerInfo> stepListenerInfo = getStepListenerInfo(factory, step, injectionRefs, execution);
-        final List<ItemWriteListenerProxy> retVal = new ArrayList<ItemWriteListenerProxy>();
+        final List<ItemWriteListener> retVal = new ArrayList<ItemWriteListener>();
         for (final ListenerInfo li : stepListenerInfo) {
             if (li.isItemWriteListener()) {
-                final ItemWriteListenerProxy proxy = new ItemWriteListenerProxy((ItemWriteListener) li.getArtifact());
-                proxy.setStepContext(stepContext);
+                final ItemWriteListener proxy = ProxyFactory.createProxy((ItemWriteListener) li.getArtifact(), stepContext);
                 retVal.add(proxy);
             }
         }