You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:38:41 UTC

[04/62] Importing JBatch Reference Implementation from IBM. We'll fork it but this commit is to keep a track of what we forked.

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerImpl.java
new file mode 100755
index 0000000..1f322a5
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerImpl.java
@@ -0,0 +1,2208 @@
+/*
+ * Copyright 2012 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.services.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.batch.operations.NoSuchJobExecutionException;
+import javax.batch.runtime.BatchStatus;
+import javax.batch.runtime.JobInstance;
+import javax.batch.runtime.Metric;
+import javax.batch.runtime.StepExecution;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.sql.DataSource;
+
+import com.ibm.jbatch.container.context.impl.MetricImpl;
+import com.ibm.jbatch.container.context.impl.StepContextImpl;
+import com.ibm.jbatch.container.exception.BatchContainerServiceException;
+import com.ibm.jbatch.container.exception.PersistenceException;
+import com.ibm.jbatch.container.impl.PartitionedStepBuilder;
+import com.ibm.jbatch.container.jobinstance.JobInstanceImpl;
+import com.ibm.jbatch.container.jobinstance.JobOperatorJobExecution;
+import com.ibm.jbatch.container.jobinstance.RuntimeFlowInSplitExecution;
+import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
+import com.ibm.jbatch.container.jobinstance.StepExecutionImpl;
+import com.ibm.jbatch.container.persistence.CheckpointData;
+import com.ibm.jbatch.container.persistence.CheckpointDataKey;
+import com.ibm.jbatch.container.services.IJobExecution;
+import com.ibm.jbatch.container.services.IPersistenceManagerService;
+import com.ibm.jbatch.container.status.JobStatus;
+import com.ibm.jbatch.container.status.StepStatus;
+import com.ibm.jbatch.container.util.TCCLObjectInputStream;
+import com.ibm.jbatch.spi.services.IBatchConfig;
+
+public class JDBCPersistenceManagerImpl implements IPersistenceManagerService, JDBCPersistenceManagerSQLConstants {
+
+	/** Fully qualified name of this class; used as the logger name and in trace calls. */
+	private static final String CLASSNAME = JDBCPersistenceManagerImpl.class.getName();
+
+	/** Logger shared by all instances. */
+	private static final Logger logger = Logger.getLogger(CLASSNAME);
+
+	/** Configuration handed to init(). */
+	private IBatchConfig batchConfig;
+
+	/** Data source resolved through JNDI by init() when not running in J2SE mode. */
+	protected DataSource dataSource;
+	/** JNDI name read from the configuration by init() when not running in J2SE mode. */
+	protected String jndiName;
+
+	/** Schema name read from the configuration by init(). */
+	protected String schema = "";
+
+	// Plain JDBC settings; init() fills them in J2SE mode only.
+	protected String driver = "";
+	protected String url = "";
+	protected String userId = "";
+	protected String pwd = "";
+
+	/**
+	 * Initializes this service from the batch configuration.
+	 * <p>
+	 * In J2SE mode the JDBC driver, URL, user and password are read from the
+	 * configuration; otherwise the data source is looked up through JNDI. When the
+	 * database identifies itself as Derby, the schema and the runtime tables are
+	 * created if they are missing.
+	 *
+	 * @param batchConfig the batch runtime configuration
+	 * @throws BatchContainerServiceException if the JNDI name is undefined, the JNDI
+	 *         lookup fails, or the schema/table setup raises an SQLException
+	 */
+	@Override
+	public void init(IBatchConfig batchConfig) throws BatchContainerServiceException {
+		logger.config("Entering CLASSNAME.init(), batchConfig =" + batchConfig);
+
+		this.batchConfig = batchConfig;
+		schema = batchConfig.getDatabaseConfigurationBean().getSchema();
+
+		if (batchConfig.isJ2seMode()) {
+			// Stand-alone mode: connections are opened directly with these settings.
+			driver = batchConfig.getDatabaseConfigurationBean().getJdbcDriver();
+			url = batchConfig.getDatabaseConfigurationBean().getJdbcUrl();
+			userId = batchConfig.getDatabaseConfigurationBean().getDbUser();
+			pwd = batchConfig.getDatabaseConfigurationBean().getDbPassword();
+
+			logger.config("driver: " + driver + ", url: " + url);
+		} else {
+			jndiName = batchConfig.getDatabaseConfigurationBean().getJndiName();
+			logger.config("JNDI name = " + jndiName);
+
+			if (jndiName == null || jndiName.length() == 0) {
+				throw new BatchContainerServiceException("JNDI name is not defined.");
+			}
+
+			try {
+				dataSource = (DataSource) new InitialContext().lookup(jndiName);
+			} catch (NamingException e) {
+				logger.severe("Lookup failed for JNDI name: " + jndiName + 
+						".  One cause of this could be that the batch runtime is incorrectly configured to EE mode when it should be in SE mode.");
+				throw new BatchContainerServiceException(e);
+			}
+		}
+
+		try {
+			// only auto-create on Derby
+			if (isDerby()) {
+				boolean schemaPresent = isSchemaValid();
+				if (!schemaPresent) {
+					createSchema();
+				}
+				checkAllTables();
+			}
+		} catch (SQLException e) {
+			logger.severe(e.getLocalizedMessage());
+			throw new BatchContainerServiceException(e);
+		}
+
+		logger.config("Exiting CLASSNAME.init()");
+	}
+
+	/**
+	 * Checks whether the configured schema is listed in the database metadata.
+	 * The comparison against the TABLE_SCHEM column is case-insensitive.
+	 * 
+	 * @return true if the schema exists, false otherwise.
+	 * @throws SQLException if the metadata cannot be read
+	 */
+	private boolean isSchemaValid() throws SQLException {
+		logger.entering(CLASSNAME, "isSchemaValid");
+		boolean found = false;
+		Connection conn = null;
+		ResultSet rs = null;
+		try {
+			conn = getConnectionToDefaultSchema();
+			rs = conn.getMetaData().getSchemas();
+			while (!found && rs.next()) {
+				found = schema.equalsIgnoreCase(rs.getString("TABLE_SCHEM"));
+			}
+		} finally {
+			// Release the connection and result set even when a metadata call fails.
+			cleanupConnection(conn, rs, null);
+		}
+		logger.exiting(CLASSNAME, "isSchemaValid", found);
+		return found;
+	}
+
+	/**
+	 * Tells whether the database product name reported by the driver contains
+	 * "derby" (case-insensitive), at any position including the start.
+	 * 
+	 * @return true if the product name contains "derby"; false otherwise, including
+	 *         when the driver reports no product name
+	 * @throws SQLException if the metadata cannot be read
+	 */
+	private boolean isDerby() throws SQLException {
+		logger.entering(CLASSNAME, "isDerby");
+		boolean derby;
+		Connection conn = null;
+		try {
+			conn = getConnectionToDefaultSchema();
+			String productName = conn.getMetaData().getDatabaseProductName();
+			// indexOf returns 0 for a name that starts with "derby", so test >= 0.
+			derby = productName != null && productName.toLowerCase().indexOf("derby") >= 0;
+		} finally {
+			// The connection is only needed for this probe; always give it back.
+			cleanupConnection(conn, null, null);
+		}
+		logger.exiting(CLASSNAME, "isDerby", derby);
+		return derby;
+	}
+
+	/**
+	 * Creates the configured schema by issuing a CREATE SCHEMA statement on a
+	 * connection that has not been switched to that schema.
+	 * 
+	 * @throws SQLException if the statement cannot be prepared or executed
+	 */
+	private void createSchema() throws SQLException {
+		logger.entering(CLASSNAME, "createSchema");
+		Connection conn = null;
+		PreparedStatement ps = null;
+		try {
+			conn = getConnectionToDefaultSchema();
+
+			logger.log(Level.INFO, schema + " schema does not exists. Trying to create it.");
+			// The schema name is an identifier, so it cannot be bound as a parameter.
+			ps = conn.prepareStatement("CREATE SCHEMA " + schema);
+			ps.execute();
+		} finally {
+			// Release the statement and connection even when the DDL fails.
+			cleanupConnection(conn, null, ps);
+		}
+		logger.exiting(CLASSNAME, "createSchema");
+	}
+
+	/**
+	 * Makes sure every runtime batch table exists, creating the missing ones, and
+	 * issues the checkpoint index statement.
+	 *  
+	 * @throws SQLException if a lookup or a DDL statement fails
+	 */
+	private void checkAllTables() throws SQLException {
+		logger.entering(CLASSNAME, "checkAllTables");
+
+		// The checkpoint table goes first; its index statement follows immediately.
+		// NOTE(review): the index statement runs on every call, even when the table
+		// already existed - confirm the database tolerates that.
+		createIfNotExists(CHECKPOINTDATA_TABLE, CREATE_TAB_CHECKPOINTDATA);
+		executeStatement(CREATE_CHECKPOINTDATA_INDEX);
+
+		// Remaining tables as {table name, DDL} pairs, in creation order.
+		final String[][] remainingTables = {
+				{ JOBINSTANCEDATA_TABLE, CREATE_TAB_JOBINSTANCEDATA },
+				{ EXECUTIONINSTANCEDATA_TABLE, CREATE_TAB_EXECUTIONINSTANCEDATA },
+				{ STEPEXECUTIONINSTANCEDATA_TABLE, CREATE_TAB_STEPEXECUTIONINSTANCEDATA },
+				{ JOBSTATUS_TABLE, CREATE_TAB_JOBSTATUS },
+				{ STEPSTATUS_TABLE, CREATE_TAB_STEPSTATUS },
+		};
+		for (String[] table : remainingTables) {
+			createIfNotExists(table[0], table[1]);
+		}
+
+		logger.exiting(CLASSNAME, "checkAllTables");
+	}
+
+	/**
+	 * Creates tableName with the createTableStatement DDL unless the database
+	 * metadata already lists that table in the configured schema.
+	 * 
+	 * @param tableName name of the table to look up
+	 * @param createTableStatement DDL executed when the table is not found
+	 * @throws SQLException if the lookup or the DDL fails
+	 */
+	private void createIfNotExists(String tableName, String createTableStatement) throws SQLException {
+		logger.entering(CLASSNAME, "createIfNotExists", new Object[] {tableName, createTableStatement});
+
+		Connection conn = null;
+		ResultSet rs = null;
+		PreparedStatement ps = null;
+		try {
+			conn = getConnection();
+			rs = conn.getMetaData().getTables(null, schema, tableName, null);
+			if(!rs.next()) {
+				logger.log(Level.INFO, tableName + " table does not exists. Trying to create it.");
+				ps = conn.prepareStatement(createTableStatement);
+				ps.executeUpdate();
+			}
+		} finally {
+			// Release everything even when the lookup or the DDL throws.
+			cleanupConnection(conn, rs, ps);
+		}
+		logger.exiting(CLASSNAME, "createIfNotExists");
+	}
+
+	/**
+	 * Executes the provided SQL statement as an update on a connection obtained
+	 * from getConnection().
+	 * 
+	 * @param statement the SQL text to execute
+	 * @throws SQLException if the statement cannot be prepared or executed
+	 */
+	private void executeStatement(String statement) throws SQLException {
+		logger.entering(CLASSNAME, "executeStatement", statement);
+
+		Connection conn = null;
+		PreparedStatement ps = null;
+		try {
+			conn = getConnection();
+			ps = conn.prepareStatement(statement);
+			ps.executeUpdate();
+		} finally {
+			// Release the statement and connection even when execution fails.
+			cleanupConnection(conn, ps);
+		}
+		logger.exiting(CLASSNAME, "executeStatement");
+	}
+
+
+	/**
+	 * Inserts a new checkpoint row, keyed by the comma separated form of the key.
+	 *
+	 * @param key identifies the checkpoint
+	 * @param value the checkpoint data to store
+	 */
+	@Override
+	public void createCheckpointData(CheckpointDataKey key, CheckpointData value) {
+		final Object[] traceArgs = {key, value};
+		logger.entering(CLASSNAME, "createCheckpointData", traceArgs);
+
+		insertCheckpointData(key.getCommaSeparatedKey(), value);
+
+		logger.exiting(CLASSNAME, "createCheckpointData");
+	}
+
+	/**
+	 * Looks up the checkpoint stored under the comma separated form of the key.
+	 *
+	 * @param key identifies the checkpoint
+	 * @return the stored checkpoint data, or null when no row matches
+	 */
+	@Override
+	public CheckpointData getCheckpointData(CheckpointDataKey key) {
+		Object tracedKey = key;
+		if (tracedKey == null) {
+			tracedKey = "<null>";
+		}
+		logger.entering(CLASSNAME, "getCheckpointData", tracedKey);
+
+		CheckpointData checkpointData = queryCheckpointData(key.getCommaSeparatedKey());
+
+		Object tracedResult = checkpointData;
+		if (tracedResult == null) {
+			tracedResult = "<null>";
+		}
+		logger.exiting(CLASSNAME, "getCheckpointData", tracedResult);
+		return checkpointData;
+	}
+
+	/**
+	 * Stores the checkpoint under the key: updates the row when one already exists,
+	 * otherwise inserts a new one.
+	 *
+	 * @param key identifies the checkpoint
+	 * @param value the checkpoint data to store
+	 */
+	@Override
+	public void updateCheckpointData(CheckpointDataKey key, CheckpointData value) {
+		logger.entering(CLASSNAME, "updateCheckpointData", new Object[] {key, value});
+
+		boolean alreadyStored = queryCheckpointData(key.getCommaSeparatedKey()) != null;
+		if (!alreadyStored) {
+			createCheckpointData(key, value);
+		} else {
+			updateCheckpointData(key.getCommaSeparatedKey(), value);
+		}
+
+		logger.exiting(CLASSNAME, "updateCheckpointData");
+	}
+
+
+	/**
+	 * Opens a database connection and switches it to the configured schema.
+	 * <p>
+	 * Outside J2SE mode the connection comes from the data source; in J2SE mode the
+	 * driver class is loaded and the connection is opened through DriverManager.
+	 * If anything fails after the connection was opened, the connection is closed
+	 * before the failure propagates.
+	 * 
+	 * @return a connection with the configured schema set
+	 * @throws SQLException if the connection cannot be opened or the schema cannot be set
+	 * @throws PersistenceException if the driver class cannot be loaded
+	 */
+	protected Connection getConnection() throws SQLException {
+		logger.finest("Entering: " + CLASSNAME + ".getConnection");
+		Connection connection = null;
+		boolean ready = false;
+
+		try {
+			if(!batchConfig.isJ2seMode()) {
+				logger.finest("J2EE mode, getting connection from data source");
+				connection = dataSource.getConnection();
+				logger.finest("autocommit="+connection.getAutoCommit());
+			} else {
+				try {
+					Class.forName(driver);
+				} catch (ClassNotFoundException e) {
+					throw new PersistenceException(e);
+				}
+				logger.finest("JSE mode, getting connection from " + url);
+				connection = DriverManager.getConnection(url, userId, pwd);
+				logger.finest("autocommit="+connection.getAutoCommit());
+			}
+			setSchemaOnConnection(connection);
+			ready = true;
+		} finally {
+			if (!ready && connection != null) {
+				// The caller never receives this connection, so it must be closed here.
+				try {
+					connection.close();
+				} catch (SQLException closeFailure) {
+					// Keep the original failure; only record the secondary one.
+					logger.log(Level.FINE, "Failed to close connection after setup failure", closeFailure);
+				}
+			}
+		}
+
+		logger.finest("Exiting: " + CLASSNAME + ".getConnection() with conn =" + connection);
+		return connection;
+	}
+
+	/**
+	 * Opens a database connection without changing its schema, so whatever schema
+	 * the underlying database uses by default stays in effect.
+	 * 
+	 * @return the open connection
+	 * @throws SQLException if reading the autocommit flag fails
+	 * @throws PersistenceException if the driver class cannot be loaded or the
+	 *         connection cannot be opened
+	 */
+	protected Connection getConnectionToDefaultSchema() throws SQLException {
+		logger.finest("Entering getConnectionToDefaultSchema");
+		Connection connection = null;
+
+		if (batchConfig.isJ2seMode()) {
+			try {
+				Class.forName(driver);
+			} catch (ClassNotFoundException e) {
+				logException("ClassNotFoundException: Cannot load driver class: " + driver, e);
+				throw new PersistenceException(e);
+			}
+			logger.finest("JSE mode, getting connection from " + url);
+			try {
+				connection = DriverManager.getConnection(url, userId, pwd);
+			} catch (SQLException e) {
+				logException("FAILED GETTING DATABASE CONNECTION.  FOR EMBEDDED DERBY CHECK FOR OTHER USER LOCKING THE CURRENT DATABASE (Try using a different database instance).", e); 
+				throw new PersistenceException(e);
+			}
+		} else {
+			logger.finest("J2EE mode, getting connection from data source");
+			try {
+				connection = dataSource.getConnection();
+			} catch(SQLException e) {
+				logException("FAILED GETTING DATABASE CONNECTION", e);
+				throw new PersistenceException(e);
+			}
+		}
+
+		// Both modes finish with the same two trace lines.
+		logger.finest("autocommit="+connection.getAutoCommit());
+		logger.finest("Exiting from getConnectionToDefaultSchema, conn= " +connection);
+		return connection;
+	}
+
+	/**
+	 * Logs msg at SEVERE level with the stack trace of e appended as text.
+	 *
+	 * @param msg description of what failed
+	 * @param e the exception whose stack trace is appended
+	 */
+	private void logException(String msg, Exception e) {
+		StringWriter trace = new StringWriter();
+		e.printStackTrace(new PrintWriter(trace));
+
+		String text = msg +  "; Exception stack trace: " + trace;
+		logger.log(Level.SEVERE, text);
+	}
+
+	/**
+	 * Switches the connection to the configured schema with a SET SCHEMA statement.
+	 * 
+	 * @param connection the connection to switch
+	 * @throws SQLException if the statement cannot be prepared or executed
+	 */
+	private void setSchemaOnConnection(Connection connection) throws SQLException {
+		logger.finest("Entering " + CLASSNAME +".setSchemaOnConnection()");
+
+		PreparedStatement ps = connection.prepareStatement("SET SCHEMA ?");
+		try {
+			ps.setString(1, schema);
+			ps.executeUpdate(); 
+		} finally {
+			// Close the statement even when setting the schema fails.
+			ps.close();
+		}
+
+		logger.finest("Exiting " + CLASSNAME +".setSchemaOnConnection()");
+	}
+
+	/**
+	 * Reads the checkpoint stored under the given key.
+	 * <p>
+	 * Runs SELECT_CHECKPOINTDATA with the key as its only parameter and
+	 * deserializes the "obj" column of the first row, if there is one.
+	 * 
+	 * @param key value bound to the statement's single parameter
+	 * @return the deserialized checkpoint data, or null when no row matches
+	 * @throws PersistenceException if the query or the deserialization fails
+	 */
+	private CheckpointData queryCheckpointData(Object key) {
+		logger.entering(CLASSNAME, "queryCheckpointData", new Object[] {key, SELECT_CHECKPOINTDATA});
+		Connection conn = null;
+		PreparedStatement statement = null;
+		ResultSet rs = null;
+		CheckpointData data = null;
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement(SELECT_CHECKPOINTDATA);
+			statement.setObject(1, key);
+			rs = statement.executeQuery();
+			if (rs.next()) {
+				byte[] buf = rs.getBytes("obj");
+				data = (CheckpointData)deserializeObject(buf);
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} catch (IOException e) {
+			throw new PersistenceException(e);
+		} catch (ClassNotFoundException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(conn, rs, statement);
+		}
+		logger.exiting(CLASSNAME, "queryCheckpointData");
+		return data;	
+	}
+
+
+	/**
+	 * Inserts a row through INSERT_CHECKPOINTDATA: parameter 1 is the key and
+	 * parameter 2 is the Java-serialized form of the value.
+	 * 
+	 * @param key value bound to the first parameter
+	 * @param value object serialized into the second parameter
+	 * @throws PersistenceException if serialization or the insert fails
+	 */
+	private <T> void insertCheckpointData(Object key, T value) {
+		logger.entering(CLASSNAME, "insertCheckpointData", new Object[] {key, value});
+		Connection connection = null;
+		PreparedStatement insert = null;
+		ByteArrayOutputStream byteSink = null;
+		ObjectOutputStream objectSink = null;
+		try {
+			connection = getConnection();
+			insert = connection.prepareStatement(INSERT_CHECKPOINTDATA);
+
+			// Serialize the value into an in-memory buffer.
+			byteSink = new ByteArrayOutputStream();
+			objectSink = new ObjectOutputStream(byteSink);
+			objectSink.writeObject(value);
+			byte[] serialized = byteSink.toByteArray();
+
+			insert.setObject(1, key);
+			insert.setBytes(2, serialized);
+			insert.executeUpdate();
+		} catch (SQLException sqlFailure) {
+			throw new PersistenceException(sqlFailure);
+		} catch (IOException ioFailure) {
+			throw new PersistenceException(ioFailure);
+		} finally {
+			if (byteSink != null) {
+				try {
+					byteSink.close();
+				} catch (IOException closeFailure) {
+					throw new PersistenceException(closeFailure);
+				}
+			}
+			if (objectSink != null) {
+				try {
+					objectSink.close();
+				} catch (IOException closeFailure) {
+					throw new PersistenceException(closeFailure);
+				}
+			}
+			cleanupConnection(connection, null, insert);
+		}
+		logger.exiting(CLASSNAME, "insertCheckpointData");
+	}
+
+	/**
+	 * Updates a row through UPDATE_CHECKPOINTDATA: parameter 1 is the
+	 * Java-serialized form of the value and parameter 2 is the key.
+	 * 
+	 * @param key value bound to the second parameter
+	 * @param value checkpoint data serialized into the first parameter
+	 * @throws PersistenceException if serialization or the update fails
+	 */
+	private void updateCheckpointData(Object key, CheckpointData value) {
+		logger.entering(CLASSNAME, "updateCheckpointData", new Object[] {key, value});
+		Connection connection = null;
+		PreparedStatement update = null;
+		ByteArrayOutputStream byteSink = null;
+		ObjectOutputStream objectSink = null;
+		try {
+			connection = getConnection();
+			update = connection.prepareStatement(UPDATE_CHECKPOINTDATA);
+
+			// Serialize the value into an in-memory buffer.
+			byteSink = new ByteArrayOutputStream();
+			objectSink = new ObjectOutputStream(byteSink);
+			objectSink.writeObject(value);
+			byte[] serialized = byteSink.toByteArray();
+
+			update.setBytes(1, serialized);
+			update.setObject(2, key);
+			update.executeUpdate();
+		} catch (SQLException sqlFailure) {
+			throw new PersistenceException(sqlFailure);
+		} catch (IOException ioFailure) {
+			throw new PersistenceException(ioFailure);
+		} finally {
+			if (byteSink != null) {
+				try {
+					byteSink.close();
+				} catch (IOException closeFailure) {
+					throw new PersistenceException(closeFailure);
+				}
+			}
+			if (objectSink != null) {
+				try {
+					objectSink.close();
+				} catch (IOException closeFailure) {
+					throw new PersistenceException(closeFailure);
+				}
+			}
+			cleanupConnection(connection, null, update);
+		}
+		logger.exiting(CLASSNAME, "updateCheckpointData");
+	}
+
+
+
+	/**
+	 * Closes the result set, the statement and the connection, in that order.
+	 * <p>
+	 * Every non-null resource is closed even when an earlier close fails; the first
+	 * failure is rethrown once all of them have been attempted.
+	 * 
+	 * @param conn - connection object to close, may be null
+	 * @param rs - result set object to close, may be null
+	 * @param statement - statement object to close, may be null
+	 * @throws PersistenceException wrapping the first SQLException raised by a close
+	 */
+	private void cleanupConnection(Connection conn, ResultSet rs, PreparedStatement statement) {
+
+		logger.logp(Level.FINEST, CLASSNAME, "cleanupConnection", "Entering",  new Object[] {conn, rs==null ? "<null>" : rs, statement==null ? "<null>" : statement});
+
+		// Only the first failure is kept so the remaining resources still get closed.
+		SQLException firstFailure = null;
+
+		if (rs != null) {
+			try {
+				rs.close();
+			} catch (SQLException e) {
+				firstFailure = e;
+			}
+		}
+
+		if (statement != null) {
+			try {
+				statement.close();
+			} catch (SQLException e) {
+				if (firstFailure == null) {
+					firstFailure = e;
+				}
+			}
+		}
+
+		if (conn != null) {
+			try {
+				conn.close();
+			} catch (SQLException e) {
+				if (firstFailure == null) {
+					firstFailure = e;
+				}
+			}
+		}
+
+		if (firstFailure != null) {
+			throw new PersistenceException(firstFailure);
+		}
+		logger.logp(Level.FINEST, CLASSNAME, "cleanupConnection", "Exiting");
+	}
+
+	/**
+	 * Closes the statement and then the connection.
+	 * <p>
+	 * The connection is closed even when closing the statement fails; the first
+	 * failure is rethrown once both have been attempted.
+	 * 
+	 * @param conn - connection object to close, may be null
+	 * @param statement - statement object to close, may be null
+	 * @throws PersistenceException wrapping the first SQLException raised by a close
+	 */
+	private void cleanupConnection(Connection conn, PreparedStatement statement) {
+
+		logger.logp(Level.FINEST, CLASSNAME, "cleanupConnection", "Entering",  new Object[] {conn, statement});
+
+		// Only the first failure is kept so the connection still gets closed.
+		SQLException firstFailure = null;
+
+		if (statement != null) {
+			try {
+				statement.close();
+			} catch (SQLException e) {
+				firstFailure = e;
+			}
+		}
+
+		if (conn != null) {
+			try {
+				conn.close();
+			} catch (SQLException e) {
+				if (firstFailure == null) {
+					firstFailure = e;
+				}
+			}
+		}
+
+		if (firstFailure != null) {
+			throw new PersistenceException(firstFailure);
+		}
+		logger.logp(Level.FINEST, CLASSNAME, "cleanupConnection", "Exiting");
+	}
+
+
+	/**
+	 * Counts the job instances that have the given name and application tag.
+	 *
+	 * @param jobName value matched against the name column
+	 * @param appTag value matched against the apptag column
+	 * @return the number of matching rows in jobinstancedata
+	 * @throws PersistenceException if the query fails
+	 */
+	@Override
+	public int jobOperatorGetJobInstanceCount(String jobName, String appTag) {
+		Connection connection = null;
+		PreparedStatement query = null;
+		ResultSet result = null;
+
+		try {
+			connection = getConnection();
+			query = connection.prepareStatement("select count(jobinstanceid) as jobinstancecount from jobinstancedata where name = ? and apptag = ?");
+			query.setString(1, jobName);
+			query.setString(2, appTag);
+			result = query.executeQuery();
+			// Move onto the row that carries the count.
+			result.next();
+			return result.getInt("jobinstancecount");
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(connection, result, query);
+		}
+	}
+	
+	/**
+	 * Counts the job instances with the given name, using SELECT_JOBINSTANCEDATA_COUNT.
+	 *
+	 * @param jobName value bound to the statement's single parameter
+	 * @return the value of the jobinstancecount column
+	 * @throws PersistenceException if the query fails
+	 */
+	@Override
+	public int jobOperatorGetJobInstanceCount(String jobName) {
+		Connection connection = null;
+		PreparedStatement query = null;
+		ResultSet result = null;
+
+		try {
+			connection = getConnection();
+			query = connection.prepareStatement(SELECT_JOBINSTANCEDATA_COUNT);
+			query.setString(1, jobName);
+			result = query.executeQuery();
+			// Move onto the row that carries the count.
+			result.next();
+			return result.getInt("jobinstancecount");
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(connection, result, query);
+		}
+	}
+
+
+	/**
+	 * Returns one page of job instance ids for the given job name and application
+	 * tag, ordered by descending id.
+	 * <p>
+	 * All matching ids are read, then the page [start, start+count) is cut out. A
+	 * page that runs past the end is shortened; a page that starts at or past the
+	 * end is empty.
+	 *
+	 * @param jobName value matched against the name column
+	 * @param appTag value matched against the apptag column
+	 * @param start zero-based index of the first id to return
+	 * @param count maximum number of ids to return
+	 * @return the requested page, possibly empty
+	 * @throws PersistenceException if the query fails
+	 */
+	@Override
+	public List<Long> jobOperatorGetJobInstanceIds(String jobName, String appTag, int start, int count) {
+		Connection conn = null;
+		PreparedStatement statement = null;
+		ResultSet rs = null;
+		List<Long> data = new ArrayList<Long>();
+
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement("select jobinstanceid from jobinstancedata where name = ? and apptag = ? order by jobinstanceid desc");
+			statement.setObject(1, jobName);
+			statement.setObject(2, appTag);
+			rs = statement.executeQuery();
+			while (rs.next()) {
+				long id = rs.getLong("jobinstanceid");
+				data.add(id);
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		}
+		finally {
+			cleanupConnection(conn, rs, statement);
+		}
+
+		int size = data.size();
+		if (size == 0 || start >= size) {
+			// Nothing was found, or the page starts past the last id.
+			return new ArrayList<Long>();
+		}
+		// Widen to long so start + count cannot overflow before it is clamped.
+		int end = (int) Math.min((long) start + count, (long) size);
+		return data.subList(start, end);
+	}
+	
+	/**
+	 * Returns one page of job instance ids for the given job name, as produced by
+	 * SELECT_JOBINSTANCEDATA_IDS.
+	 * <p>
+	 * All matching ids are read, then the page [start, start+count) is cut out. A
+	 * page that runs past the end is shortened; a page that starts at or past the
+	 * end is empty.
+	 *
+	 * @param jobName value bound to the statement's single parameter
+	 * @param start zero-based index of the first id to return
+	 * @param count maximum number of ids to return
+	 * @return the requested page, possibly empty
+	 * @throws PersistenceException if the query fails
+	 */
+	@Override
+	public List<Long> jobOperatorGetJobInstanceIds(String jobName, int start, int count) {
+
+		Connection conn = null;
+		PreparedStatement statement = null;
+		ResultSet rs = null;
+		List<Long> data = new ArrayList<Long>();
+
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement(SELECT_JOBINSTANCEDATA_IDS);
+			statement.setObject(1, jobName);
+			rs = statement.executeQuery();
+			while (rs.next()) {
+				long id = rs.getLong("jobinstanceid");
+				data.add(id);
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		}
+		finally {
+			cleanupConnection(conn, rs, statement);
+		}
+
+		int size = data.size();
+		if (size == 0 || start >= size) {
+			// Nothing was found, or the page starts past the last id.
+			return new ArrayList<Long>();
+		}
+		// Widen to long so start + count cannot overflow before it is clamped.
+		int end = (int) Math.min((long) start + count, (long) size);
+		return data.subList(start, end);
+	}
+
+	/**
+	 * Maps job instance id to job name for every row of jobinstancedata whose name
+	 * does not start with PartitionedStepBuilder.JOB_ID_SEPARATOR.
+	 *
+	 * @return map from job instance id to job name
+	 * @throws PersistenceException if the query fails
+	 */
+	@Override
+	public Map<Long, String> jobOperatorGetExternalJobInstanceData() {
+
+		HashMap<Long, String> namesById = new HashMap<Long,String>();
+		Connection connection = null;
+		PreparedStatement query = null;
+		ResultSet rows = null;
+
+		try {
+			connection = getConnection();
+
+			// Filter out 'subjob' parallel execution entries which start with the special character
+			final String filter = "not like '" + PartitionedStepBuilder.JOB_ID_SEPARATOR + "%'";
+			final String sql = "select distinct jobinstanceid, name from jobinstancedata where name " + filter;
+
+			query = connection.prepareStatement(sql);
+			rows = query.executeQuery();
+			while (rows.next()) {
+				long instanceId = rows.getLong("jobinstanceid");
+				namesById.put(instanceId, rows.getString("name"));
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(connection, rows, query);
+		}
+
+		return namesById;
+	}
+
+	/**
+	 * Reads the create, end, update and start timestamps of a job execution and
+	 * returns the one selected by timestampType.
+	 *
+	 * @param key the job execution id
+	 * @param timestampType which of the four timestamps to return
+	 * @return the selected timestamp; null when no row matches the id
+	 * @throws PersistenceException if the query fails
+	 * @throws IllegalArgumentException if timestampType is none of the handled values
+	 */
+	@Override
+	public Timestamp jobOperatorQueryJobExecutionTimestamp(long key, TimestampType timestampType) {
+
+		Timestamp created = null;
+		Timestamp ended = null;
+		Timestamp updated = null;
+		Timestamp started = null;
+
+		Connection connection = null;
+		PreparedStatement query = null;
+		ResultSet rows = null;
+
+		try {
+			connection = getConnection();
+			query = connection.prepareStatement("select createtime, endtime, updatetime, starttime from executioninstancedata where jobexecid = ?");
+			query.setObject(1, key);
+			rows = query.executeQuery();
+			// Column positions follow the select list; the last row read wins.
+			while (rows.next()) {
+				created = rows.getTimestamp(1);
+				ended = rows.getTimestamp(2);
+				updated = rows.getTimestamp(3);
+				started = rows.getTimestamp(4);
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(connection, rows, query);
+		}
+
+		if (timestampType.equals(TimestampType.CREATE)) {
+			return created;
+		}
+		if (timestampType.equals(TimestampType.END)) {
+			return ended;
+		}
+		if (timestampType.equals(TimestampType.LAST_UPDATED)) {
+			return updated;
+		}
+		if (timestampType.equals(TimestampType.STARTED)) {
+			return started;
+		}
+		throw new IllegalArgumentException("Unexpected enum value.");
+	}
+
+	/**
+	 * Reads the batchstatus column of the given job execution.
+	 *
+	 * @param key the job execution id
+	 * @return the stored batch status, or null when no row matches the id
+	 * @throws PersistenceException if the query fails
+	 */
+	@Override
+	public String jobOperatorQueryJobExecutionBatchStatus(long key) {
+
+		String batchStatus = null;
+		Connection connection = null;
+		PreparedStatement query = null;
+		ResultSet rows = null;
+
+		try {
+			connection = getConnection();
+			query = connection.prepareStatement("select batchstatus from executioninstancedata where jobexecid = ?");
+			query.setLong(1, key);
+			// The last row read wins.
+			for (rows = query.executeQuery(); rows.next(); ) {
+				batchStatus = rows.getString(1);
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(connection, rows, query);
+		}
+
+		return batchStatus;
+	}
+
+
+	/**
+	 * Reads the exitstatus column of the given job execution.
+	 *
+	 * @param key the job execution id
+	 * @return the stored exit status, or null when no row matches the id
+	 * @throws PersistenceException if the query fails
+	 */
+	@Override
+	public String jobOperatorQueryJobExecutionExitStatus(long key) {
+
+		String exitStatus = null;
+		Connection connection = null;
+		PreparedStatement query = null;
+		ResultSet rows = null;
+
+		try {
+			connection = getConnection();
+			query = connection.prepareStatement("select exitstatus from executioninstancedata where jobexecid = ?");
+			query.setLong(1, key);
+			// The last row read wins.
+			for (rows = query.executeQuery(); rows.next(); ) {
+				exitStatus = rows.getString(1);
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(connection, rows, query);
+		}
+
+		return exitStatus;
+	}
+
+	/**
+	 * Looks up the job instance id that the given job execution belongs to.
+	 *
+	 * @param executionID the job execution id
+	 * @return the jobinstanceid column of the matching row
+	 * @throws NoSuchJobExecutionException if no row matches the execution id
+	 * @throws PersistenceException if the query fails
+	 */
+	@Override
+	public long jobOperatorQueryJobExecutionJobInstanceId(long executionID) throws NoSuchJobExecutionException {
+
+		Connection connection = null;
+		PreparedStatement query = null;
+		ResultSet rows = null;
+
+		try {
+			connection = getConnection();
+			query = connection.prepareStatement("select jobinstanceid from executioninstancedata where jobexecid = ?");
+			query.setLong(1, executionID);
+			rows = query.executeQuery();
+			if (!rows.next()) {
+				String msg = "Did not find job instance associated with executionID =" + executionID;
+				logger.fine(msg);
+				throw new NoSuchJobExecutionException(msg);
+			}
+			return rows.getLong("jobinstanceid");
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(connection, rows, query);
+		}
+	}
+
+	/**
+	 * Reads and deserializes the parameters column of the given job execution.
+	 *
+	 * @param executionId the job execution id
+	 * @return the result of deserializing the parameters column, cast to Properties
+	 * @throws NoSuchJobExecutionException if no row matches the execution id
+	 * @throws PersistenceException if the query or the deserialization fails
+	 */
+	@Override
+	public Properties getParameters(long executionId) throws NoSuchJobExecutionException {
+		Connection conn = null;
+		PreparedStatement statement = null;
+		ResultSet rs = null;
+		Properties props = null;
+
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement("select parameters from executioninstancedata where jobexecid = ?"); 
+			statement.setLong(1, executionId);
+			rs = statement.executeQuery();
+			
+			if (rs.next()) {
+				// get the object based data
+				byte[] buf = rs.getBytes("parameters");
+				props = (Properties)deserializeObject(buf);
+			} else {
+				String msg = "Did not find table entry for executionID =" + executionId;
+				logger.fine(msg);
+				throw new NoSuchJobExecutionException(msg);
+			}
+
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} catch (IOException e) {
+			throw new PersistenceException(e);
+		} catch (ClassNotFoundException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(conn, rs, statement);
+		}
+
+		return props;
+
+	}
+
+
+	/**
+	 * Returns, per step name, the step execution with the highest step execution id
+	 * among all job executions of the given job instance.
+	 * <p>
+	 * The query orders rows by descending stepexecid, so the first row seen for a
+	 * step name is kept and later rows for the same name are skipped.
+	 *
+	 * @param instanceId the job instance id
+	 * @return map from step name to its most recent step execution; empty when the
+	 *         query returns no rows
+	 * @throws PersistenceException if the query or the deserialization of the
+	 *         persistent user data fails
+	 */
+	public Map<String, StepExecution> getMostRecentStepExecutionsForJobInstance(long instanceId) {
+
+		Map<String, StepExecution> data = new HashMap<String, StepExecution>();
+
+		Connection conn = null;
+		PreparedStatement statement = null;
+		ResultSet rs = null;
+
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement("select A.* from stepexecutioninstancedata as A inner join executioninstancedata as B on A.jobexecid = B.jobexecid where B.jobinstanceid = ? order by A.stepexecid desc"); 
+			statement.setLong(1, instanceId);
+			rs = statement.executeQuery();
+			while (rs.next()) {
+				String stepname = rs.getString("stepname");
+				if (data.containsKey(stepname)) {
+					// A row with a higher stepexecid was already stored for this step.
+					continue;
+				}
+
+				long jobexecid = rs.getLong("jobexecid");
+				// Read the id from the row; it was previously left at 0 for every step.
+				long stepexecid = rs.getLong("stepexecid");
+				String batchstatus = rs.getString("batchstatus");
+				String exitstatus = rs.getString("exitstatus");
+				long readCount = rs.getLong("readcount");
+				long writeCount = rs.getLong("writecount");
+				long commitCount = rs.getLong("commitcount");
+				long rollbackCount = rs.getLong("rollbackcount");
+				long readSkipCount = rs.getLong("readskipcount");
+				long processSkipCount = rs.getLong("processskipcount");
+				long filterCount = rs.getLong("filtercount");
+				long writeSkipCount = rs.getLong("writeSkipCount");
+				Timestamp startTS = rs.getTimestamp("startTime");
+				Timestamp endTS = rs.getTimestamp("endTime");
+
+				// get the object based data
+				Serializable persistentData = null;
+				byte[] pDataBytes = rs.getBytes("persistentData");
+				if (pDataBytes != null) {
+					ObjectInputStream objectIn = new TCCLObjectInputStream(new ByteArrayInputStream(pDataBytes));
+					try {
+						persistentData = (Serializable)objectIn.readObject();
+					} finally {
+						objectIn.close();
+					}
+				}
+
+				StepExecutionImpl stepEx = new StepExecutionImpl(jobexecid, stepexecid);
+
+				stepEx.setBatchStatus(BatchStatus.valueOf(batchstatus));
+				stepEx.setExitStatus(exitstatus);
+				stepEx.setStepName(stepname);
+				stepEx.setReadCount(readCount);
+				stepEx.setWriteCount(writeCount);
+				stepEx.setCommitCount(commitCount);
+				stepEx.setRollbackCount(rollbackCount);
+				stepEx.setReadSkipCount(readSkipCount);
+				stepEx.setProcessSkipCount(processSkipCount);
+				stepEx.setFilterCount(filterCount);
+				stepEx.setWriteSkipCount(writeSkipCount);
+				stepEx.setStartTime(startTS);
+				stepEx.setEndTime(endTS);	
+				stepEx.setPersistentUserData(persistentData);
+
+				data.put(stepname, stepEx);
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} catch (IOException e) {
+			throw new PersistenceException(e);
+		} catch (ClassNotFoundException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(conn, rs, statement);
+		}
+
+		return data;
+	}
+
+
+	@Override
+	public List<StepExecution> getStepExecutionsForJobExecution(long execid) {
+		Connection conn = null;
+		PreparedStatement statement = null;
+		ResultSet rs = null;
+
+		long jobexecid = 0;
+		long stepexecid = 0;
+		String stepname = null;
+		String batchstatus = null;
+		String exitstatus = null;
+		Exception ex = null;
+		long readCount =0;
+		long writeCount = 0;
+		long commitCount = 0;
+		long rollbackCount = 0;
+		long readSkipCount = 0;
+		long processSkipCount = 0;
+		long filterCount = 0;
+		long writeSkipCount = 0;
+		Timestamp startTS = null;
+		Timestamp endTS = null;
+		StepExecutionImpl stepEx = null;
+		ObjectInputStream objectIn = null;
+
+		List<StepExecution> data = new ArrayList<StepExecution>();
+
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement("select * from stepexecutioninstancedata where jobexecid = ?"); 
+			statement.setLong(1, execid);
+			rs = statement.executeQuery();
+			while (rs.next()) {
+				jobexecid = rs.getLong("jobexecid");
+				stepexecid = rs.getLong("stepexecid");
+				stepname = rs.getString("stepname");
+				batchstatus = rs.getString("batchstatus");
+				exitstatus = rs.getString("exitstatus");
+				readCount = rs.getLong("readcount");
+				writeCount = rs.getLong("writecount");
+				commitCount = rs.getLong("commitcount");
+				rollbackCount = rs.getLong("rollbackcount");
+				readSkipCount = rs.getLong("readskipcount");
+				processSkipCount = rs.getLong("processskipcount");
+				filterCount = rs.getLong("filtercount");
+				writeSkipCount = rs.getLong("writeSkipCount");
+				startTS = rs.getTimestamp("startTime");
+				endTS = rs.getTimestamp("endTime");
+				// get the object based data
+				Serializable persistentData = null;
+				byte[] pDataBytes = rs.getBytes("persistentData");
+				if (pDataBytes != null) {
+					objectIn = new TCCLObjectInputStream(new ByteArrayInputStream(pDataBytes));
+					persistentData = (Serializable)objectIn.readObject();
+				}
+
+				stepEx = new StepExecutionImpl(jobexecid, stepexecid);
+
+				stepEx.setBatchStatus(BatchStatus.valueOf(batchstatus));
+				stepEx.setExitStatus(exitstatus);
+				stepEx.setStepName(stepname);
+				stepEx.setReadCount(readCount);
+				stepEx.setWriteCount(writeCount);
+				stepEx.setCommitCount(commitCount);
+				stepEx.setRollbackCount(rollbackCount);
+				stepEx.setReadSkipCount(readSkipCount);
+				stepEx.setProcessSkipCount(processSkipCount);
+				stepEx.setFilterCount(filterCount);
+				stepEx.setWriteSkipCount(writeSkipCount);
+				stepEx.setStartTime(startTS);
+				stepEx.setEndTime(endTS);	
+				stepEx.setPersistentUserData(persistentData);
+
+				data.add(stepEx);
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} catch (IOException e) {
+			throw new PersistenceException(e);
+		} catch (ClassNotFoundException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(conn, rs, statement);
+		}
+
+		return data;
+	}
+
+	
+    @Override
+    public StepExecution getStepExecutionByStepExecutionId(long stepExecId) {
+        Connection conn = null;
+        PreparedStatement statement = null;
+        ResultSet rs = null;
+
+        long jobexecid = 0;
+        long stepexecid = 0;
+        String stepname = null;
+        String batchstatus = null;
+        String exitstatus = null;
+        Exception ex = null;
+        long readCount = 0;
+        long writeCount = 0;
+        long commitCount = 0;
+        long rollbackCount = 0;
+        long readSkipCount = 0;
+        long processSkipCount = 0;
+        long filterCount = 0;
+        long writeSkipCount = 0;
+        Timestamp startTS = null;
+        Timestamp endTS = null;
+        StepExecutionImpl stepEx = null;
+        ObjectInputStream objectIn = null;
+
+        try {
+            conn = getConnection();
+            statement = conn.prepareStatement("select * from stepexecutioninstancedata where stepexecid = ?");
+            statement.setLong(1, stepExecId);
+            rs = statement.executeQuery();
+            while (rs.next()) {
+                jobexecid = rs.getLong("jobexecid");
+                stepexecid = rs.getLong("stepexecid");
+                stepname = rs.getString("stepname");
+                batchstatus = rs.getString("batchstatus");
+                exitstatus = rs.getString("exitstatus");
+                readCount = rs.getLong("readcount");
+                writeCount = rs.getLong("writecount");
+                commitCount = rs.getLong("commitcount");
+                rollbackCount = rs.getLong("rollbackcount");
+                readSkipCount = rs.getLong("readskipcount");
+                processSkipCount = rs.getLong("processskipcount");
+                filterCount = rs.getLong("filtercount");
+                writeSkipCount = rs.getLong("writeSkipCount");
+                startTS = rs.getTimestamp("startTime");
+                endTS = rs.getTimestamp("endTime");
+                // get the object based data
+                Serializable persistentData = null;
+                byte[] pDataBytes = rs.getBytes("persistentData");
+                if (pDataBytes != null) {
+                    objectIn = new TCCLObjectInputStream(new ByteArrayInputStream(pDataBytes));
+                    persistentData = (Serializable) objectIn.readObject();
+                }
+
+                stepEx = new StepExecutionImpl(jobexecid, stepexecid);
+
+                stepEx.setBatchStatus(BatchStatus.valueOf(batchstatus));
+                stepEx.setExitStatus(exitstatus);
+                stepEx.setStepName(stepname);
+                stepEx.setReadCount(readCount);
+                stepEx.setWriteCount(writeCount);
+                stepEx.setCommitCount(commitCount);
+                stepEx.setRollbackCount(rollbackCount);
+                stepEx.setReadSkipCount(readSkipCount);
+                stepEx.setProcessSkipCount(processSkipCount);
+                stepEx.setFilterCount(filterCount);
+                stepEx.setWriteSkipCount(writeSkipCount);
+                stepEx.setStartTime(startTS);
+                stepEx.setEndTime(endTS);
+                stepEx.setPersistentUserData(persistentData);
+
+
+            }
+        } catch (SQLException e) {
+            throw new PersistenceException(e);
+        } catch (IOException e) {
+            throw new PersistenceException(e);
+        } catch (ClassNotFoundException e) {
+            throw new PersistenceException(e);
+        } finally {
+            cleanupConnection(conn, rs, statement);
+        }
+
+        return stepEx;
+    }
+
+	/**
+	 * Updates the batchstatus and updatetime columns of one row of
+	 * executioninstancedata.
+	 *
+	 * @param key the jobexecid of the row to update
+	 * @param batchStatus the new status; its name() is what gets stored
+	 * @param updatets the value written to the updatetime column
+	 * @throws PersistenceException wrapping any SQLException raised by the update
+	 */
+	@Override
+	public void updateBatchStatusOnly(long key, BatchStatus batchStatus, Timestamp updatets) {
+		Connection conn = null;
+		PreparedStatement statement = null;
+
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement("update executioninstancedata set batchstatus = ?, updatetime = ? where jobexecid = ?");
+			statement.setString(1, batchStatus.name());
+			statement.setTimestamp(2, updatets);
+			statement.setLong(3, key);
+			statement.executeUpdate();
+		} catch (SQLException e) {
+			// the cause travels with the PersistenceException; no need to print it here
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(conn, null, statement);
+		}
+	}
+
+	/**
+	 * Writes the final batch status and exit status of a job execution and
+	 * stamps both its endtime and updatetime columns with the same timestamp.
+	 *
+	 * @param key the jobexecid of the row to update
+	 * @param batchStatus the final batch status; its name() is what gets stored
+	 * @param exitStatus the final exit status
+	 * @param updatets the value written to both endtime and updatetime
+	 * @throws PersistenceException wrapping any SQLException raised by the update
+	 */
+	@Override
+	public void updateWithFinalExecutionStatusesAndTimestamps(long key,
+			BatchStatus batchStatus, String exitStatus, Timestamp updatets) {
+		Connection conn = null;
+		PreparedStatement statement = null;
+
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement("update executioninstancedata set batchstatus = ?, exitstatus = ?, endtime = ?, updatetime = ? where jobexecid = ?");
+
+			statement.setString(1, batchStatus.name());
+			statement.setString(2, exitStatus);
+			statement.setTimestamp(3, updatets);
+			statement.setTimestamp(4, updatets);
+			statement.setLong(5, key);
+
+			statement.executeUpdate();
+		} catch (SQLException e) {
+			// the cause travels with the PersistenceException; no need to print it here
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(conn, null, statement);
+		}
+
+	}
+
+	/**
+	 * Sets a job execution's batch status to STARTED and stamps both its
+	 * starttime and updatetime columns with the given timestamp.
+	 *
+	 * @param key the jobexecid of the row to update
+	 * @param startTS the value written to both starttime and updatetime
+	 * @throws PersistenceException wrapping any SQLException raised by the update
+	 */
+	public void markJobStarted(long key, Timestamp startTS) {
+		Connection conn = null;
+		PreparedStatement statement = null;
+
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement("update executioninstancedata set batchstatus = ?, starttime = ?, updatetime = ? where jobexecid = ?");
+
+			statement.setString(1, BatchStatus.STARTED.name());
+			statement.setTimestamp(2, startTS);
+			statement.setTimestamp(3, startTS);
+			statement.setLong(4, key);
+
+			statement.executeUpdate();
+		} catch (SQLException e) {
+			// the cause travels with the PersistenceException; no need to print it here
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(conn, null, statement);
+		}
+	}
+
+
+	/**
+	 * Reads one job execution, joined with the name of its job instance.
+	 *
+	 * @param jobExecutionId the jobexecid to select on
+	 * @return the execution built from the last row the query returned, or
+	 *         null when no row matches
+	 * @throws PersistenceException wrapping any SQLException, IOException or
+	 *         ClassNotFoundException raised while reading a row
+	 */
+	@Override
+	public IJobExecution jobOperatorGetJobExecution(long jobExecutionId) {
+		JobOperatorJobExecution result = null;
+
+		Connection connection = null;
+		PreparedStatement query = null;
+		ResultSet rows = null;
+
+		try {
+			connection = getConnection();
+			query = connection.prepareStatement("select A.createtime, A.starttime, A.endtime, A.updatetime, A.parameters, A.jobinstanceid, A.batchstatus, A.exitstatus, B.name from executioninstancedata as A inner join jobinstancedata as B on A.jobinstanceid = B.jobinstanceid where jobexecid = ?");
+			query.setLong(1, jobExecutionId);
+			rows = query.executeQuery();
+
+			while (rows.next()) {
+				final Timestamp created = rows.getTimestamp("createtime");
+				final Timestamp started = rows.getTimestamp("starttime");
+				final Timestamp ended = rows.getTimestamp("endtime");
+				final Timestamp updated = rows.getTimestamp("updatetime");
+				final long owningInstanceId = rows.getLong("jobinstanceid");
+				final String status = rows.getString("batchstatus");
+				final String exit = rows.getString("exitstatus");
+
+				// job parameters are stored in serialized form
+				final byte[] serializedParams = rows.getBytes("parameters");
+				final Properties jobParams = (Properties) deserializeObject(serializedParams);
+
+				final String name = rows.getString("name");
+
+				result = new JobOperatorJobExecution(jobExecutionId, owningInstanceId);
+				result.setCreateTime(created);
+				result.setStartTime(started);
+				result.setEndTime(ended);
+				result.setJobParameters(jobParams);
+				result.setLastUpdateTime(updated);
+				result.setBatchStatus(status);
+				result.setExitStatus(exit);
+				result.setJobName(name);
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} catch (IOException e) {
+			throw new PersistenceException(e);
+		} catch (ClassNotFoundException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(connection, rows, query);
+		}
+		return result;
+	}
+
+	/**
+	 * Reads all job executions that belong to one job instance, joined with
+	 * the instance's job name.
+	 *
+	 * @param jobInstanceId the jobinstanceid to select on
+	 * @return the executions in the order the query returned them; empty when
+	 *         no row matches
+	 * @throws PersistenceException wrapping any SQLException, IOException or
+	 *         ClassNotFoundException raised while reading a row
+	 */
+	@Override
+	public List<IJobExecution> jobOperatorGetJobExecutions(long jobInstanceId) {
+		Connection conn = null;
+		PreparedStatement statement = null;
+		ResultSet rs = null;
+		List<IJobExecution> data = new ArrayList<IJobExecution>();
+
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement("select A.jobexecid, A.createtime, A.starttime, A.endtime, A.updatetime, A.parameters, A.batchstatus, A.exitstatus, B.name from executioninstancedata as A inner join jobinstancedata as B ON A.jobinstanceid = B.jobinstanceid where A.jobinstanceid = ?");
+			statement.setLong(1, jobInstanceId);
+			rs = statement.executeQuery();
+			while (rs.next()) {
+				long jobExecutionId = rs.getLong("jobexecid");
+				Timestamp createtime = rs.getTimestamp("createtime");
+				Timestamp starttime = rs.getTimestamp("starttime");
+				Timestamp endtime = rs.getTimestamp("endtime");
+				Timestamp updatetime = rs.getTimestamp("updatetime");
+				String batchStatus = rs.getString("batchstatus");
+				String exitStatus = rs.getString("exitstatus");
+				String jobName = rs.getString("name");
+
+				// get the object based data
+				byte[] buf = rs.getBytes("parameters");
+				Properties params = (Properties)deserializeObject(buf);
+
+				// Every selected row has jobinstanceid == jobInstanceId (see the where
+				// clause), so the parameter is the instance id of each execution. It
+				// used to be passed as an uninitialised 0.
+				JobOperatorJobExecution jobEx = new JobOperatorJobExecution(jobExecutionId, jobInstanceId);
+				jobEx.setCreateTime(createtime);
+				jobEx.setStartTime(starttime);
+				jobEx.setEndTime(endtime);
+				// the parameters used to be deserialized and then dropped
+				jobEx.setJobParameters(params);
+				jobEx.setLastUpdateTime(updatetime);
+				jobEx.setBatchStatus(batchStatus);
+				jobEx.setExitStatus(exitStatus);
+				jobEx.setJobName(jobName);
+
+				data.add(jobEx);
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} catch (IOException e) {
+			throw new PersistenceException(e);
+		} catch (ClassNotFoundException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(conn, rs, statement);
+		}
+		return data;
+	}
+
+	/**
+	 * Collects the ids of the executions of the named job whose batch status
+	 * is STARTED, STARTING or STOPPING.
+	 *
+	 * @param jobName the job name to match against jobinstancedata.name
+	 * @return the matching jobexecid values; empty when there are none
+	 * @throws PersistenceException wrapping any SQLException raised by the query
+	 */
+	@Override
+	public Set<Long> jobOperatorGetRunningExecutions(String jobName){
+		final Set<Long> runningIds = new HashSet<Long>();
+
+		Connection connection = null;
+		PreparedStatement query = null;
+		ResultSet rows = null;
+
+		try {
+			connection = getConnection();
+			query = connection.prepareStatement("SELECT A.jobexecid FROM executioninstancedata AS A INNER JOIN jobinstancedata AS B ON A.jobinstanceid = B.jobinstanceid WHERE A.batchstatus IN (?,?,?) AND B.name = ?");
+
+			// the three statuses fill placeholders 1..3, the job name placeholder 4
+			final BatchStatus[] runningStates = { BatchStatus.STARTED, BatchStatus.STARTING, BatchStatus.STOPPING };
+			for (int i = 0; i < runningStates.length; i++) {
+				query.setString(i + 1, runningStates[i].name());
+			}
+			query.setString(runningStates.length + 1, jobName);
+
+			rows = query.executeQuery();
+			while (rows.next()) {
+				runningIds.add(rows.getLong("jobexecid"));
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(connection, rows, query);
+		}
+		return runningIds;
+	}
+
+	/**
+	 * Runs SELECT_JOBINSTANCEDATA_APPTAG for the given job instance and
+	 * returns the APPTAG column.
+	 *
+	 * @param jobInstanceId bound to the statement's single placeholder
+	 * @return the APPTAG value of the last row returned, or null when the
+	 *         query yields no row
+	 * @throws PersistenceException wrapping any SQLException raised by the query
+	 */
+	@Override
+	public String getJobCurrentTag(long jobInstanceId) {
+		String tag = null;
+
+		Connection connection = null;
+		PreparedStatement query = null;
+		ResultSet rows = null;
+
+		try {
+			connection = getConnection();
+			query = connection.prepareStatement(SELECT_JOBINSTANCEDATA_APPTAG);
+			query.setLong(1, jobInstanceId);
+			rows = query.executeQuery();
+			// each further row overwrites the previous value
+			while (rows.next()) {
+				tag = rows.getString(APPTAG);
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(connection, rows, query);
+		}
+
+		return tag;
+	}
+
+	/**
+	 * Deletes the step executions, job executions and job instances that
+	 * carry the given application tag, in that order. The dependent rows go
+	 * first because the later deletes select on the jobinstancedata rows.
+	 *
+	 * @param apptag the application tag whose rows are removed
+	 * @throws PersistenceException wrapping any SQLException raised by a delete
+	 */
+	@Override
+	public void purge(String apptag) {
+
+		logger.entering(CLASSNAME, "purge", apptag);
+		String deleteJobs = "DELETE FROM jobinstancedata WHERE apptag = ?";
+		String deleteJobExecutions = "DELETE FROM executioninstancedata "
+				+ "WHERE jobexecid IN ("
+				+ "SELECT B.jobexecid FROM jobinstancedata AS A INNER JOIN executioninstancedata AS B "
+				+ "ON A.jobinstanceid = B.jobinstanceid "
+				+ "WHERE A.apptag = ?)";
+		String deleteStepExecutions = "DELETE FROM stepexecutioninstancedata "
+				+ "WHERE stepexecid IN ("
+				+ "SELECT C.stepexecid FROM jobinstancedata AS A INNER JOIN executioninstancedata AS B "
+				+ "ON A.jobinstanceid = B.jobinstanceid INNER JOIN stepexecutioninstancedata AS C "
+				+ "ON B.jobexecid = C.jobexecid "
+				+ "WHERE A.apptag = ?)";
+
+		Connection conn = null;
+		PreparedStatement statement = null;
+		try {
+			conn = getConnection();
+
+			// Each statement is closed before the variable is reused; otherwise only
+			// the last of the three would be handed to cleanupConnection and the
+			// first two would leak. Closing an already closed Statement is a no-op,
+			// so the finally block stays safe if a later prepareStatement fails.
+			statement = conn.prepareStatement(deleteStepExecutions);
+			statement.setString(1, apptag);
+			statement.executeUpdate();
+			statement.close();
+
+			statement = conn.prepareStatement(deleteJobExecutions);
+			statement.setString(1, apptag);
+			statement.executeUpdate();
+			statement.close();
+
+			statement = conn.prepareStatement(deleteJobs);
+			statement.setString(1, apptag);
+			statement.executeUpdate();
+
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} finally {
+
+			cleanupConnection(conn, null, statement);
+		}
+		logger.exiting(CLASSNAME, "purge");
+
+	}
+
+	/**
+	 * Loads the serialized JobStatus stored for the job instance that owns
+	 * the given execution.
+	 *
+	 * @param executionId the jobexecid used to find the owning job instance
+	 * @return the deserialized JobStatus, or null when the query returns no
+	 *         row or the obj column holds no bytes
+	 * @throws PersistenceException wrapping any exception raised while
+	 *         querying or deserializing
+	 */
+	@Override
+	public JobStatus getJobStatusFromExecution(long executionId) {
+
+		Connection conn = null;
+		PreparedStatement statement = null;
+		ResultSet rs = null;
+		JobStatus retVal = null;
+
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement("select A.obj from jobstatus as A inner join " +
+					"executioninstancedata as B on A.id = B.jobinstanceid where B.jobexecid = ?");
+			statement.setLong(1, executionId);
+			rs = statement.executeQuery();
+			byte[] buf = null;
+			if (rs.next()) {
+				buf = rs.getBytes("obj");
+			}
+			// deserializeObject returns null for a null buffer
+			retVal = (JobStatus)deserializeObject(buf);
+		} catch (Exception e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(conn, rs, statement);
+		}
+		// trace the real method name; this used to report "executeQuery"
+		logger.exiting(CLASSNAME, "getJobStatusFromExecution");
+		return retVal;
+	}
+
+	/**
+	 * Resolves the job instance that owns the given job execution.
+	 *
+	 * @param executionId the jobexecid to look up
+	 * @return the jobinstanceid stored on the execution's row
+	 * @throws NoSuchJobExecutionException when no row has that jobexecid
+	 * @throws PersistenceException wrapping any SQLException raised by the query
+	 */
+	public long getJobInstanceIdByExecutionId(long executionId) throws NoSuchJobExecutionException {
+		Connection connection = null;
+		PreparedStatement query = null;
+		ResultSet rows = null;
+
+		try {
+			connection = getConnection();
+			query = connection.prepareStatement("select jobinstanceid from executioninstancedata where jobexecid = ?");
+			query.setObject(1, executionId);
+			rows = query.executeQuery();
+
+			if (!rows.next()) {
+				final String msg = "Did not find job instance associated with executionID =" + executionId;
+				logger.fine(msg);
+				throw new NoSuchJobExecutionException(msg);
+			}
+			return rows.getLong("jobinstanceid");
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(connection, rows, query);
+		}
+	}
+
+	/**
+	 * Serializes an object into a byte array so it can be stored in a table
+	 * BLOB field.
+	 *
+	 * @param theObject the object to serialize; null is written as a
+	 *        serialized null reference
+	 * @return the serialized form of the object
+	 * @throws IOException if the object cannot be written
+	 */
+	private byte[] serializeObject(Serializable theObject) throws IOException {
+
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		ObjectOutputStream oout = new ObjectOutputStream(baos);
+		try {
+			oout.writeObject(theObject);
+			// Flush before taking the snapshot so the result does not depend on
+			// ObjectOutputStream's internal buffering.
+			oout.flush();
+			return baos.toByteArray();
+		} finally {
+			// closing the object stream also closes the underlying byte stream
+			oout.close();
+		}
+	}
+
+	/**
+	 * This method is used to de-serialized a table BLOB field to its original object form.
+	 * 
+	 * @param buffer the byte array save a BLOB
+	 * @return the object saved as byte array
+	 * @throws IOException
+	 * @throws ClassNotFoundException
+	 */
+	private Serializable deserializeObject(byte[] buffer) throws IOException, ClassNotFoundException {
+
+		Serializable theObject = null;
+		ObjectInputStream objectIn = null;
+
+		if (buffer != null) {
+			objectIn = new ObjectInputStream(new ByteArrayInputStream(buffer));
+			theObject = (Serializable)objectIn.readObject();
+			objectIn.close();
+		}
+		return theObject;
+	}
+
+	/**
+	 * Inserts a jobinstancedata row with the given name and application tag
+	 * and wraps the generated key in a JobInstanceImpl.
+	 *
+	 * @param name stored in the name column and set as the instance's job name
+	 * @param apptag stored in the apptag column
+	 * @return the new instance, or null when the insert reports no generated key
+	 * @throws PersistenceException wrapping any SQLException raised by the insert
+	 */
+	@Override
+	public JobInstance createSubJobInstance(String name, String apptag) {
+		JobInstanceImpl created = null;
+
+		Connection connection = null;
+		PreparedStatement insert = null;
+		ResultSet generatedKeys = null;
+
+		try {
+			connection = getConnection();
+			insert = connection.prepareStatement("INSERT INTO jobinstancedata (name, apptag) VALUES(?, ?)", Statement.RETURN_GENERATED_KEYS);
+			insert.setString(1, name);
+			insert.setString(2, apptag);
+			insert.executeUpdate();
+
+			generatedKeys = insert.getGeneratedKeys();
+			if (generatedKeys.next()) {
+				created = new JobInstanceImpl(generatedKeys.getLong(1));
+				created.setJobName(name);
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(connection, generatedKeys, insert);
+		}
+		return created;
+	}
+
+	/**
+	 * Inserts a jobinstancedata row with the given name and application tag
+	 * and wraps the generated key, together with the job XML, in a
+	 * JobInstanceImpl. The job XML is only handed to the returned object; the
+	 * insert statement does not store it.
+	 *
+	 * @param name stored in the name column and set as the instance's job name
+	 * @param apptag stored in the apptag column
+	 * @param jobXml passed to the JobInstanceImpl constructor
+	 * @return the new instance, or null when the insert reports no generated key
+	 * @throws PersistenceException wrapping any SQLException raised by the insert
+	 */
+	@Override
+	public JobInstance createJobInstance(String name, String apptag, String jobXml) {
+		JobInstanceImpl created = null;
+
+		Connection connection = null;
+		PreparedStatement insert = null;
+		ResultSet generatedKeys = null;
+
+		try {
+			connection = getConnection();
+			insert = connection.prepareStatement("INSERT INTO jobinstancedata (name, apptag) VALUES(?, ?)", Statement.RETURN_GENERATED_KEYS);
+			insert.setString(1, name);
+			insert.setString(2, apptag);
+			insert.executeUpdate();
+
+			generatedKeys = insert.getGeneratedKeys();
+			if (generatedKeys.next()) {
+				created = new JobInstanceImpl(generatedKeys.getLong(1), jobXml);
+				created.setJobName(name);
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(connection, generatedKeys, insert);
+		}
+		return created;
+	}
+
+	/**
+	 * Persists a new job execution row for the given instance and returns the
+	 * matching in-memory RuntimeJobExecution. The current time is used for
+	 * both the create time and the last update time.
+	 *
+	 * @param jobInstance the instance the execution belongs to
+	 * @param jobParameters the parameters stored with the execution
+	 * @param batchStatus the initial batch status
+	 * @return the runtime execution carrying the newly generated execution id
+	 */
+	@Override
+	public RuntimeJobExecution createJobExecution(JobInstance jobInstance, Properties jobParameters, BatchStatus batchStatus) {
+		final Timestamp createdAt = new Timestamp(System.currentTimeMillis());
+		final long executionId = createRuntimeJobExecutionEntry(jobInstance, jobParameters, batchStatus, createdAt);
+
+		final RuntimeJobExecution execution = new RuntimeJobExecution(jobInstance, executionId);
+		execution.setBatchStatus(batchStatus.name());
+		execution.setCreateTime(createdAt);
+		execution.setLastUpdateTime(createdAt);
+		return execution;
+	}
+
+	private long createRuntimeJobExecutionEntry(JobInstance jobInstance, Properties jobParameters, BatchStatus batchStatus, Timestamp timestamp) {
+		Connection conn = null;
+		PreparedStatement statement = null;
+		ResultSet rs = null;
+		long newJobExecutionId = 0L;
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement("INSERT INTO executioninstancedata (jobinstanceid, createtime, updatetime, batchstatus, parameters) VALUES(?, ?, ?, ?, ?)", Statement.RETURN_GENERATED_KEYS);
+			statement.setLong(1, jobInstance.getInstanceId());
+			statement.setTimestamp(2, timestamp);
+			statement.setTimestamp(3, timestamp);
+			statement.setString(4, batchStatus.name());
+			statement.setObject(5, serializeObject(jobParameters));
+			statement.executeUpdate();
+			rs = statement.getGeneratedKeys();
+			if(rs.next()) {
+				newJobExecutionId = rs.getLong(1);
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} catch (IOException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(conn, rs, statement);
+		}
+		return newJobExecutionId;
+	}
+
+	/**
+	 * Persists a new execution row for a flow running inside a split and
+	 * returns the matching RuntimeFlowInSplitExecution. No job parameters are
+	 * stored (null is passed), and the current time is used for both the
+	 * create time and the last update time.
+	 *
+	 * @param jobInstance the instance the execution belongs to
+	 * @param batchStatus the initial batch status
+	 * @return the runtime execution carrying the newly generated execution id
+	 */
+	@Override
+	public RuntimeFlowInSplitExecution createFlowInSplitExecution(JobInstance jobInstance, BatchStatus batchStatus) {
+		final Timestamp createdAt = new Timestamp(System.currentTimeMillis());
+		final long executionId = createRuntimeJobExecutionEntry(jobInstance, null, batchStatus, createdAt);
+
+		final RuntimeFlowInSplitExecution execution = new RuntimeFlowInSplitExecution(jobInstance, executionId);
+		execution.setBatchStatus(batchStatus.name());
+		execution.setCreateTime(createdAt);
+		execution.setLastUpdateTime(createdAt);
+		return execution;
+	}
+
+	/**
+	 * Creates a step execution row from the current state of a step context:
+	 * statuses, step name, start/end time, the eight metric counters and the
+	 * persistent user data.
+	 *
+	 * @param rootJobExecId the job execution id the step is recorded under
+	 * @param stepContext the context the values are read from; a null batch
+	 *        status is stored as STARTING
+	 * @return the step execution returned by the private overload that
+	 *         performs the insert
+	 */
+	@Override
+	public StepExecutionImpl createStepExecution(long rootJobExecId, StepContextImpl stepContext) {
+		final String status;
+		if (stepContext.getBatchStatus() == null) {
+			status = BatchStatus.STARTING.name();
+		} else {
+			status = stepContext.getBatchStatus().name();
+		}
+		final String exit = stepContext.getExitStatus();
+		final String name = stepContext.getStepName();
+
+		final Timestamp started = stepContext.getStartTimeTS();
+		final Timestamp ended = stepContext.getEndTimeTS();
+
+		// counters stay at 0 unless the context reports a metric of that type
+		long reads = 0;
+		long writes = 0;
+		long commits = 0;
+		long rollbacks = 0;
+		long readSkips = 0;
+		long processSkips = 0;
+		long filtered = 0;
+		long writeSkips = 0;
+
+		for (Metric metric : stepContext.getMetrics()) {
+			if (metric.getType().equals(MetricImpl.MetricType.READ_COUNT)) {
+				reads = metric.getValue();
+			} else if (metric.getType().equals(MetricImpl.MetricType.WRITE_COUNT)) {
+				writes = metric.getValue();
+			} else if (metric.getType().equals(MetricImpl.MetricType.PROCESS_SKIP_COUNT)) {
+				processSkips = metric.getValue();
+			} else if (metric.getType().equals(MetricImpl.MetricType.COMMIT_COUNT)) {
+				commits = metric.getValue();
+			} else if (metric.getType().equals(MetricImpl.MetricType.ROLLBACK_COUNT)) {
+				rollbacks = metric.getValue();
+			} else if (metric.getType().equals(MetricImpl.MetricType.READ_SKIP_COUNT)) {
+				readSkips = metric.getValue();
+			} else if (metric.getType().equals(MetricImpl.MetricType.FILTER_COUNT)) {
+				filtered = metric.getValue();
+			} else if (metric.getType().equals(MetricImpl.MetricType.WRITE_SKIP_COUNT)) {
+				writeSkips = metric.getValue();
+			}
+		}
+
+		final Serializable userData = stepContext.getPersistentUserData();
+
+		return createStepExecution(rootJobExecId, status, exit, name, reads,
+				writes, commits, rollbacks, readSkips, processSkips, filtered, writeSkips, started,
+				ended, userData);
+	}
+
+
+	/**
+	 * Inserts one stepexecutioninstancedata row with the given values and
+	 * wraps the generated key in a StepExecutionImpl. Only the ids and the
+	 * step name are set on the returned object.
+	 *
+	 * @param rootJobExecId stored in the jobexecid column
+	 * @param persistentData serialized into the persistentdata column
+	 * @return the new step execution, or null when the insert reports no
+	 *         generated key
+	 * @throws PersistenceException wrapping any SQLException or IOException
+	 */
+	private StepExecutionImpl createStepExecution(long rootJobExecId,  String batchStatus, String exitStatus, String stepName, long readCount,
+			long writeCount, long commitCount, long rollbackCount, long readSkipCount, long processSkipCount, long filterCount,
+			long writeSkipCount, Timestamp startTime, Timestamp endTime, Serializable persistentData) {
+
+		logger.entering(CLASSNAME, "createStepExecution", new Object[] {rootJobExecId, batchStatus, exitStatus==null ? "<null>" : exitStatus, stepName, readCount,
+				writeCount, commitCount, rollbackCount, readSkipCount, processSkipCount, filterCount, writeSkipCount, startTime == null ? "<null>" : startTime,
+						endTime==null ? "<null>" :endTime , persistentData==null ? "<null>" : persistentData});
+
+		Connection conn = null;
+		PreparedStatement statement = null;
+		ResultSet rs = null;
+		StepExecutionImpl stepExecution = null;
+		String query = "INSERT INTO stepexecutioninstancedata (jobexecid, batchstatus, exitstatus, stepname, readcount,"
+				+ "writecount, commitcount, rollbackcount, readskipcount, processskipcount, filtercount, writeskipcount, starttime,"
+				+ "endtime, persistentdata) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement(query, Statement.RETURN_GENERATED_KEYS);
+			statement.setLong(1, rootJobExecId);
+			statement.setString(2, batchStatus);
+			statement.setString(3, exitStatus);
+			statement.setString(4, stepName);
+			statement.setLong(5, readCount);
+			statement.setLong(6, writeCount);
+			statement.setLong(7, commitCount);
+			statement.setLong(8, rollbackCount);
+			statement.setLong(9, readSkipCount);
+			statement.setLong(10, processSkipCount);
+			statement.setLong(11, filterCount);
+			statement.setLong(12, writeSkipCount);
+			statement.setTimestamp(13, startTime);
+			statement.setTimestamp(14, endTime);
+			statement.setObject(15, serializeObject(persistentData));
+
+			statement.executeUpdate();
+			rs = statement.getGeneratedKeys();
+			if(rs.next()) {
+				long stepExecutionId = rs.getLong(1);
+				stepExecution = new StepExecutionImpl(rootJobExecId, stepExecutionId);
+				stepExecution.setStepName(stepName);
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} catch (IOException e) {
+			throw new PersistenceException(e);
+		} finally {
+			// hand over the generated-keys result set as well, as the other insert
+			// methods in this class do; null used to be passed here
+			cleanupConnection(conn, rs, statement);
+		}
+		logger.exiting(CLASSNAME, "createStepExecution");
+
+		return stepExecution;
+	}
+
+	/**
+	 * Updates a step execution from the current state of a step context:
+	 * statuses, step name, start/end time, the eight metric counters and the
+	 * persistent user data. The step execution id is taken from the context.
+	 *
+	 * @param rootJobExecId the job execution id passed on to the private
+	 *        overload that performs the update
+	 * @param stepContext the context the values are read from; a null batch
+	 *        status is stored as STARTING
+	 */
+	@Override
+	public void updateStepExecution(long rootJobExecId, StepContextImpl stepContext) {
+		final long stepExecId = stepContext.getStepExecutionId();
+
+		final String status;
+		if (stepContext.getBatchStatus() == null) {
+			status = BatchStatus.STARTING.name();
+		} else {
+			status = stepContext.getBatchStatus().name();
+		}
+		final String exit = stepContext.getExitStatus();
+		final String name = stepContext.getStepName();
+
+		final Timestamp started = stepContext.getStartTimeTS();
+		final Timestamp ended = stepContext.getEndTimeTS();
+
+		// counters stay at 0 unless the context reports a metric of that type
+		long reads = 0;
+		long writes = 0;
+		long commits = 0;
+		long rollbacks = 0;
+		long readSkips = 0;
+		long processSkips = 0;
+		long filtered = 0;
+		long writeSkips = 0;
+
+		for (Metric metric : stepContext.getMetrics()) {
+			if (metric.getType().equals(MetricImpl.MetricType.READ_COUNT)) {
+				reads = metric.getValue();
+			} else if (metric.getType().equals(MetricImpl.MetricType.WRITE_COUNT)) {
+				writes = metric.getValue();
+			} else if (metric.getType().equals(MetricImpl.MetricType.PROCESS_SKIP_COUNT)) {
+				processSkips = metric.getValue();
+			} else if (metric.getType().equals(MetricImpl.MetricType.COMMIT_COUNT)) {
+				commits = metric.getValue();
+			} else if (metric.getType().equals(MetricImpl.MetricType.ROLLBACK_COUNT)) {
+				rollbacks = metric.getValue();
+			} else if (metric.getType().equals(MetricImpl.MetricType.READ_SKIP_COUNT)) {
+				readSkips = metric.getValue();
+			} else if (metric.getType().equals(MetricImpl.MetricType.FILTER_COUNT)) {
+				filtered = metric.getValue();
+			} else if (metric.getType().equals(MetricImpl.MetricType.WRITE_SKIP_COUNT)) {
+				writeSkips = metric.getValue();
+			}
+		}
+
+		final Serializable userData = stepContext.getPersistentUserData();
+
+		updateStepExecution(stepExecId, rootJobExecId, status, exit, name, reads,
+				writes, commits, rollbacks, readSkips, processSkips, filtered,
+				writeSkips, started, ended, userData);
+
+	}
+
+
+	/**
+	 * Overwrites every column of the {@code stepexecutioninstancedata} row whose
+	 * {@code stepexecid} equals the given step execution id.
+	 * <p>
+	 * The number of rows affected by the update is not inspected, so an unknown
+	 * step execution id is not reported as an error by this method.
+	 *
+	 * @param stepExecutionId id of the row to update
+	 * @param jobExecId value for the {@code jobexecid} column
+	 * @param batchStatus value for the {@code batchstatus} column
+	 * @param exitStatus value for the {@code exitstatus} column; may be {@code null}
+	 * @param stepName value for the {@code stepname} column
+	 * @param readCount value for the {@code readcount} column
+	 * @param writeCount value for the {@code writecount} column
+	 * @param commitCount value for the {@code commitcount} column
+	 * @param rollbackCount value for the {@code rollbackcount} column
+	 * @param readSkipCount value for the {@code readskipcount} column
+	 * @param processSkipCount value for the {@code processskipcount} column
+	 * @param filterCount value for the {@code filtercount} column
+	 * @param writeSkipCount value for the {@code writeskipcount} column
+	 * @param startTime value for the {@code starttime} column; may be {@code null}
+	 * @param endTime value for the {@code endtime} column; may be {@code null}
+	 * @param persistentData user data, serialized before being stored in {@code persistentdata}
+	 * @throws PersistenceException if the update or the serialization of {@code persistentData} fails
+	 */
+	private void updateStepExecution(long stepExecutionId, long jobExecId,  String batchStatus, String exitStatus, String stepName, long readCount, 
+			long writeCount, long commitCount, long rollbackCount, long readSkipCount, long processSkipCount, long filterCount,
+			long writeSkipCount, Timestamp startTime, Timestamp endTime, Serializable persistentData) {
+
+		// Null arguments are replaced by a "<null>" marker in the trace parameters.
+		logger.entering(CLASSNAME, "updateStepExecution", new Object[] {stepExecutionId, jobExecId, batchStatus, exitStatus==null ? "<null>" : exitStatus, stepName, readCount, 
+				writeCount, commitCount, rollbackCount, readSkipCount, processSkipCount, filterCount, writeSkipCount, startTime==null ? "<null>" : startTime,
+						endTime==null ? "<null>" : endTime, persistentData==null ? "<null>" : persistentData});
+
+		Connection conn = null;
+		PreparedStatement statement = null;
+		String query = "UPDATE stepexecutioninstancedata SET jobexecid = ?, batchstatus = ?, exitstatus = ?, stepname = ?,  readcount = ?," 
+				+ "writecount = ?, commitcount = ?, rollbackcount = ?, readskipcount = ?, processskipcount = ?, filtercount = ?, writeskipcount = ?,"
+				+ " starttime = ?, endtime = ?, persistentdata = ? WHERE stepexecid = ?";
+
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement(query);
+			// Parameter indexes follow the order of the SET clause; the WHERE key comes last.
+			statement.setLong(1, jobExecId);
+			statement.setString(2, batchStatus);
+			statement.setString(3, exitStatus);
+			statement.setString(4, stepName);
+			statement.setLong(5, readCount);
+			statement.setLong(6, writeCount);
+			statement.setLong(7, commitCount);
+			statement.setLong(8, rollbackCount);
+			statement.setLong(9, readSkipCount);
+			statement.setLong(10, processSkipCount);
+			statement.setLong(11, filterCount);
+			statement.setLong(12, writeSkipCount);
+			statement.setTimestamp(13, startTime);
+			statement.setTimestamp(14, endTime);
+			statement.setObject(15, serializeObject(persistentData));
+			statement.setLong(16, stepExecutionId); 
+			statement.executeUpdate();
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} catch (IOException e) {
+			throw new PersistenceException(e);
+		} finally {
+			// No ResultSet is produced by an UPDATE, hence the null second argument.
+			cleanupConnection(conn, null, statement);
+		}
+
+		logger.exiting(CLASSNAME, "updateStepExecution");
+	}
+
+	/**
+	 * Creates a new {@link JobStatus} for the given job instance and inserts its
+	 * serialized form into the {@code jobstatus} table, using the instance id as row id.
+	 *
+	 * @param jobInstanceId id of the job instance the status belongs to
+	 * @return the newly created status object
+	 * @throws PersistenceException if the insert or the serialization fails
+	 * @see com.ibm.jbatch.container.services.IPersistenceManagerService#createJobStatus(long)
+	 */
+	@Override
+	public JobStatus createJobStatus(long jobInstanceId) {
+		logger.entering(CLASSNAME, "createJobStatus", jobInstanceId);
+
+		final String insertSql = "INSERT INTO jobstatus (id, obj) VALUES(?, ?)";
+		Connection connection = null;
+		PreparedStatement insert = null;
+		JobStatus createdStatus = new JobStatus(jobInstanceId);
+
+		try {
+			connection = getConnection();
+			insert = connection.prepareStatement(insertSql);
+			insert.setLong(1, jobInstanceId);
+			insert.setBytes(2, serializeObject(createdStatus));
+			insert.executeUpdate();
+		} catch (SQLException sqlFailure) {
+			throw new PersistenceException(sqlFailure);
+		} catch (IOException serializationFailure) {
+			throw new PersistenceException(serializationFailure);
+		} finally {
+			// An INSERT yields no ResultSet, so only connection and statement are released.
+			cleanupConnection(connection, null, insert);
+		}
+
+		logger.exiting(CLASSNAME, "createJobStatus");
+		return createdStatus;
+	}
+
+	/**
+	 * Loads the {@link JobStatus} stored in the {@code jobstatus} table for the
+	 * given job instance and deserializes it.
+	 *
+	 * @param instanceId id of the job instance whose status is wanted
+	 * @return the stored status, or {@code null} when no row exists for the id
+	 * @throws PersistenceException if the query fails or the stored bytes cannot be
+	 *         deserialized
+	 * @see com.ibm.jbatch.container.services.IPersistenceManagerService#getJobStatus(long)
+	 */
+	@Override
+	public JobStatus getJobStatus(long instanceId) {
+		logger.entering(CLASSNAME, "getJobStatus", instanceId);
+		Connection conn = null;
+		PreparedStatement statement = null;
+		ResultSet rs = null;
+		String query = "SELECT obj FROM jobstatus WHERE id = ?";
+		JobStatus jobStatus = null;
+
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement(query);
+			statement.setLong(1, instanceId);
+			rs = statement.executeQuery();
+			// Only the first row is read; a missing row leaves the result null.
+			if(rs.next()) {
+				jobStatus = (JobStatus)deserializeObject(rs.getBytes(1));
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} catch (IOException e) {
+			throw new PersistenceException(e);
+		} catch (ClassNotFoundException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(conn, rs, statement);
+		}
+		logger.exiting(CLASSNAME, "getJobStatus", jobStatus);
+		return jobStatus;
+	}
+
+	/**
+	 * Replaces the serialized {@link JobStatus} stored in the {@code jobstatus}
+	 * table for the given job instance.
+	 * <p>
+	 * The number of updated rows is not checked.
+	 *
+	 * @param instanceId id of the row to update
+	 * @param jobStatus status object to serialize and store
+	 * @throws PersistenceException if the update or the serialization fails
+	 * @see com.ibm.jbatch.container.services.IPersistenceManagerService#updateJobStatus(long, com.ibm.jbatch.container.status.JobStatus)
+	 */
+	@Override
+	public void updateJobStatus(long instanceId, JobStatus jobStatus) {
+		logger.entering(CLASSNAME, "updateJobStatus", new Object[] {instanceId, jobStatus});
+
+		final String updateSql = "UPDATE jobstatus SET obj = ? WHERE id = ?";
+		Connection connection = null;
+		PreparedStatement update = null;
+
+		try {
+			connection = getConnection();
+			update = connection.prepareStatement(updateSql);
+			update.setBytes(1, serializeObject(jobStatus));
+			update.setLong(2, instanceId);
+			update.executeUpdate();
+		} catch (SQLException sqlFailure) {
+			throw new PersistenceException(sqlFailure);
+		} catch (IOException serializationFailure) {
+			throw new PersistenceException(serializationFailure);
+		} finally {
+			cleanupConnection(connection, null, update);
+		}
+
+		logger.exiting(CLASSNAME, "updateJobStatus");
+	}
+
+	/**
+	 * Creates a new {@link StepStatus} for the given step execution and inserts its
+	 * serialized form into the {@code stepstatus} table, using the step execution
+	 * id as row id.
+	 *
+	 * @param stepExecId id of the step execution the status belongs to
+	 * @return the newly created status object
+	 * @throws PersistenceException if the insert or the serialization fails
+	 * @see com.ibm.jbatch.container.services.IPersistenceManagerService#createStepStatus(long)
+	 */
+	@Override
+	public StepStatus createStepStatus(long stepExecId) {
+		logger.entering(CLASSNAME, "createStepStatus", stepExecId);
+
+		final String insertSql = "INSERT INTO stepstatus (id, obj) VALUES(?, ?)";
+		Connection connection = null;
+		PreparedStatement insert = null;
+		StepStatus createdStatus = new StepStatus(stepExecId);
+
+		try {
+			connection = getConnection();
+			insert = connection.prepareStatement(insertSql);
+			insert.setLong(1, stepExecId);
+			insert.setBytes(2, serializeObject(createdStatus));
+			insert.executeUpdate();
+		} catch (SQLException sqlFailure) {
+			throw new PersistenceException(sqlFailure);
+		} catch (IOException serializationFailure) {
+			throw new PersistenceException(serializationFailure);
+		} finally {
+			cleanupConnection(connection, null, insert);
+		}
+
+		logger.exiting(CLASSNAME, "createStepStatus");
+		return createdStatus;
+	}
+
+	/**
+	 * Looks up the {@link StepStatus} of the named step within a job instance.
+	 * <p>
+	 * The step execution ids are found by joining {@code executioninstancedata}
+	 * with {@code stepexecutioninstancedata} on the job execution id, then the
+	 * matching {@code stepstatus} row is deserialized. Only the first row of the
+	 * result is read and the query has no ORDER BY, so if several rows match, the
+	 * one returned is whichever the database delivers first.
+	 *
+	 * @param instanceId id of the job instance
+	 * @param stepName name of the step
+	 * @return the stored status, or {@code null} when no row matches
+	 * @throws PersistenceException if the query fails or the stored bytes cannot be
+	 *         deserialized
+	 * @see com.ibm.jbatch.container.services.IPersistenceManagerService#getStepStatus(long, java.lang.String)
+	 */
+	@Override
+	public StepStatus getStepStatus(long instanceId, String stepName) {
+		logger.entering(CLASSNAME, "getStepStatus", new Object[] {instanceId, stepName});
+		Connection conn = null;
+		PreparedStatement statement = null;
+		ResultSet rs = null;
+		String query = "SELECT obj FROM stepstatus WHERE id IN ("
+				+ "SELECT B.stepexecid FROM executioninstancedata A INNER JOIN stepexecutioninstancedata B ON A.jobexecid = B.jobexecid " 
+				+ "WHERE A.jobinstanceid = ? and B.stepname = ?)";
+		StepStatus stepStatus = null;
+
+		try {
+			conn = getConnection();
+			statement = conn.prepareStatement(query);
+			statement.setLong(1, instanceId);
+			statement.setString(2, stepName);
+			rs = statement.executeQuery();
+			if(rs.next()) {
+				stepStatus = (StepStatus)deserializeObject(rs.getBytes(1));
+			}
+		} catch (SQLException e) {
+			throw new PersistenceException(e);
+		} catch (IOException e) {
+			throw new PersistenceException(e);
+		} catch (ClassNotFoundException e) {
+			throw new PersistenceException(e);
+		} finally {
+			cleanupConnection(conn, rs, statement);
+		}
+		// A null result is traced as the "<null>" marker.
+		logger.exiting(CLASSNAME, "getStepStatus", stepStatus==null ? "<null>" : stepStatus);
+		return stepStatus;
+	}
+
+	/**
+	 * Replaces the serialized {@link StepStatus} stored in the {@code stepstatus}
+	 * table for the given step execution.
+	 * <p>
+	 * The number of updated rows is not checked.
+	 *
+	 * @param stepExecutionId id of the row to update
+	 * @param stepStatus status object to serialize and store
+	 * @throws PersistenceException if the update or the serialization fails
+	 * @see com.ibm.jbatch.container.services.IPersistenceManagerService#updateStepStatus(long, com.ibm.jbatch.container.status.StepStatus)
+	 */
+	@Override
+	public void updateStepStatus(long stepExecutionId, StepStatus stepStatus) {
+		logger.entering(CLASSNAME, "updateStepStatus", new Object[] {stepExecutionId, stepStatus});
+
+		final String updateSql = "UPDATE stepstatus SET obj = ? WHERE id = ?";
+		Connection connection = null;
+		PreparedStatement update = null;
+
+		try {
+			connection = getConnection();
+			update = connection.prepareStatement(updateSql);
+			update.setBytes(1, serializeObject(stepStatus));
+			update.setLong(2, stepExecutionId);
+			update.executeUpdate();
+		} catch (SQLException sqlFailure) {
+			throw new PersistenceException(sqlFailure);
+		} catch (IOException serializationFailure) {
+			throw new PersistenceException(serializationFailure);
+		} finally {
+			cleanupConnection(connection, null, update);
+		}
+
+		logger.exiting(CLASSNAME, "updateStepStatus");
+	}
+
+	/**
+	 * Returns the {@code apptag} of the job instance that owns the given job execution.
+	 * <p>
+	 * The value is read by joining {@code jobinstancedata} with
+	 * {@code executioninstancedata} on the job instance id.
+	 *
+	 * @param jobExecutionId id of the job execution
+	 * @return the application tag, or {@code null} when no row matches
+	 * @throws PersistenceException if the query fails
+	 * @see com.ibm.jbatch.container.services.IPersistenceManagerService#getTagName(long)
+	 */
+	@Override
+	public String getTagName(long jobExecutionId) {
+		logger.entering(CLASSNAME, "getTagName", jobExecutionId);
+
+		final String selectSql = "SELECT A.apptag FROM jobinstancedata A INNER JOIN executioninstancedata B ON A.jobinstanceid = B.jobinstanceid"
+				+ " WHERE B.jobexecid = ?";
+		Connection connection = null;
+		PreparedStatement select = null;
+		ResultSet rows = null;
+		String tagName = null;
+
+		try {
+			connection = getConnection();
+			select = connection.prepareStatement(selectSql);
+			select.setLong(1, jobExecutionId);
+			rows = select.executeQuery();
+			// Only the first row matters; no match leaves the tag null.
+			if (rows.next()) {
+				tagName = rows.getString(1);
+			}
+		} catch (SQLException sqlFailure) {
+			throw new PersistenceException(sqlFailure);
+		} finally {
+			cleanupConnection(connection, rows, select);
+		}
+
+		logger.exiting(CLASSNAME, "getTagName");
+		return tagName;
+	}
+
+	/**
+	 * Returns the id of the job execution with the latest {@code createtime} among
+	 * the executions recorded for the given job instance.
+	 *
+	 * @param jobInstanceId id of the job instance
+	 * @return the most recently created job execution id, or {@code -1} when the
+	 *         instance has no recorded execution
+	 * @throws PersistenceException if the query fails
+	 */
+	@Override
+	public long getMostRecentExecutionId(long jobInstanceId) {
+		logger.entering(CLASSNAME, "getMostRecentExecutionId", jobInstanceId);
+
+		final String selectSql = "SELECT jobexecid FROM executioninstancedata WHERE jobinstanceid = ? ORDER BY createtime DESC";
+		Connection connection = null;
+		PreparedStatement select = null;
+		ResultSet rows = null;
+		long latestExecutionId = -1;
+
+		try {
+			connection = getConnection();
+			select = connection.prepareStatement(selectSql);
+			select.setLong(1, jobInstanceId);
+			rows = select.executeQuery();
+			// Rows arrive newest first, so the first one is the answer.
+			if (rows.next()) {
+				latestExecutionId = rows.getLong(1);
+			}
+		} catch (SQLException sqlFailure) {
+			throw new PersistenceException(sqlFailure);
+		} finally {
+			cleanupConnection(connection, rows, select);
+		}
+
+		logger.exiting(CLASSNAME, "getMostRecentExecutionId");
+		return latestExecutionId;
+	}
+
+	/**
+	 * Shuts the service down. This implementation does nothing.
+	 *
+	 * @throws BatchContainerServiceException declared by the signature; never thrown here
+	 */
+	@Override
+	public void shutdown() throws BatchContainerServiceException {
+		// Intentionally empty: this method releases nothing.
+
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerSQLConstants.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerSQLConstants.java b/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerSQLConstants.java
new file mode 100755
index 0000000..0735d5d
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerSQLConstants.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2013 International Business Machines Corp.
+ * 
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License, 
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package com.ibm.jbatch.container.services.impl;
+
+/**
+ * Table names, DDL statements, DML statements and column-name strings for the
+ * JDBC persistence tables (job/step status, checkpoint data, job instance,
+ * job execution and step execution data).
+ * <p>
+ * All members are constants; the interface declares no methods.
+ */
+ interface JDBCPersistenceManagerSQLConstants {
+
+	 // Table names, in upper case as used by the CREATE TABLE statements below.
+	 final String JOBSTATUS_TABLE = "JOBSTATUS";
+	 final String STEPSTATUS_TABLE = "STEPSTATUS";
+	 final String CHECKPOINTDATA_TABLE = "CHECKPOINTDATA";
+	 final String JOBINSTANCEDATA_TABLE = "JOBINSTANCEDATA";
+	 final String EXECUTIONINSTANCEDATA_TABLE = "EXECUTIONINSTANCEDATA";
+	 final String STEPEXECUTIONINSTANCEDATA_TABLE = "STEPEXECUTIONINSTANCEDATA";
+	
+	 // DDL. JOBSTATUS rows are keyed by job instance id and are removed with
+	 // their job instance (ON DELETE CASCADE).
+	 final String CREATE_TAB_JOBSTATUS = "CREATE TABLE JOBSTATUS("
+			+ "id BIGINT CONSTRAINT JOBSTATUS_PK PRIMARY KEY," 
+			+ "obj BLOB,"
+			+ "CONSTRAINT JOBSTATUS_JOBINST_FK FOREIGN KEY (id) REFERENCES JOBINSTANCEDATA (jobinstanceid) ON DELETE CASCADE)";
+	 // STEPSTATUS rows are keyed by step execution id and are removed with
+	 // their step execution (ON DELETE CASCADE).
+	 final String CREATE_TAB_STEPSTATUS = "CREATE TABLE STEPSTATUS("
+			+ "id BIGINT CONSTRAINT STEPSTATUS_PK PRIMARY KEY," 
+			+ "obj BLOB,"
+			+ "CONSTRAINT STEPSTATUS_STEPEXEC_FK FOREIGN KEY (id) REFERENCES STEPEXECUTIONINSTANCEDATA (stepexecid) ON DELETE CASCADE)";
+	 // CHECKPOINTDATA declares no key constraint; see CREATE_CHECKPOINTDATA_INDEX for its index on id.
+	 final String CREATE_TAB_CHECKPOINTDATA = "CREATE TABLE CHECKPOINTDATA("
+			+ "id VARCHAR(512),obj BLOB)";
+	 // The three *INSTANCEDATA tables use database-generated identity columns as primary keys.
+	 final String CREATE_TAB_JOBINSTANCEDATA = "CREATE TABLE JOBINSTANCEDATA("
+			+ "jobinstanceid BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) CONSTRAINT JOBINSTANCE_PK PRIMARY KEY,"
+			+ "name VARCHAR(512),"
+			+ "apptag VARCHAR(512))";
+	 final String CREATE_TAB_EXECUTIONINSTANCEDATA = "CREATE TABLE EXECUTIONINSTANCEDATA("
+			+ "jobexecid BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) CONSTRAINT JOBEXECUTION_PK PRIMARY KEY,"
+			+ "jobinstanceid BIGINT,"
+			+ "createtime TIMESTAMP,"
+			+ "starttime TIMESTAMP,"
+			+ "endtime TIMESTAMP,"
+			+ "updatetime TIMESTAMP,"
+			+ "parameters BLOB,"
+			+ "batchstatus VARCHAR(512),"
+			+ "exitstatus VARCHAR(512)," 
+			+ "CONSTRAINT JOBINST_JOBEXEC_FK FOREIGN KEY (jobinstanceid) REFERENCES JOBINSTANCEDATA (jobinstanceid))";
+	 // NOTE(review): the metric count columns are INTEGER here; confirm callers never store values beyond int range.
+	 final String CREATE_TAB_STEPEXECUTIONINSTANCEDATA = "CREATE TABLE STEPEXECUTIONINSTANCEDATA("
+			+ "stepexecid BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) CONSTRAINT STEPEXECUTION_PK PRIMARY KEY,"
+			+ "jobexecid BIGINT,"
+			+ "batchstatus VARCHAR(512),"
+			+ "exitstatus VARCHAR(512),"
+			+ "stepname VARCHAR(512),"
+			+ "readcount INTEGER,"
+			+ "writecount INTEGER,"
+			+ "commitcount INTEGER,"
+			+ "rollbackcount INTEGER,"
+			+ "readskipcount INTEGER,"
+			+ "processskipcount INTEGER,"
+			+ "filtercount INTEGER,"
+			+ "writeskipcount INTEGER,"
+			+ "startTime TIMESTAMP," 
+			+ "endTime TIMESTAMP,"
+			+ "persistentData BLOB," 
+			+ "CONSTRAINT JOBEXEC_STEPEXEC_FK FOREIGN KEY (jobexecid) REFERENCES EXECUTIONINSTANCEDATA (jobexecid))";
+	
+	 // Status and checkpoint DML. The INSERT statements name no columns, so the
+	 // two parameters bind in table column order (id, obj).
+	 final String INSERT_JOBSTATUS = "insert into jobstatus values(?, ?)";
+	
+	 final String UPDATE_JOBSTATUS = "update jobstatus set obj = ? where id = ?";
+
+	 final String SELECT_JOBSTATUS = "select id, obj from jobstatus where id = ?";
+	
+	 final String DELETE_JOBSTATUS = "delete from jobstatus where id = ?";
+
+	 final String INSERT_STEPSTATUS = "insert into stepstatus values(?, ?)";
+	
+	 final String UPDATE_STEPSTATUS = "update stepstatus set obj = ? where id = ?";
+
+	 final String SELECT_STEPSTATUS = "select id, obj from stepstatus where id = ?";
+	
+	 final String DELETE_STEPSTATUS = "delete from stepstatus where id = ?";
+
+	 final String INSERT_CHECKPOINTDATA = "insert into checkpointdata values(?, ?)";
+
+	 final String UPDATE_CHECKPOINTDATA = "update checkpointdata set obj = ? where id = ?";
+
+	 final String SELECT_CHECKPOINTDATA = "select id, obj from checkpointdata where id = ?";
+	
+	 final String CREATE_CHECKPOINTDATA_INDEX = "create index chk_index on checkpointdata(id)";
+	
+	 final String DELETE_CHECKPOINTDATA = "delete from checkpointdata where id = ?";
+	
+	// JOB OPERATOR QUERIES
+	 final String INSERT_JOBINSTANCEDATA = "insert into jobinstancedata (name, apptag) values(?, ?)";
+	
+	 final String INSERT_EXECUTIONDATA = "insert into executionInstanceData (jobinstanceid, parameters) values(?, ?)";
+	
+	 final String SELECT_JOBINSTANCEDATA_COUNT = "select count(jobinstanceid) as jobinstancecount from jobinstancedata where name = ?";
+	
+	 final String SELECT_JOBINSTANCEDATA_IDS = "select jobinstanceid from jobinstancedata where name = ? order by jobinstanceid desc";
+	
+	 final String SELECT_JOBINSTANCEDATA_NAMES = "select name from jobinstancedata where apptag = ?";
+	 final String SELECT_JOBINSTANCEDATA_APPTAG = "select apptag from jobinstancedata where jobinstanceid = ?";
+	
+	// Bare name strings, presumably used as column labels by callers (verify usage).
+	// NOTE(review): "instanceId" and "stepcontext" match no column declared in the DDL above.
+	final String START_TIME = "starttime";
+	final String CREATE_TIME = "createtime";
+	final String END_TIME = "endtime";
+	final String UPDATE_TIME = "updatetime";
+	final String BATCH_STATUS = "batchstatus";
+	final String EXIT_STATUS = "exitstatus";
+	final String INSTANCE_ID = "instanceId";
+	final String JOBEXEC_ID = "jobexecid";
+	final String STEPEXEC_ID = "stepexecid";
+	final String STEPCONTEXT = "stepcontext";
+	final String APPTAG = "apptag";
+}

http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/f7740962/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JNDIDelegatingThreadPoolServiceImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JNDIDelegatingThreadPoolServiceImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JNDIDelegatingThreadPoolServiceImpl.java
new file mode 100755
index 0000000..e1398bd
--- /dev/null
+++ b/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JNDIDelegatingThreadPoolServiceImpl.java
@@ -0,0 +1,98 @@
+/**
+ * Copyright 2013 International Business Machines Corp.
+ *
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership. Licensed under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.ibm.jbatch.container.services.impl;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import com.ibm.jbatch.container.exception.BatchContainerServiceException;
+import com.ibm.jbatch.container.util.BatchContainerConstants;
+import com.ibm.jbatch.spi.services.IBatchConfig;
+import com.ibm.jbatch.spi.services.IBatchThreadPoolService;
+import com.ibm.jbatch.spi.services.ParallelTaskResult;
+
+/**
+ * Thread pool service that owns no threads itself: every task is handed to an
+ * {@link ExecutorService} obtained from JNDI.
+ * <p>
+ * The executor is looked up again for each task rather than cached. The JNDI
+ * name is read from the batch configuration in {@link #init(IBatchConfig)} and
+ * falls back to {@link #DEFAULT_JNDI_LOCATION}.
+ */
+public class JNDIDelegatingThreadPoolServiceImpl implements IBatchThreadPoolService, BatchContainerConstants  {
+
+	private final static String sourceClass = JNDIDelegatingThreadPoolServiceImpl.class.getName();
+	private final static Logger logger = Logger.getLogger(sourceClass);
+
+	/** JNDI name used when the batch configuration does not specify one. */
+	public final String DEFAULT_JNDI_LOCATION = "java:comp/DefaultManagedExecutorService";
+	/** JNDI name of the delegate executor; {@code null} until {@link #init(IBatchConfig)} runs. */
+	private String jndiLocation = null;
+
+	/** Creates an unconfigured service; the JNDI location is assigned by {@link #init(IBatchConfig)}. */
+	public JNDIDelegatingThreadPoolServiceImpl() {
+		// Nothing to set up here.
+	}
+
+	/**
+	 * Reads the JNDI location of the delegate executor from the configuration.
+	 *
+	 * @param batchConfig configuration whose properties may carry the
+	 *        {@code THREADPOOL_JNDI_LOCATION} entry
+	 */
+	@Override
+	public void init(IBatchConfig batchConfig) {
+		// Don't want to get/cache the actual threadpool here since we want to do a JNDI lookup each time.
+		jndiLocation = batchConfig.getConfigProperties().getProperty(THREADPOOL_JNDI_LOCATION, DEFAULT_JNDI_LOCATION);
+	}
+
+	/**
+	 * Does nothing beyond trace logging.
+	 *
+	 * @throws BatchContainerServiceException declared by the signature; never thrown here
+	 */
+	public void shutdown() throws BatchContainerServiceException {
+		String method = "shutdown";
+		if(logger.isLoggable(Level.FINER)) { logger.entering(sourceClass, method);	}
+
+		// We don't want to be responsible for cleaning up.
+
+		if(logger.isLoggable(Level.FINER)) { logger.exiting(sourceClass, method);	}
+	}
+
+	/**
+	 * Runs the given work on the executor found in JNDI, without tracking its result.
+	 *
+	 * @param work task to execute
+	 * @param config not used by this implementation
+	 * @throws BatchContainerServiceException if the JNDI lookup fails
+	 */
+	public void executeTask(Runnable work, Object config) {
+		String method = "executeTask";
+		if(logger.isLoggable(Level.FINER)) { logger.entering(sourceClass, method);	}
+
+		lookupDelegateService().execute(work);
+
+		if(logger.isLoggable(Level.FINER)) { logger.exiting(sourceClass, method);	}
+	}
+
+	/**
+	 * Submits the given work to the executor found in JNDI and wraps the
+	 * resulting {@link Future} in a {@code JSEResultAdapter}.
+	 *
+	 * @param work task to execute
+	 * @param config not used by this implementation
+	 * @return adapter around the future returned by the executor
+	 * @throws BatchContainerServiceException if the JNDI lookup fails
+	 */
+	public ParallelTaskResult executeParallelTask(Runnable work, Object config) {
+		String method = "executeParallelTask";
+		ParallelTaskResult taskResult = null;
+		if(logger.isLoggable(Level.FINER)) { logger.entering(sourceClass, method);  }
+
+		Future result = lookupDelegateService().submit(work);
+		taskResult = new JSEResultAdapter(result);
+
+		if(logger.isLoggable(Level.FINER)) { logger.exiting(sourceClass, method);   }
+
+		return taskResult;
+	}
+
+	/**
+	 * Looks up the delegate executor under the configured JNDI name.
+	 * <p>
+	 * The naming context is closed once the lookup has finished so that it does
+	 * not linger until garbage collection.
+	 *
+	 * @return the executor bound at the configured JNDI location
+	 * @throws BatchContainerServiceException if the context cannot be created or the lookup fails
+	 */
+	private ExecutorService lookupDelegateService() {
+		Context ctx = null;
+		try {
+			ctx = new InitialContext();
+			return (ExecutorService)ctx.lookup(jndiLocation);
+		} catch (NamingException e) {
+			logger.severe("Lookup failed for JNDI name: " + jndiLocation);
+			throw new BatchContainerServiceException(e);
+		} finally {
+			if (ctx != null) {
+				try {
+					ctx.close();
+				} catch (NamingException closeFailure) {
+					// Best effort: a failed close must not mask the lookup outcome.
+					logger.log(Level.FINE, "Failed to close JNDI context", closeFailure);
+				}
+			}
+		}
+	}
+
+
+}