You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:39:26 UTC
[49/62] importing batchee from github - a fork from the IBm RI
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerImpl.java
deleted file mode 100755
index 1f322a5..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerImpl.java
+++ /dev/null
@@ -1,2208 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.services.impl;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.io.StringWriter;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.batch.operations.NoSuchJobExecutionException;
-import javax.batch.runtime.BatchStatus;
-import javax.batch.runtime.JobInstance;
-import javax.batch.runtime.Metric;
-import javax.batch.runtime.StepExecution;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import javax.sql.DataSource;
-
-import com.ibm.jbatch.container.context.impl.MetricImpl;
-import com.ibm.jbatch.container.context.impl.StepContextImpl;
-import com.ibm.jbatch.container.exception.BatchContainerServiceException;
-import com.ibm.jbatch.container.exception.PersistenceException;
-import com.ibm.jbatch.container.impl.PartitionedStepBuilder;
-import com.ibm.jbatch.container.jobinstance.JobInstanceImpl;
-import com.ibm.jbatch.container.jobinstance.JobOperatorJobExecution;
-import com.ibm.jbatch.container.jobinstance.RuntimeFlowInSplitExecution;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.jobinstance.StepExecutionImpl;
-import com.ibm.jbatch.container.persistence.CheckpointData;
-import com.ibm.jbatch.container.persistence.CheckpointDataKey;
-import com.ibm.jbatch.container.services.IJobExecution;
-import com.ibm.jbatch.container.services.IPersistenceManagerService;
-import com.ibm.jbatch.container.status.JobStatus;
-import com.ibm.jbatch.container.status.StepStatus;
-import com.ibm.jbatch.container.util.TCCLObjectInputStream;
-import com.ibm.jbatch.spi.services.IBatchConfig;
-
-public class JDBCPersistenceManagerImpl implements IPersistenceManagerService, JDBCPersistenceManagerSQLConstants {
-
- private static final String CLASSNAME = JDBCPersistenceManagerImpl.class.getName();
-
- private final static Logger logger = Logger.getLogger(CLASSNAME);
-
- private IBatchConfig batchConfig = null;
-
- protected DataSource dataSource = null;
- protected String jndiName = null;
-
- protected String driver = "";
- protected String schema = "";
- protected String url = "";
- protected String userId = "";
- protected String pwd = "";
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.impl.AbstractPersistenceManagerImpl#init(com.ibm.jbatch.container.IBatchConfig)
- */
- @Override
- public void init(IBatchConfig batchConfig) throws BatchContainerServiceException {
- logger.config("Entering CLASSNAME.init(), batchConfig =" + batchConfig);
-
- this.batchConfig = batchConfig;
-
- schema = batchConfig.getDatabaseConfigurationBean().getSchema();
-
- if (!batchConfig.isJ2seMode()) {
- jndiName = batchConfig.getDatabaseConfigurationBean().getJndiName();
-
- logger.config("JNDI name = " + jndiName);
-
- if (jndiName == null || jndiName.equals("")) {
- throw new BatchContainerServiceException("JNDI name is not defined.");
- }
-
- try {
- Context ctx = new InitialContext();
- dataSource = (DataSource) ctx.lookup(jndiName);
-
- } catch (NamingException e) {
- logger.severe("Lookup failed for JNDI name: " + jndiName +
- ". One cause of this could be that the batch runtime is incorrectly configured to EE mode when it should be in SE mode.");
- throw new BatchContainerServiceException(e);
- }
-
- } else {
- driver = batchConfig.getDatabaseConfigurationBean().getJdbcDriver();
- url = batchConfig.getDatabaseConfigurationBean().getJdbcUrl();
- userId = batchConfig.getDatabaseConfigurationBean().getDbUser();
- pwd = batchConfig.getDatabaseConfigurationBean().getDbPassword();
-
- logger.config("driver: " + driver + ", url: " + url);
- }
-
- try {
- // only auto-create on Derby
- if(isDerby()) {
- if(!isSchemaValid()) {
- createSchema();
- }
- checkAllTables();
- }
- } catch (SQLException e) {
- logger.severe(e.getLocalizedMessage());
- throw new BatchContainerServiceException(e);
- }
-
- logger.config("Exiting CLASSNAME.init()");
- }
-
- /**
- * Checks if the default schema JBATCH or the schema defined in batch-config exists.
- *
- * @return true if the schema exists, false otherwise.
- * @throws SQLException
- */
- private boolean isSchemaValid() throws SQLException {
- logger.entering(CLASSNAME, "isSchemaValid");
- Connection conn = getConnectionToDefaultSchema();
- DatabaseMetaData dbmd = conn.getMetaData();
- ResultSet rs = dbmd.getSchemas();
- while(rs.next()) {
- if (schema.equalsIgnoreCase(rs.getString("TABLE_SCHEM")) ) {
- cleanupConnection(conn, rs, null);
- logger.exiting(CLASSNAME, "isSchemaValid", true);
- return true;
- }
- }
- cleanupConnection(conn, rs, null);
- logger.exiting(CLASSNAME, "isSchemaValid", false);
- return false;
- }
-
- private boolean isDerby() throws SQLException {
- logger.entering(CLASSNAME, "isDerby");
- Connection conn = getConnectionToDefaultSchema();
- DatabaseMetaData dbmd = conn.getMetaData();
- boolean derby = dbmd.getDatabaseProductName().toLowerCase().indexOf("derby") > 0;
- logger.exiting(CLASSNAME, "isDerby", derby);
- return derby;
- }
-
- /**
- * Creates the default schema JBATCH or the schema defined in batch-config.
- *
- * @throws SQLException
- */
- private void createSchema() throws SQLException {
- logger.entering(CLASSNAME, "createSchema");
- Connection conn = getConnectionToDefaultSchema();
-
- logger.log(Level.INFO, schema + " schema does not exists. Trying to create it.");
- PreparedStatement ps = null;
- ps = conn.prepareStatement("CREATE SCHEMA " + schema);
- ps.execute();
-
- cleanupConnection(conn, null, ps);
- logger.exiting(CLASSNAME, "createSchema");
- }
-
- /**
- * Checks if all the runtime batch table exists. If not, it creates them.
- *
- * @throws SQLException
- */
- private void checkAllTables() throws SQLException {
- logger.entering(CLASSNAME, "checkAllTables");
-
- createIfNotExists(CHECKPOINTDATA_TABLE, CREATE_TAB_CHECKPOINTDATA);
- executeStatement(CREATE_CHECKPOINTDATA_INDEX);
- createIfNotExists(JOBINSTANCEDATA_TABLE, CREATE_TAB_JOBINSTANCEDATA);
-
- createIfNotExists(EXECUTIONINSTANCEDATA_TABLE,
- CREATE_TAB_EXECUTIONINSTANCEDATA);
- createIfNotExists(STEPEXECUTIONINSTANCEDATA_TABLE,
- CREATE_TAB_STEPEXECUTIONINSTANCEDATA);
-
- createIfNotExists(JOBSTATUS_TABLE, CREATE_TAB_JOBSTATUS);
- createIfNotExists(STEPSTATUS_TABLE, CREATE_TAB_STEPSTATUS);
-
- logger.exiting(CLASSNAME, "checkAllTables");
- }
-
- /**
- * Creates tableName using the createTableStatement DDL.
- *
- * @param tableName
- * @param createTableStatement
- * @throws SQLException
- */
- private void createIfNotExists(String tableName, String createTableStatement) throws SQLException {
- logger.entering(CLASSNAME, "createIfNotExists", new Object[] {tableName, createTableStatement});
-
- Connection conn = getConnection();
- DatabaseMetaData dbmd = conn.getMetaData();
- ResultSet rs = dbmd.getTables(null, schema, tableName, null);
- PreparedStatement ps = null;
- if(!rs.next()) {
- logger.log(Level.INFO, tableName + " table does not exists. Trying to create it.");
- ps = conn.prepareStatement(createTableStatement);
- ps.executeUpdate();
- }
-
- cleanupConnection(conn, rs, ps);
- logger.exiting(CLASSNAME, "createIfNotExists");
- }
-
- /**
- * Executes the provided SQL statement
- *
- * @param statement
- * @throws SQLException
- */
- private void executeStatement(String statement) throws SQLException {
- logger.entering(CLASSNAME, "executeStatement", statement);
-
- Connection conn = getConnection();
- PreparedStatement ps = null;
-
- ps = conn.prepareStatement(statement);
- ps.executeUpdate();
-
- cleanupConnection(conn, ps);
- logger.exiting(CLASSNAME, "executeStatement");
- }
-
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.impl.AbstractPersistenceManagerImpl#createCheckpointData(com.ibm.ws.batch.container.checkpoint.CheckpointDataKey, com.ibm.ws.batch.container.checkpoint.CheckpointData)
- */
- @Override
- public void createCheckpointData(CheckpointDataKey key, CheckpointData value) {
- logger.entering(CLASSNAME, "createCheckpointData", new Object[] {key, value});
- insertCheckpointData(key.getCommaSeparatedKey(), value);
- logger.exiting(CLASSNAME, "createCheckpointData");
- }
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.impl.AbstractPersistenceManagerImpl#getCheckpointData(com.ibm.ws.batch.container.checkpoint.CheckpointDataKey)
- */
- @Override
- public CheckpointData getCheckpointData(CheckpointDataKey key) {
- logger.entering(CLASSNAME, "getCheckpointData", key==null ? "<null>" : key);
- CheckpointData checkpointData = queryCheckpointData(key.getCommaSeparatedKey());
- logger.exiting(CLASSNAME, "getCheckpointData", checkpointData==null ? "<null>" : checkpointData);
- return checkpointData;
- }
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.impl.AbstractPersistenceManagerImpl#updateCheckpointData(com.ibm.ws.batch.container.checkpoint.CheckpointDataKey, com.ibm.ws.batch.container.checkpoint.CheckpointData)
- */
- @Override
- public void updateCheckpointData(CheckpointDataKey key, CheckpointData value) {
- logger.entering(CLASSNAME, "updateCheckpointData", new Object[] {key, value});
- CheckpointData data = queryCheckpointData(key.getCommaSeparatedKey());
- if(data != null) {
- updateCheckpointData(key.getCommaSeparatedKey(), value);
- } else {
- createCheckpointData(key, value);
- }
- logger.exiting(CLASSNAME, "updateCheckpointData");
- }
-
-
- /**
- * @return the database connection and sets it to the default schema JBATCH or the schema defined in batch-config.
- *
- * @throws SQLException
- */
- protected Connection getConnection() throws SQLException {
- logger.finest("Entering: " + CLASSNAME + ".getConnection");
- Connection connection = null;
-
- if(!batchConfig.isJ2seMode()) {
- logger.finest("J2EE mode, getting connection from data source");
- connection = dataSource.getConnection();
- logger.finest("autocommit="+connection.getAutoCommit());
- } else {
- try {
- Class.forName(driver);
- } catch (ClassNotFoundException e) {
- throw new PersistenceException(e);
- }
- logger.finest("JSE mode, getting connection from " + url);
- connection = DriverManager.getConnection(url, userId, pwd);
- logger.finest("autocommit="+connection.getAutoCommit());
- }
- setSchemaOnConnection(connection);
-
- logger.finest("Exiting: " + CLASSNAME + ".getConnection() with conn =" + connection);
- return connection;
- }
-
- /**
- * @return the database connection. The schema is set to whatever default its used by the underlying database.
- * @throws SQLException
- */
- protected Connection getConnectionToDefaultSchema() throws SQLException {
- logger.finest("Entering getConnectionToDefaultSchema");
- Connection connection = null;
-
- if(!batchConfig.isJ2seMode()) {
- logger.finest("J2EE mode, getting connection from data source");
- try {
- connection = dataSource.getConnection();
- } catch(SQLException e) {
- logException("FAILED GETTING DATABASE CONNECTION", e);
- throw new PersistenceException(e);
- }
- logger.finest("autocommit="+connection.getAutoCommit());
- } else {
- try {
- Class.forName(driver);
- } catch (ClassNotFoundException e) {
- logException("ClassNotFoundException: Cannot load driver class: " + driver, e);
- throw new PersistenceException(e);
- }
- logger.finest("JSE mode, getting connection from " + url);
- try {
- connection = DriverManager.getConnection(url, userId, pwd);
- } catch (SQLException e) {
- logException("FAILED GETTING DATABASE CONNECTION. FOR EMBEDDED DERBY CHECK FOR OTHER USER LOCKING THE CURRENT DATABASE (Try using a different database instance).", e);
- throw new PersistenceException(e);
- }
- logger.finest("autocommit="+connection.getAutoCommit());
- }
- logger.finest("Exiting from getConnectionToDefaultSchema, conn= " +connection);
- return connection;
- }
-
- private void logException(String msg, Exception e) {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
-
- logger.log(Level.SEVERE, msg + "; Exception stack trace: " + sw);
- }
-
- /**
- * Set the default schema JBATCH or the schema defined in batch-config on the connection object.
- *
- * @param connection
- * @throws SQLException
- */
- private void setSchemaOnConnection(Connection connection) throws SQLException {
- logger.finest("Entering " + CLASSNAME +".setSchemaOnConnection()");
-
- PreparedStatement ps = null;
- ps = connection.prepareStatement("SET SCHEMA ?");
- ps.setString(1, schema);
- ps.executeUpdate();
-
- ps.close();
-
- logger.finest("Exiting " + CLASSNAME +".setSchemaOnConnection()");
- }
-
- /**
- * select data from DB table
- *
- * @param key - the IPersistenceDataKey object
- * @return List of serializable objects store in the DB table
- *
- * Ex. select id, obj from tablename where id = ?
- */
- private CheckpointData queryCheckpointData(Object key) {
- logger.entering(CLASSNAME, "queryCheckpointData", new Object[] {key, SELECT_CHECKPOINTDATA});
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- ObjectInputStream objectIn = null;
- CheckpointData data = null;
- try {
- conn = getConnection();
- statement = conn.prepareStatement(SELECT_CHECKPOINTDATA);
- statement.setObject(1, key);
- rs = statement.executeQuery();
- if (rs.next()) {
- byte[] buf = rs.getBytes("obj");
- data = (CheckpointData)deserializeObject(buf);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } catch (ClassNotFoundException e) {
- throw new PersistenceException(e);
- } finally {
- if (objectIn != null) {
- try {
- objectIn.close();
- } catch (IOException e) {
- throw new PersistenceException(e);
- }
- }
- cleanupConnection(conn, rs, statement);
- }
- logger.exiting(CLASSNAME, "queryCheckpointData");
- return data;
- }
-
-
- /**
- * insert data to DB table
- *
- * @param key - the IPersistenceDataKey object
- * @param value - serializable object to store
- *
- * Ex. insert into tablename values(?, ?)
- */
- private <T> void insertCheckpointData(Object key, T value) {
- logger.entering(CLASSNAME, "insertCheckpointData", new Object[] {key, value});
- Connection conn = null;
- PreparedStatement statement = null;
- ByteArrayOutputStream baos = null;
- ObjectOutputStream oout = null;
- byte[] b;
- try {
- conn = getConnection();
- statement = conn.prepareStatement(INSERT_CHECKPOINTDATA);
- baos = new ByteArrayOutputStream();
- oout = new ObjectOutputStream(baos);
- oout.writeObject(value);
-
- b = baos.toByteArray();
-
- statement.setObject(1, key);
- statement.setBytes(2, b);
- statement.executeUpdate();
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } finally {
- if (baos != null) {
- try {
- baos.close();
- } catch (IOException e) {
- throw new PersistenceException(e);
- }
- }
- if (oout != null) {
- try {
- oout.close();
- } catch (IOException e) {
- throw new PersistenceException(e);
- }
- }
- cleanupConnection(conn, null, statement);
- }
- logger.exiting(CLASSNAME, "insertCheckpointData");
- }
-
- /**
- * update data in DB table
- *
- * @param value - serializable object to store
- * @param key - the IPersistenceDataKey object
- * @param query - SQL statement to execute.
- *
- * Ex. update tablename set obj = ? where id = ?
- */
- private void updateCheckpointData(Object key, CheckpointData value) {
- logger.entering(CLASSNAME, "updateCheckpointData", new Object[] {key, value});
- Connection conn = null;
- PreparedStatement statement = null;
- ByteArrayOutputStream baos = null;
- ObjectOutputStream oout = null;
- byte[] b;
- try {
- conn = getConnection();
- statement = conn.prepareStatement(UPDATE_CHECKPOINTDATA);
- baos = new ByteArrayOutputStream();
- oout = new ObjectOutputStream(baos);
- oout.writeObject(value);
-
- b = baos.toByteArray();
-
- statement.setBytes(1, b);
- statement.setObject(2, key);
- statement.executeUpdate();
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } finally {
- if (baos != null) {
- try {
- baos.close();
- } catch (IOException e) {
- throw new PersistenceException(e);
- }
- }
- if (oout != null) {
- try {
- oout.close();
- } catch (IOException e) {
- throw new PersistenceException(e);
- }
- }
- cleanupConnection(conn, null, statement);
- }
- logger.exiting(CLASSNAME, "updateCheckpointData");
- }
-
-
-
- /**
- * closes connection, result set and statement
- *
- * @param conn - connection object to close
- * @param rs - result set object to close
- * @param statement - statement object to close
- */
- private void cleanupConnection(Connection conn, ResultSet rs, PreparedStatement statement) {
-
- logger.logp(Level.FINEST, CLASSNAME, "cleanupConnection", "Entering", new Object[] {conn, rs==null ? "<null>" : rs, statement==null ? "<null>" : statement});
-
- if (statement != null) {
- try {
- statement.close();
- } catch (SQLException e) {
- throw new PersistenceException(e);
- }
- }
-
- if (rs != null) {
- try {
- rs.close();
- } catch (SQLException e) {
- throw new PersistenceException(e);
- }
- }
-
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } finally {
- try {
- conn.close();
- } catch (SQLException e) {
- throw new PersistenceException(e);
- }
- }
- }
- logger.logp(Level.FINEST, CLASSNAME, "cleanupConnection", "Exiting");
- }
-
- /**
- * closes connection and statement
- *
- * @param conn - connection object to close
- * @param statement - statement object to close
- */
- private void cleanupConnection(Connection conn, PreparedStatement statement) {
-
- logger.logp(Level.FINEST, CLASSNAME, "cleanupConnection", "Entering", new Object[] {conn, statement});
-
- if (statement != null) {
- try {
- statement.close();
- } catch (SQLException e) {
- throw new PersistenceException(e);
- }
- }
-
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } finally {
- try {
- conn.close();
- } catch (SQLException e) {
- throw new PersistenceException(e);
- }
- }
- }
- logger.logp(Level.FINEST, CLASSNAME, "cleanupConnection", "Exiting");
- }
-
-
- @Override
- public int jobOperatorGetJobInstanceCount(String jobName, String appTag) {
-
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- int count;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("select count(jobinstanceid) as jobinstancecount from jobinstancedata where name = ? and apptag = ?");
- statement.setString(1, jobName);
- statement.setString(2, appTag);
- rs = statement.executeQuery();
- rs.next();
- count = rs.getInt("jobinstancecount");
-
- } catch (SQLException e) {
- throw new PersistenceException(e);
- }
- finally {
- cleanupConnection(conn, rs, statement);
- }
- return count;
- }
-
- @Override
- public int jobOperatorGetJobInstanceCount(String jobName) {
-
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- int count;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(SELECT_JOBINSTANCEDATA_COUNT);
- statement.setString(1, jobName);
- rs = statement.executeQuery();
- rs.next();
- count = rs.getInt("jobinstancecount");
-
- } catch (SQLException e) {
- throw new PersistenceException(e);
- }
- finally {
- cleanupConnection(conn, rs, statement);
- }
- return count;
- }
-
-
- @Override
- public List<Long> jobOperatorGetJobInstanceIds(String jobName, String appTag, int start, int count) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- List<Long> data = new ArrayList<Long>();
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("select jobinstanceid from jobinstancedata where name = ? and apptag = ? order by jobinstanceid desc");
- statement.setObject(1, jobName);
- statement.setObject(2, appTag);
- rs = statement.executeQuery();
- while (rs.next()) {
- long id = rs.getLong("jobinstanceid");
- data.add(id);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- }
- finally {
- cleanupConnection(conn, rs, statement);
- }
-
- if (data.size() > 0){
- try {
- return data.subList(start, start+count);
- }
- catch (IndexOutOfBoundsException oobEx){
- return data.subList(start, data.size());
- }
- }
- else return data;
- }
-
- @Override
- public List<Long> jobOperatorGetJobInstanceIds(String jobName, int start, int count) {
-
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- List<Long> data = new ArrayList<Long>();
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(SELECT_JOBINSTANCEDATA_IDS);
- statement.setObject(1, jobName);
- rs = statement.executeQuery();
- while (rs.next()) {
- long id = rs.getLong("jobinstanceid");
- data.add(id);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- }
- finally {
- cleanupConnection(conn, rs, statement);
- }
-
- if (data.size() > 0){
- try {
- return data.subList(start, start+count);
- }
- catch (IndexOutOfBoundsException oobEx){
- return data.subList(start, data.size());
- }
- }
- else return data;
- }
-
- @Override
- public Map<Long, String> jobOperatorGetExternalJobInstanceData() {
-
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- HashMap<Long, String> data = new HashMap<Long,String>();
-
- try {
- conn = getConnection();
-
- // Filter out 'subjob' parallel execution entries which start with the special character
- final String filter = "not like '" + PartitionedStepBuilder.JOB_ID_SEPARATOR + "%'";
-
- statement = conn.prepareStatement("select distinct jobinstanceid, name from jobinstancedata where name " + filter );
- rs = statement.executeQuery();
- while (rs.next()) {
- long id = rs.getLong("jobinstanceid");
- String name = rs.getString("name");
- data.put(id, name);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- }
- finally {
- cleanupConnection(conn, rs, statement);
- }
-
- return data;
- }
-
- @Override
- public Timestamp jobOperatorQueryJobExecutionTimestamp(long key, TimestampType timestampType) {
-
-
-
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- Timestamp createTimestamp = null;
- Timestamp endTimestamp = null;
- Timestamp updateTimestamp = null;
- Timestamp startTimestamp = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("select createtime, endtime, updatetime, starttime from executioninstancedata where jobexecid = ?");
- statement.setObject(1, key);
- rs = statement.executeQuery();
- while (rs.next()) {
- createTimestamp = rs.getTimestamp(1);
- endTimestamp = rs.getTimestamp(2);
- updateTimestamp = rs.getTimestamp(3);
- startTimestamp = rs.getTimestamp(4);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- }
- finally {
- cleanupConnection(conn, rs, statement);
- }
-
- if (timestampType.equals(TimestampType.CREATE)) {
- return createTimestamp;
- } else if (timestampType.equals(TimestampType.END)) {
- return endTimestamp;
- } else if (timestampType.equals(TimestampType.LAST_UPDATED)) {
- return updateTimestamp;
- } else if (timestampType.equals(TimestampType.STARTED)) {
- return startTimestamp;
- } else {
- throw new IllegalArgumentException("Unexpected enum value.");
- }
- }
-
- @Override
- public String jobOperatorQueryJobExecutionBatchStatus(long key) {
-
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- String status = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("select batchstatus from executioninstancedata where jobexecid = ?");
- statement.setLong(1, key);
- rs = statement.executeQuery();
- while (rs.next()) {
- status = rs.getString(1);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
-
- return status;
- }
-
-
- @Override
- public String jobOperatorQueryJobExecutionExitStatus(long key) {
-
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- String status = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("select exitstatus from executioninstancedata where jobexecid = ?");
- statement.setLong(1, key);
- rs = statement.executeQuery();
- while (rs.next()) {
- status = rs.getString(1);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
-
- return status;
- }
-
- @Override
- public long jobOperatorQueryJobExecutionJobInstanceId(long executionID) throws NoSuchJobExecutionException {
-
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- long jobinstanceID = 0;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("select jobinstanceid from executioninstancedata where jobexecid = ?");
- statement.setLong(1, executionID);
- rs = statement.executeQuery();
- if (rs.next()) {
- jobinstanceID = rs.getLong("jobinstanceid");
- } else {
- String msg = "Did not find job instance associated with executionID =" + executionID;
- logger.fine(msg);
- throw new NoSuchJobExecutionException(msg);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- }
- finally {
- cleanupConnection(conn, rs, statement);
- }
-
- return jobinstanceID;
- }
-
- @Override
- public Properties getParameters(long executionId) throws NoSuchJobExecutionException {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- Properties props = null;
- ObjectInputStream objectIn = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("select parameters from executioninstancedata where jobexecid = ?");
- statement.setLong(1, executionId);
- rs = statement.executeQuery();
-
- if (rs.next()) {
- // get the object based data
- byte[] buf = rs.getBytes("parameters");
- props = (Properties)deserializeObject(buf);
- } else {
- String msg = "Did not find table entry for executionID =" + executionId;
- logger.fine(msg);
- throw new NoSuchJobExecutionException(msg);
- }
-
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } catch (ClassNotFoundException e) {
- throw new PersistenceException(e);
- } finally {
- if (objectIn != null) {
- try {
- objectIn.close();
- } catch (IOException e) {
- throw new PersistenceException(e);
- }
- }
- cleanupConnection(conn, rs, statement);
- }
-
- return props;
-
- }
-
-
- public Map<String, StepExecution> getMostRecentStepExecutionsForJobInstance(long instanceId) {
-
- Map<String, StepExecution> data = new HashMap<String, StepExecution>();
-
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- long jobexecid = 0;
- long stepexecid = 0;
- String stepname = null;
- String batchstatus = null;
- String exitstatus = null;
- Exception ex = null;
- long readCount =0;
- long writeCount = 0;
- long commitCount = 0;
- long rollbackCount = 0;
- long readSkipCount = 0;
- long processSkipCount = 0;
- long filterCount = 0;
- long writeSkipCount = 0;
- Timestamp startTS = null;
- Timestamp endTS = null;
- StepExecutionImpl stepEx = null;
- ObjectInputStream objectIn = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("select A.* from stepexecutioninstancedata as A inner join executioninstancedata as B on A.jobexecid = B.jobexecid where B.jobinstanceid = ? order by A.stepexecid desc");
- statement.setLong(1, instanceId);
- rs = statement.executeQuery();
- while (rs.next()) {
- stepname = rs.getString("stepname");
- if (data.containsKey(stepname)) {
- continue;
- } else {
-
- jobexecid = rs.getLong("jobexecid");
- batchstatus = rs.getString("batchstatus");
- exitstatus = rs.getString("exitstatus");
- readCount = rs.getLong("readcount");
- writeCount = rs.getLong("writecount");
- commitCount = rs.getLong("commitcount");
- rollbackCount = rs.getLong("rollbackcount");
- readSkipCount = rs.getLong("readskipcount");
- processSkipCount = rs.getLong("processskipcount");
- filterCount = rs.getLong("filtercount");
- writeSkipCount = rs.getLong("writeSkipCount");
- startTS = rs.getTimestamp("startTime");
- endTS = rs.getTimestamp("endTime");
- // get the object based data
- Serializable persistentData = null;
- byte[] pDataBytes = rs.getBytes("persistentData");
- if (pDataBytes != null) {
- objectIn = new TCCLObjectInputStream(new ByteArrayInputStream(pDataBytes));
- persistentData = (Serializable)objectIn.readObject();
- }
-
- stepEx = new StepExecutionImpl(jobexecid, stepexecid);
-
- stepEx.setBatchStatus(BatchStatus.valueOf(batchstatus));
- stepEx.setExitStatus(exitstatus);
- stepEx.setStepName(stepname);
- stepEx.setReadCount(readCount);
- stepEx.setWriteCount(writeCount);
- stepEx.setCommitCount(commitCount);
- stepEx.setRollbackCount(rollbackCount);
- stepEx.setReadSkipCount(readSkipCount);
- stepEx.setProcessSkipCount(processSkipCount);
- stepEx.setFilterCount(filterCount);
- stepEx.setWriteSkipCount(writeSkipCount);
- stepEx.setStartTime(startTS);
- stepEx.setEndTime(endTS);
- stepEx.setPersistentUserData(persistentData);
-
- data.put(stepname, stepEx);
- }
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } catch (ClassNotFoundException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
-
- return data;
- }
-
-
- @Override
- public List<StepExecution> getStepExecutionsForJobExecution(long execid) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- long jobexecid = 0;
- long stepexecid = 0;
- String stepname = null;
- String batchstatus = null;
- String exitstatus = null;
- Exception ex = null;
- long readCount =0;
- long writeCount = 0;
- long commitCount = 0;
- long rollbackCount = 0;
- long readSkipCount = 0;
- long processSkipCount = 0;
- long filterCount = 0;
- long writeSkipCount = 0;
- Timestamp startTS = null;
- Timestamp endTS = null;
- StepExecutionImpl stepEx = null;
- ObjectInputStream objectIn = null;
-
- List<StepExecution> data = new ArrayList<StepExecution>();
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("select * from stepexecutioninstancedata where jobexecid = ?");
- statement.setLong(1, execid);
- rs = statement.executeQuery();
- while (rs.next()) {
- jobexecid = rs.getLong("jobexecid");
- stepexecid = rs.getLong("stepexecid");
- stepname = rs.getString("stepname");
- batchstatus = rs.getString("batchstatus");
- exitstatus = rs.getString("exitstatus");
- readCount = rs.getLong("readcount");
- writeCount = rs.getLong("writecount");
- commitCount = rs.getLong("commitcount");
- rollbackCount = rs.getLong("rollbackcount");
- readSkipCount = rs.getLong("readskipcount");
- processSkipCount = rs.getLong("processskipcount");
- filterCount = rs.getLong("filtercount");
- writeSkipCount = rs.getLong("writeSkipCount");
- startTS = rs.getTimestamp("startTime");
- endTS = rs.getTimestamp("endTime");
- // get the object based data
- Serializable persistentData = null;
- byte[] pDataBytes = rs.getBytes("persistentData");
- if (pDataBytes != null) {
- objectIn = new TCCLObjectInputStream(new ByteArrayInputStream(pDataBytes));
- persistentData = (Serializable)objectIn.readObject();
- }
-
- stepEx = new StepExecutionImpl(jobexecid, stepexecid);
-
- stepEx.setBatchStatus(BatchStatus.valueOf(batchstatus));
- stepEx.setExitStatus(exitstatus);
- stepEx.setStepName(stepname);
- stepEx.setReadCount(readCount);
- stepEx.setWriteCount(writeCount);
- stepEx.setCommitCount(commitCount);
- stepEx.setRollbackCount(rollbackCount);
- stepEx.setReadSkipCount(readSkipCount);
- stepEx.setProcessSkipCount(processSkipCount);
- stepEx.setFilterCount(filterCount);
- stepEx.setWriteSkipCount(writeSkipCount);
- stepEx.setStartTime(startTS);
- stepEx.setEndTime(endTS);
- stepEx.setPersistentUserData(persistentData);
-
- data.add(stepEx);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } catch (ClassNotFoundException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
-
- return data;
- }
-
-
- @Override
- public StepExecution getStepExecutionByStepExecutionId(long stepExecId) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- long jobexecid = 0;
- long stepexecid = 0;
- String stepname = null;
- String batchstatus = null;
- String exitstatus = null;
- Exception ex = null;
- long readCount = 0;
- long writeCount = 0;
- long commitCount = 0;
- long rollbackCount = 0;
- long readSkipCount = 0;
- long processSkipCount = 0;
- long filterCount = 0;
- long writeSkipCount = 0;
- Timestamp startTS = null;
- Timestamp endTS = null;
- StepExecutionImpl stepEx = null;
- ObjectInputStream objectIn = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("select * from stepexecutioninstancedata where stepexecid = ?");
- statement.setLong(1, stepExecId);
- rs = statement.executeQuery();
- while (rs.next()) {
- jobexecid = rs.getLong("jobexecid");
- stepexecid = rs.getLong("stepexecid");
- stepname = rs.getString("stepname");
- batchstatus = rs.getString("batchstatus");
- exitstatus = rs.getString("exitstatus");
- readCount = rs.getLong("readcount");
- writeCount = rs.getLong("writecount");
- commitCount = rs.getLong("commitcount");
- rollbackCount = rs.getLong("rollbackcount");
- readSkipCount = rs.getLong("readskipcount");
- processSkipCount = rs.getLong("processskipcount");
- filterCount = rs.getLong("filtercount");
- writeSkipCount = rs.getLong("writeSkipCount");
- startTS = rs.getTimestamp("startTime");
- endTS = rs.getTimestamp("endTime");
- // get the object based data
- Serializable persistentData = null;
- byte[] pDataBytes = rs.getBytes("persistentData");
- if (pDataBytes != null) {
- objectIn = new TCCLObjectInputStream(new ByteArrayInputStream(pDataBytes));
- persistentData = (Serializable) objectIn.readObject();
- }
-
- stepEx = new StepExecutionImpl(jobexecid, stepexecid);
-
- stepEx.setBatchStatus(BatchStatus.valueOf(batchstatus));
- stepEx.setExitStatus(exitstatus);
- stepEx.setStepName(stepname);
- stepEx.setReadCount(readCount);
- stepEx.setWriteCount(writeCount);
- stepEx.setCommitCount(commitCount);
- stepEx.setRollbackCount(rollbackCount);
- stepEx.setReadSkipCount(readSkipCount);
- stepEx.setProcessSkipCount(processSkipCount);
- stepEx.setFilterCount(filterCount);
- stepEx.setWriteSkipCount(writeSkipCount);
- stepEx.setStartTime(startTS);
- stepEx.setEndTime(endTS);
- stepEx.setPersistentUserData(persistentData);
-
-
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } catch (ClassNotFoundException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
-
- return stepEx;
- }
-
- @Override
- public void updateBatchStatusOnly(long key, BatchStatus batchStatus, Timestamp updatets) {
- Connection conn = null;
- PreparedStatement statement = null;
- ByteArrayOutputStream baos = null;
- ObjectOutputStream oout = null;
- byte[] b;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("update executioninstancedata set batchstatus = ?, updatetime = ? where jobexecid = ?");
- statement.setString(1, batchStatus.name());
- statement.setTimestamp(2, updatets);
- statement.setLong(3, key);
- statement.executeUpdate();
- } catch (SQLException e) {
- e.printStackTrace();
- throw new PersistenceException(e);
- } finally {
- if (baos != null) {
- try {
- baos.close();
- } catch (IOException e) {
- throw new PersistenceException(e);
- }
- }
- if (oout != null) {
- try {
- oout.close();
- } catch (IOException e) {
- throw new PersistenceException(e);
- }
- }
- cleanupConnection(conn, null, statement);
- }
- }
-
- @Override
- public void updateWithFinalExecutionStatusesAndTimestamps(long key,
- BatchStatus batchStatus, String exitStatus, Timestamp updatets) {
- // TODO Auto-generated methddod stub
- Connection conn = null;
- PreparedStatement statement = null;
- ByteArrayOutputStream baos = null;
- ObjectOutputStream oout = null;
- byte[] b;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("update executioninstancedata set batchstatus = ?, exitstatus = ?, endtime = ?, updatetime = ? where jobexecid = ?");
-
- statement.setString(1, batchStatus.name());
- statement.setString(2, exitStatus);
- statement.setTimestamp(3, updatets);
- statement.setTimestamp(4, updatets);
- statement.setLong(5, key);
-
- statement.executeUpdate();
- } catch (SQLException e) {
- e.printStackTrace();
- throw new PersistenceException(e);
- } finally {
- if (baos != null) {
- try {
- baos.close();
- } catch (IOException e) {
- throw new PersistenceException(e);
- }
- }
- if (oout != null) {
- try {
- oout.close();
- } catch (IOException e) {
- throw new PersistenceException(e);
- }
- }
- cleanupConnection(conn, null, statement);
- }
-
- }
-
- public void markJobStarted(long key, Timestamp startTS) {
- Connection conn = null;
- PreparedStatement statement = null;
- ByteArrayOutputStream baos = null;
- ObjectOutputStream oout = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("update executioninstancedata set batchstatus = ?, starttime = ?, updatetime = ? where jobexecid = ?");
-
- statement.setString(1, BatchStatus.STARTED.name());
- statement.setTimestamp(2, startTS);
- statement.setTimestamp(3, startTS);
- statement.setLong(4, key);
-
- statement.executeUpdate();
- } catch (SQLException e) {
- e.printStackTrace();
- throw new PersistenceException(e);
- } finally {
- if (baos != null) {
- try {
- baos.close();
- } catch (IOException e) {
- throw new PersistenceException(e);
- }
- }
- if (oout != null) {
- try {
- oout.close();
- } catch (IOException e) {
- throw new PersistenceException(e);
- }
- }
- cleanupConnection(conn, null, statement);
- }
- }
-
-
- @Override
- public IJobExecution jobOperatorGetJobExecution(long jobExecutionId) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- Timestamp createtime = null;
- Timestamp starttime = null;
- Timestamp endtime = null;
- Timestamp updatetime = null;
- long instanceId = 0;
- String batchStatus = null;
- String exitStatus = null;
- JobOperatorJobExecution jobEx = null;
- ObjectInputStream objectIn = null;
- String jobName = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("select A.createtime, A.starttime, A.endtime, A.updatetime, A.parameters, A.jobinstanceid, A.batchstatus, A.exitstatus, B.name from executioninstancedata as A inner join jobinstancedata as B on A.jobinstanceid = B.jobinstanceid where jobexecid = ?");
- statement.setLong(1, jobExecutionId);
- rs = statement.executeQuery();
- while (rs.next()) {
- createtime = rs.getTimestamp("createtime");
- starttime = rs.getTimestamp("starttime");
- endtime = rs.getTimestamp("endtime");
- updatetime = rs.getTimestamp("updatetime");
- instanceId = rs.getLong("jobinstanceid");
-
- // get the object based data
- batchStatus = rs.getString("batchstatus");
- exitStatus = rs.getString("exitstatus");
-
- // get the object based data
- Properties params = null;
- byte[] buf = rs.getBytes("parameters");
- params = (Properties)deserializeObject(buf);
-
- jobName = rs.getString("name");
-
- jobEx = new JobOperatorJobExecution(jobExecutionId, instanceId);
- jobEx.setCreateTime(createtime);
- jobEx.setStartTime(starttime);
- jobEx.setEndTime(endtime);
- jobEx.setJobParameters(params);
- jobEx.setLastUpdateTime(updatetime);
- jobEx.setBatchStatus(batchStatus);
- jobEx.setExitStatus(exitStatus);
- jobEx.setJobName(jobName);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } catch (ClassNotFoundException e) {
- throw new PersistenceException(e);
- } finally {
- if (objectIn != null) {
- try {
- objectIn.close();
- } catch (IOException e) {
- throw new PersistenceException(e);
- }
- }
- cleanupConnection(conn, rs, statement);
- }
- return jobEx;
- }
-
- @Override
- public List<IJobExecution> jobOperatorGetJobExecutions(long jobInstanceId) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- Timestamp createtime = null;
- Timestamp starttime = null;
- Timestamp endtime = null;
- Timestamp updatetime = null;
- long jobExecutionId = 0;
- long instanceId = 0;
- String batchStatus = null;
- String exitStatus = null;
- String jobName = null;
- List<IJobExecution> data = new ArrayList<IJobExecution>();
- JobOperatorJobExecution jobEx = null;
- ObjectInputStream objectIn = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("select A.jobexecid, A.createtime, A.starttime, A.endtime, A.updatetime, A.parameters, A.batchstatus, A.exitstatus, B.name from executioninstancedata as A inner join jobinstancedata as B ON A.jobinstanceid = B.jobinstanceid where A.jobinstanceid = ?");
- statement.setLong(1, jobInstanceId);
- rs = statement.executeQuery();
- while (rs.next()) {
- jobExecutionId = rs.getLong("jobexecid");
- createtime = rs.getTimestamp("createtime");
- starttime = rs.getTimestamp("starttime");
- endtime = rs.getTimestamp("endtime");
- updatetime = rs.getTimestamp("updatetime");
- batchStatus = rs.getString("batchstatus");
- exitStatus = rs.getString("exitstatus");
- jobName = rs.getString("name");
-
- // get the object based data
- byte[] buf = rs.getBytes("parameters");
- Properties params = (Properties)deserializeObject(buf);
-
- jobEx = new JobOperatorJobExecution(jobExecutionId, instanceId);
- jobEx.setCreateTime(createtime);
- jobEx.setStartTime(starttime);
- jobEx.setEndTime(endtime);
- jobEx.setLastUpdateTime(updatetime);
- jobEx.setBatchStatus(batchStatus);
- jobEx.setExitStatus(exitStatus);
- jobEx.setJobName(jobName);
-
- data.add(jobEx);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } catch (ClassNotFoundException e) {
- throw new PersistenceException(e);
- } finally {
- if (objectIn != null) {
- try {
- objectIn.close();
- } catch (IOException e) {
- throw new PersistenceException(e);
- }
- }
- cleanupConnection(conn, rs, statement);
- }
- return data;
- }
-
- @Override
- public Set<Long> jobOperatorGetRunningExecutions(String jobName){
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- Set<Long> executionIds = new HashSet<Long>();
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("SELECT A.jobexecid FROM executioninstancedata AS A INNER JOIN jobinstancedata AS B ON A.jobinstanceid = B.jobinstanceid WHERE A.batchstatus IN (?,?,?) AND B.name = ?");
- statement.setString(1, BatchStatus.STARTED.name());
- statement.setString(2, BatchStatus.STARTING.name());
- statement.setString(3, BatchStatus.STOPPING.name());
- statement.setString(4, jobName);
- rs = statement.executeQuery();
- while (rs.next()) {
- executionIds.add(rs.getLong("jobexecid"));
- }
-
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- return executionIds;
- }
-
- @Override
- public String getJobCurrentTag(long jobInstanceId) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- String apptag = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(SELECT_JOBINSTANCEDATA_APPTAG);
- statement.setLong(1, jobInstanceId);
- rs = statement.executeQuery();
- while (rs.next()) {
- apptag = rs.getString(APPTAG);
- }
-
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
-
- return apptag;
- }
-
- @Override
- public void purge(String apptag) {
-
- logger.entering(CLASSNAME, "purge", apptag);
- String deleteJobs = "DELETE FROM jobinstancedata WHERE apptag = ?";
- String deleteJobExecutions = "DELETE FROM executioninstancedata "
- + "WHERE jobexecid IN ("
- + "SELECT B.jobexecid FROM jobinstancedata AS A INNER JOIN executioninstancedata AS B "
- + "ON A.jobinstanceid = B.jobinstanceid "
- + "WHERE A.apptag = ?)";
- String deleteStepExecutions = "DELETE FROM stepexecutioninstancedata "
- + "WHERE stepexecid IN ("
- + "SELECT C.stepexecid FROM jobinstancedata AS A INNER JOIN executioninstancedata AS B "
- + "ON A.jobinstanceid = B.jobinstanceid INNER JOIN stepexecutioninstancedata AS C "
- + "ON B.jobexecid = C.jobexecid "
- + "WHERE A.apptag = ?)";
-
- Connection conn = null;
- PreparedStatement statement = null;
- try {
- conn = getConnection();
- statement = conn.prepareStatement(deleteStepExecutions);
- statement.setString(1, apptag);
- statement.executeUpdate();
-
- statement = conn.prepareStatement(deleteJobExecutions);
- statement.setString(1, apptag);
- statement.executeUpdate();
-
- statement = conn.prepareStatement(deleteJobs);
- statement.setString(1, apptag);
- statement.executeUpdate();
-
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } finally {
-
- cleanupConnection(conn, null, statement);
- }
- logger.exiting(CLASSNAME, "purge");
-
- }
-
- @Override
- public JobStatus getJobStatusFromExecution(long executionId) {
-
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- JobStatus retVal = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("select A.obj from jobstatus as A inner join " +
- "executioninstancedata as B on A.id = B.jobinstanceid where B.jobexecid = ?");
- statement.setLong(1, executionId);
- rs = statement.executeQuery();
- byte[] buf = null;
- if (rs.next()) {
- buf = rs.getBytes("obj");
- }
- retVal = (JobStatus)deserializeObject(buf);
- } catch (Exception e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- logger.exiting(CLASSNAME, "executeQuery");
- return retVal;
- }
-
- public long getJobInstanceIdByExecutionId(long executionId) throws NoSuchJobExecutionException {
- long instanceId= 0;
-
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("select jobinstanceid from executioninstancedata where jobexecid = ?");
- statement.setObject(1, executionId);
- rs = statement.executeQuery();
- if (rs.next()) {
- instanceId = rs.getLong("jobinstanceid");
- } else {
- String msg = "Did not find job instance associated with executionID =" + executionId;
- logger.fine(msg);
- throw new NoSuchJobExecutionException(msg);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
-
- return instanceId;
- }
-
- /**
- * This method is used to serialized an object saved into a table BLOB field.
- *
- * @param theObject the object to be serialized
- * @return a object byte array
- * @throws IOException
- */
- private byte[] serializeObject(Serializable theObject) throws IOException {
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oout = new ObjectOutputStream(baos);
- oout.writeObject(theObject);
- byte[] data = baos.toByteArray();
- baos.close();
- oout.close();
-
- return data;
- }
-
- /**
- * This method is used to de-serialized a table BLOB field to its original object form.
- *
- * @param buffer the byte array save a BLOB
- * @return the object saved as byte array
- * @throws IOException
- * @throws ClassNotFoundException
- */
- private Serializable deserializeObject(byte[] buffer) throws IOException, ClassNotFoundException {
-
- Serializable theObject = null;
- ObjectInputStream objectIn = null;
-
- if (buffer != null) {
- objectIn = new ObjectInputStream(new ByteArrayInputStream(buffer));
- theObject = (Serializable)objectIn.readObject();
- objectIn.close();
- }
- return theObject;
- }
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.IPersistenceManagerService#createJobInstance(java.lang.String, java.lang.String, java.lang.String, java.util.Properties)
- */
- @Override
- public JobInstance createSubJobInstance(String name, String apptag) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- JobInstanceImpl jobInstance = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("INSERT INTO jobinstancedata (name, apptag) VALUES(?, ?)", Statement.RETURN_GENERATED_KEYS);
- statement.setString(1, name);
- statement.setString(2, apptag);
- statement.executeUpdate();
- rs = statement.getGeneratedKeys();
- if(rs.next()) {
- long jobInstanceID = rs.getLong(1);
- jobInstance = new JobInstanceImpl(jobInstanceID);
- jobInstance.setJobName(name);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- return jobInstance;
- }
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.IPersistenceManagerService#createJobInstance(java.lang.String, java.lang.String, java.lang.String, java.util.Properties)
- */
- @Override
- public JobInstance createJobInstance(String name, String apptag, String jobXml) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- JobInstanceImpl jobInstance = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement("INSERT INTO jobinstancedata (name, apptag) VALUES(?, ?)", Statement.RETURN_GENERATED_KEYS);
- statement.setString(1, name);
- statement.setString(2, apptag);
- statement.executeUpdate();
- rs = statement.getGeneratedKeys();
- if(rs.next()) {
- long jobInstanceID = rs.getLong(1);
- jobInstance = new JobInstanceImpl(jobInstanceID, jobXml);
- jobInstance.setJobName(name);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- return jobInstance;
- }
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.IPersistenceManagerService#createJobExecution(com.ibm.jbatch.container.jsl.JobNavigator, javax.batch.runtime.JobInstance, java.util.Properties, com.ibm.jbatch.container.context.impl.JobContextImpl)
- */
- @Override
- public RuntimeJobExecution createJobExecution(JobInstance jobInstance, Properties jobParameters, BatchStatus batchStatus) {
- Timestamp now = new Timestamp(System.currentTimeMillis());
- long newExecutionId = createRuntimeJobExecutionEntry(jobInstance, jobParameters, batchStatus, now);
- RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, newExecutionId);
- jobExecution.setBatchStatus(batchStatus.name());
- jobExecution.setCreateTime(now);
- jobExecution.setLastUpdateTime(now);
- return jobExecution;
- }
-
- private long createRuntimeJobExecutionEntry(JobInstance jobInstance, Properties jobParameters, BatchStatus batchStatus, Timestamp timestamp) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- long newJobExecutionId = 0L;
- try {
- conn = getConnection();
- statement = conn.prepareStatement("INSERT INTO executioninstancedata (jobinstanceid, createtime, updatetime, batchstatus, parameters) VALUES(?, ?, ?, ?, ?)", Statement.RETURN_GENERATED_KEYS);
- statement.setLong(1, jobInstance.getInstanceId());
- statement.setTimestamp(2, timestamp);
- statement.setTimestamp(3, timestamp);
- statement.setString(4, batchStatus.name());
- statement.setObject(5, serializeObject(jobParameters));
- statement.executeUpdate();
- rs = statement.getGeneratedKeys();
- if(rs.next()) {
- newJobExecutionId = rs.getLong(1);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- return newJobExecutionId;
- }
-
- @Override
- public RuntimeFlowInSplitExecution createFlowInSplitExecution(JobInstance jobInstance, BatchStatus batchStatus) {
- Timestamp now = new Timestamp(System.currentTimeMillis());
- long newExecutionId = createRuntimeJobExecutionEntry(jobInstance, null, batchStatus, now);
- RuntimeFlowInSplitExecution flowExecution = new RuntimeFlowInSplitExecution(jobInstance, newExecutionId);
- flowExecution.setBatchStatus(batchStatus.name());
- flowExecution.setCreateTime(now);
- flowExecution.setLastUpdateTime(now);
- return flowExecution;
- }
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.IPersistenceManagerService#createStepExecution(long, com.ibm.jbatch.container.context.impl.StepContextImpl)
- */
- @Override
- public StepExecutionImpl createStepExecution(long rootJobExecId, StepContextImpl stepContext) {
- StepExecutionImpl stepExecution = null;
- String batchStatus = stepContext.getBatchStatus() == null ? BatchStatus.STARTING.name() : stepContext.getBatchStatus().name();
- String exitStatus = stepContext.getExitStatus();
- String stepName = stepContext.getStepName();
-
- long readCount = 0;
- long writeCount = 0;
- long commitCount = 0;
- long rollbackCount = 0;
- long readSkipCount = 0;
- long processSkipCount = 0;
- long filterCount = 0;
- long writeSkipCount = 0;
- Timestamp startTime = stepContext.getStartTimeTS();
- Timestamp endTime = stepContext.getEndTimeTS();
-
- Metric[] metrics = stepContext.getMetrics();
- for (int i = 0; i < metrics.length; i++) {
- if (metrics[i].getType().equals(MetricImpl.MetricType.READ_COUNT)) {
- readCount = metrics[i].getValue();
- } else if (metrics[i].getType().equals(MetricImpl.MetricType.WRITE_COUNT)) {
- writeCount = metrics[i].getValue();
- } else if (metrics[i].getType().equals(MetricImpl.MetricType.PROCESS_SKIP_COUNT)) {
- processSkipCount = metrics[i].getValue();
- } else if (metrics[i].getType().equals(MetricImpl.MetricType.COMMIT_COUNT)) {
- commitCount = metrics[i].getValue();
- } else if (metrics[i].getType().equals(MetricImpl.MetricType.ROLLBACK_COUNT)) {
- rollbackCount = metrics[i].getValue();
- } else if (metrics[i].getType().equals(MetricImpl.MetricType.READ_SKIP_COUNT)) {
- readSkipCount = metrics[i].getValue();
- } else if (metrics[i].getType().equals(MetricImpl.MetricType.FILTER_COUNT)) {
- filterCount = metrics[i].getValue();
- } else if (metrics[i].getType().equals(MetricImpl.MetricType.WRITE_SKIP_COUNT)) {
- writeSkipCount = metrics[i].getValue();
- }
- }
- Serializable persistentData = stepContext.getPersistentUserData();
-
- stepExecution = createStepExecution(rootJobExecId, batchStatus, exitStatus, stepName, readCount,
- writeCount, commitCount, rollbackCount, readSkipCount, processSkipCount, filterCount, writeSkipCount, startTime,
- endTime, persistentData);
-
- return stepExecution;
- }
-
-
- private StepExecutionImpl createStepExecution(long rootJobExecId, String batchStatus, String exitStatus, String stepName, long readCount,
- long writeCount, long commitCount, long rollbackCount, long readSkipCount, long processSkipCount, long filterCount,
- long writeSkipCount, Timestamp startTime, Timestamp endTime, Serializable persistentData) {
-
- logger.entering(CLASSNAME, "createStepExecution", new Object[] {rootJobExecId, batchStatus, exitStatus==null ? "<null>" : exitStatus, stepName, readCount,
- writeCount, commitCount, rollbackCount, readSkipCount, processSkipCount, filterCount, writeSkipCount, startTime == null ? "<null>" : startTime,
- endTime==null ? "<null>" :endTime , persistentData==null ? "<null>" : persistentData});
-
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- StepExecutionImpl stepExecution = null;
- String query = "INSERT INTO stepexecutioninstancedata (jobexecid, batchstatus, exitstatus, stepname, readcount,"
- + "writecount, commitcount, rollbackcount, readskipcount, processskipcount, filtercount, writeskipcount, starttime,"
- + "endtime, persistentdata) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(query, Statement.RETURN_GENERATED_KEYS);
- statement.setLong(1, rootJobExecId);
- statement.setString(2, batchStatus);
- statement.setString(3, exitStatus);
- statement.setString(4, stepName);
- statement.setLong(5, readCount);
- statement.setLong(6, writeCount);
- statement.setLong(7, commitCount);
- statement.setLong(8, rollbackCount);
- statement.setLong(9, readSkipCount);
- statement.setLong(10, processSkipCount);
- statement.setLong(11, filterCount);
- statement.setLong(12, writeSkipCount);
- statement.setTimestamp(13, startTime);
- statement.setTimestamp(14, endTime);
- statement.setObject(15, serializeObject(persistentData));
-
- statement.executeUpdate();
- rs = statement.getGeneratedKeys();
- if(rs.next()) {
- long stepExecutionId = rs.getLong(1);
- stepExecution = new StepExecutionImpl(rootJobExecId, stepExecutionId);
- stepExecution.setStepName(stepName);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, statement);
- }
- logger.exiting(CLASSNAME, "createStepExecution");
-
- return stepExecution;
- }
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.IPersistenceManagerService#updateStepExecution(long, com.ibm.jbatch.container.context.impl.StepContextImpl)
- */
- @Override
- public void updateStepExecution(long rootJobExecId, StepContextImpl stepContext) {
- long stepExecutionId = stepContext.getStepExecutionId();
- String batchStatus = stepContext.getBatchStatus() == null ? BatchStatus.STARTING.name() : stepContext.getBatchStatus().name();
- String exitStatus = stepContext.getExitStatus();
- String stepName = stepContext.getStepName();
-
- long readCount = 0;
- long writeCount = 0;
- long commitCount = 0;
- long rollbackCount = 0;
- long readSkipCount = 0;
- long processSkipCount = 0;
- long filterCount = 0;
- long writeSkipCount = 0;
- Timestamp startTime = stepContext.getStartTimeTS();
- Timestamp endTime = stepContext.getEndTimeTS();
-
- Metric[] metrics = stepContext.getMetrics();
- for (int i = 0; i < metrics.length; i++) {
- if (metrics[i].getType().equals(MetricImpl.MetricType.READ_COUNT)) {
- readCount = metrics[i].getValue();
- } else if (metrics[i].getType().equals(MetricImpl.MetricType.WRITE_COUNT)) {
- writeCount = metrics[i].getValue();
- } else if (metrics[i].getType().equals(MetricImpl.MetricType.PROCESS_SKIP_COUNT)) {
- processSkipCount = metrics[i].getValue();
- } else if (metrics[i].getType().equals(MetricImpl.MetricType.COMMIT_COUNT)) {
- commitCount = metrics[i].getValue();
- } else if (metrics[i].getType().equals(MetricImpl.MetricType.ROLLBACK_COUNT)) {
- rollbackCount = metrics[i].getValue();
- } else if (metrics[i].getType().equals(MetricImpl.MetricType.READ_SKIP_COUNT)) {
- readSkipCount = metrics[i].getValue();
- } else if (metrics[i].getType().equals(MetricImpl.MetricType.FILTER_COUNT)) {
- filterCount = metrics[i].getValue();
- } else if (metrics[i].getType().equals(MetricImpl.MetricType.WRITE_SKIP_COUNT)) {
- writeSkipCount = metrics[i].getValue();
- }
- }
- Serializable persistentData = stepContext.getPersistentUserData();
-
- updateStepExecution(stepExecutionId, rootJobExecId, batchStatus, exitStatus, stepName, readCount,
- writeCount, commitCount, rollbackCount, readSkipCount, processSkipCount, filterCount,
- writeSkipCount, startTime, endTime, persistentData);
-
- }
-
-
- private void updateStepExecution(long stepExecutionId, long jobExecId, String batchStatus, String exitStatus, String stepName, long readCount,
- long writeCount, long commitCount, long rollbackCount, long readSkipCount, long processSkipCount, long filterCount,
- long writeSkipCount, Timestamp startTime, Timestamp endTime, Serializable persistentData) {
-
- logger.entering(CLASSNAME, "updateStepExecution", new Object[] {stepExecutionId, jobExecId, batchStatus, exitStatus==null ? "<null>" : exitStatus, stepName, readCount,
- writeCount, commitCount, rollbackCount, readSkipCount, processSkipCount, filterCount, writeSkipCount, startTime==null ? "<null>" : startTime,
- endTime==null ? "<null>" : endTime, persistentData==null ? "<null>" : persistentData});
-
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- StepExecutionImpl stepExecution = null;
- String query = "UPDATE stepexecutioninstancedata SET jobexecid = ?, batchstatus = ?, exitstatus = ?, stepname = ?, readcount = ?,"
- + "writecount = ?, commitcount = ?, rollbackcount = ?, readskipcount = ?, processskipcount = ?, filtercount = ?, writeskipcount = ?,"
- + " starttime = ?, endtime = ?, persistentdata = ? WHERE stepexecid = ?";
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(query);
- statement.setLong(1, jobExecId);
- statement.setString(2, batchStatus);
- statement.setString(3, exitStatus);
- statement.setString(4, stepName);
- statement.setLong(5, readCount);
- statement.setLong(6, writeCount);
- statement.setLong(7, commitCount);
- statement.setLong(8, rollbackCount);
- statement.setLong(9, readSkipCount);
- statement.setLong(10, processSkipCount);
- statement.setLong(11, filterCount);
- statement.setLong(12, writeSkipCount);
- statement.setTimestamp(13, startTime);
- statement.setTimestamp(14, endTime);
- statement.setObject(15, serializeObject(persistentData));
- statement.setLong(16, stepExecutionId);
- statement.executeUpdate();
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, statement);
- }
-
- logger.exiting(CLASSNAME, "updateStepExecution");
- }
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.IPersistenceManagerService#createJobStatus(long)
- */
- @Override
- public JobStatus createJobStatus(long jobInstanceId) {
- logger.entering(CLASSNAME, "createJobStatus", jobInstanceId);
- Connection conn = null;
- PreparedStatement statement = null;
- JobStatus jobStatus = new JobStatus(jobInstanceId);
- try {
- conn = getConnection();
- statement = conn.prepareStatement("INSERT INTO jobstatus (id, obj) VALUES(?, ?)");
- statement.setLong(1, jobInstanceId);
- statement.setBytes(2, serializeObject(jobStatus));
- statement.executeUpdate();
-
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, statement);
- }
- logger.exiting(CLASSNAME, "createJobStatus");
- return jobStatus;
- }
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.IPersistenceManagerService#getJobStatus(long)
- */
- @Override
- public JobStatus getJobStatus(long instanceId) {
- logger.entering(CLASSNAME, "getJobStatus", instanceId);
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- RuntimeJobExecution jobExecution = null;
- String query = "SELECT obj FROM jobstatus WHERE id = ?";
- JobStatus jobStatus = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(query);
- statement.setLong(1, instanceId);
- rs = statement.executeQuery();
- if(rs.next()) {
- jobStatus = (JobStatus)deserializeObject(rs.getBytes(1));
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } catch (ClassNotFoundException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- logger.exiting(CLASSNAME, "getJobStatus", jobStatus);
- return jobStatus;
- }
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.IPersistenceManagerService#updateJobStatus(long, com.ibm.jbatch.container.status.JobStatus)
- */
- @Override
- public void updateJobStatus(long instanceId, JobStatus jobStatus) {
- logger.entering(CLASSNAME, "updateJobStatus", new Object[] {instanceId, jobStatus});
- Connection conn = null;
- PreparedStatement statement = null;
- try {
- conn = getConnection();
- statement = conn.prepareStatement("UPDATE jobstatus SET obj = ? WHERE id = ?");
- statement.setBytes(1, serializeObject(jobStatus));
- statement.setLong(2, instanceId);
- statement.executeUpdate();
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, statement);
- }
- logger.exiting(CLASSNAME, "updateJobStatus");
- }
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.IPersistenceManagerService#createStepStatus(long)
- */
- @Override
- public StepStatus createStepStatus(long stepExecId) {
- logger.entering(CLASSNAME, "createStepStatus", stepExecId);
- Connection conn = null;
- PreparedStatement statement = null;
- StepStatus stepStatus = new StepStatus(stepExecId);
- try {
- conn = getConnection();
- statement = conn.prepareStatement("INSERT INTO stepstatus (id, obj) VALUES(?, ?)");
- statement.setLong(1, stepExecId);
- statement.setBytes(2, serializeObject(stepStatus));
- statement.executeUpdate();
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, statement);
- }
- logger.exiting(CLASSNAME, "createStepStatus");
- return stepStatus;
- }
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.IPersistenceManagerService#getStepStatus(long, java.lang.String)
- */
- @Override
- public StepStatus getStepStatus(long instanceId, String stepName) {
- logger.entering(CLASSNAME, "getStepStatus", new Object[] {instanceId, stepName});
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- RuntimeJobExecution jobExecution = null;
- String query = "SELECT obj FROM stepstatus WHERE id IN ("
- + "SELECT B.stepexecid FROM executioninstancedata A INNER JOIN stepexecutioninstancedata B ON A.jobexecid = B.jobexecid "
- + "WHERE A.jobinstanceid = ? and B.stepname = ?)";
- StepStatus stepStatus = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(query);
- statement.setLong(1, instanceId);
- statement.setString(2, stepName);
- rs = statement.executeQuery();
- if(rs.next()) {
- stepStatus = (StepStatus)deserializeObject(rs.getBytes(1));
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } catch (ClassNotFoundException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- logger.exiting(CLASSNAME, "getStepStatus", stepStatus==null ? "<null>" : stepStatus);
- return stepStatus;
- }
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.IPersistenceManagerService#updateStepStatus(long, com.ibm.jbatch.container.status.StepStatus)
- */
- @Override
- public void updateStepStatus(long stepExecutionId, StepStatus stepStatus) {
- logger.entering(CLASSNAME, "updateStepStatus", new Object[] {stepExecutionId, stepStatus});
- Connection conn = null;
- PreparedStatement statement = null;
- try {
- conn = getConnection();
- statement = conn.prepareStatement("UPDATE stepstatus SET obj = ? WHERE id = ?");
- statement.setBytes(1, serializeObject(stepStatus));
- statement.setLong(2, stepExecutionId);
- statement.executeUpdate();
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, statement);
- }
- logger.exiting(CLASSNAME, "updateStepStatus");
- }
-
- /* (non-Javadoc)
- * @see com.ibm.jbatch.container.services.IPersistenceManagerService#getTagName(long)
- */
- @Override
- public String getTagName(long jobExecutionId) {
- logger.entering(CLASSNAME, "getTagName", jobExecutionId);
- String apptag = null;
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- String query = "SELECT A.apptag FROM jobinstancedata A INNER JOIN executioninstancedata B ON A.jobinstanceid = B.jobinstanceid"
- + " WHERE B.jobexecid = ?";
- try {
- conn = getConnection();
- statement = conn.prepareStatement(query);
- statement.setLong(1, jobExecutionId);
- rs = statement.executeQuery();
- if(rs.next()) {
- apptag = rs.getString(1);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- logger.exiting(CLASSNAME, "getTagName");
- return apptag;
- }
-
- @Override
- public long getMostRecentExecutionId(long jobInstanceId) {
- logger.entering(CLASSNAME, "getMostRecentExecutionId", jobInstanceId);
- long mostRecentId = -1;
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- String query = "SELECT jobexecid FROM executioninstancedata WHERE jobinstanceid = ? ORDER BY createtime DESC";
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(query);
- statement.setLong(1, jobInstanceId);
- rs = statement.executeQuery();
- if(rs.next()) {
- mostRecentId = rs.getLong(1);
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- logger.exiting(CLASSNAME, "getMostRecentExecutionId");
- return mostRecentId;
- }
-
- @Override
- public void shutdown() throws BatchContainerServiceException {
- // TODO Auto-generated method stub
-
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerSQLConstants.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerSQLConstants.java b/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerSQLConstants.java
deleted file mode 100755
index 0735d5d..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JDBCPersistenceManagerSQLConstants.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright 2013 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package com.ibm.jbatch.container.services.impl;
-
- interface JDBCPersistenceManagerSQLConstants {
-
- final String JOBSTATUS_TABLE = "JOBSTATUS";
- final String STEPSTATUS_TABLE = "STEPSTATUS";
- final String CHECKPOINTDATA_TABLE = "CHECKPOINTDATA";
- final String JOBINSTANCEDATA_TABLE = "JOBINSTANCEDATA";
- final String EXECUTIONINSTANCEDATA_TABLE = "EXECUTIONINSTANCEDATA";
- final String STEPEXECUTIONINSTANCEDATA_TABLE = "STEPEXECUTIONINSTANCEDATA";
-
- final String CREATE_TAB_JOBSTATUS = "CREATE TABLE JOBSTATUS("
- + "id BIGINT CONSTRAINT JOBSTATUS_PK PRIMARY KEY,"
- + "obj BLOB,"
- + "CONSTRAINT JOBSTATUS_JOBINST_FK FOREIGN KEY (id) REFERENCES JOBINSTANCEDATA (jobinstanceid) ON DELETE CASCADE)";
- final String CREATE_TAB_STEPSTATUS = "CREATE TABLE STEPSTATUS("
- + "id BIGINT CONSTRAINT STEPSTATUS_PK PRIMARY KEY,"
- + "obj BLOB,"
- + "CONSTRAINT STEPSTATUS_STEPEXEC_FK FOREIGN KEY (id) REFERENCES STEPEXECUTIONINSTANCEDATA (stepexecid) ON DELETE CASCADE)";
- final String CREATE_TAB_CHECKPOINTDATA = "CREATE TABLE CHECKPOINTDATA("
- + "id VARCHAR(512),obj BLOB)";
- final String CREATE_TAB_JOBINSTANCEDATA = "CREATE TABLE JOBINSTANCEDATA("
- + "jobinstanceid BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) CONSTRAINT JOBINSTANCE_PK PRIMARY KEY,"
- + "name VARCHAR(512),"
- + "apptag VARCHAR(512))";
- final String CREATE_TAB_EXECUTIONINSTANCEDATA = "CREATE TABLE EXECUTIONINSTANCEDATA("
- + "jobexecid BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) CONSTRAINT JOBEXECUTION_PK PRIMARY KEY,"
- + "jobinstanceid BIGINT,"
- + "createtime TIMESTAMP,"
- + "starttime TIMESTAMP,"
- + "endtime TIMESTAMP,"
- + "updatetime TIMESTAMP,"
- + "parameters BLOB,"
- + "batchstatus VARCHAR(512),"
- + "exitstatus VARCHAR(512),"
- + "CONSTRAINT JOBINST_JOBEXEC_FK FOREIGN KEY (jobinstanceid) REFERENCES JOBINSTANCEDATA (jobinstanceid))";
- final String CREATE_TAB_STEPEXECUTIONINSTANCEDATA = "CREATE TABLE STEPEXECUTIONINSTANCEDATA("
- + "stepexecid BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) CONSTRAINT STEPEXECUTION_PK PRIMARY KEY,"
- + "jobexecid BIGINT,"
- + "batchstatus VARCHAR(512),"
- + "exitstatus VARCHAR(512),"
- + "stepname VARCHAR(512),"
- + "readcount INTEGER,"
- + "writecount INTEGER,"
- + "commitcount INTEGER,"
- + "rollbackcount INTEGER,"
- + "readskipcount INTEGER,"
- + "processskipcount INTEGER,"
- + "filtercount INTEGER,"
- + "writeskipcount INTEGER,"
- + "startTime TIMESTAMP,"
- + "endTime TIMESTAMP,"
- + "persistentData BLOB,"
- + "CONSTRAINT JOBEXEC_STEPEXEC_FK FOREIGN KEY (jobexecid) REFERENCES EXECUTIONINSTANCEDATA (jobexecid))";
-
- final String INSERT_JOBSTATUS = "insert into jobstatus values(?, ?)";
-
- final String UPDATE_JOBSTATUS = "update jobstatus set obj = ? where id = ?";
-
- final String SELECT_JOBSTATUS = "select id, obj from jobstatus where id = ?";
-
- final String DELETE_JOBSTATUS = "delete from jobstatus where id = ?";
-
- final String INSERT_STEPSTATUS = "insert into stepstatus values(?, ?)";
-
- final String UPDATE_STEPSTATUS = "update stepstatus set obj = ? where id = ?";
-
- final String SELECT_STEPSTATUS = "select id, obj from stepstatus where id = ?";
-
- final String DELETE_STEPSTATUS = "delete from stepstatus where id = ?";
-
- final String INSERT_CHECKPOINTDATA = "insert into checkpointdata values(?, ?)";
-
- final String UPDATE_CHECKPOINTDATA = "update checkpointdata set obj = ? where id = ?";
-
- final String SELECT_CHECKPOINTDATA = "select id, obj from checkpointdata where id = ?";
-
- final String CREATE_CHECKPOINTDATA_INDEX = "create index chk_index on checkpointdata(id)";
-
- final String DELETE_CHECKPOINTDATA = "delete from checkpointdata where id = ?";
-
- // JOB OPERATOR QUERIES
- final String INSERT_JOBINSTANCEDATA = "insert into jobinstancedata (name, apptag) values(?, ?)";
-
- final String INSERT_EXECUTIONDATA = "insert into executionInstanceData (jobinstanceid, parameters) values(?, ?)";
-
- final String SELECT_JOBINSTANCEDATA_COUNT = "select count(jobinstanceid) as jobinstancecount from jobinstancedata where name = ?";
-
- final String SELECT_JOBINSTANCEDATA_IDS = "select jobinstanceid from jobinstancedata where name = ? order by jobinstanceid desc";
-
- final String SELECT_JOBINSTANCEDATA_NAMES = "select name from jobinstancedata where apptag = ?";
- final String SELECT_JOBINSTANCEDATA_APPTAG = "select apptag from jobinstancedata where jobinstanceid = ?";
-
- final String START_TIME = "starttime";
- final String CREATE_TIME = "createtime";
- final String END_TIME = "endtime";
- final String UPDATE_TIME = "updatetime";
- final String BATCH_STATUS = "batchstatus";
- final String EXIT_STATUS = "exitstatus";
- final String INSTANCE_ID = "instanceId";
- final String JOBEXEC_ID = "jobexecid";
- final String STEPEXEC_ID = "stepexecid";
- final String STEPCONTEXT = "stepcontext";
- final String APPTAG = "apptag";
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JNDIDelegatingThreadPoolServiceImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JNDIDelegatingThreadPoolServiceImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JNDIDelegatingThreadPoolServiceImpl.java
deleted file mode 100755
index e1398bd..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/services/impl/JNDIDelegatingThreadPoolServiceImpl.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Copyright 2013 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.services.impl;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
-import com.ibm.jbatch.container.exception.BatchContainerServiceException;
-import com.ibm.jbatch.container.util.BatchContainerConstants;
-import com.ibm.jbatch.spi.services.IBatchConfig;
-import com.ibm.jbatch.spi.services.IBatchThreadPoolService;
-import com.ibm.jbatch.spi.services.ParallelTaskResult;
-
-public class JNDIDelegatingThreadPoolServiceImpl implements IBatchThreadPoolService, BatchContainerConstants {
-
- private final static String sourceClass = JNDIDelegatingThreadPoolServiceImpl.class.getName();
- private final static Logger logger = Logger.getLogger(sourceClass);
-
- public final String DEFAULT_JNDI_LOCATION = "java:comp/DefaultManagedExecutorService";
- private String jndiLocation = null;
-
- public JNDIDelegatingThreadPoolServiceImpl() {
- // TODO Auto-generated constructor stub
- }
-
- @Override
- public void init(IBatchConfig batchConfig) {
- // Don't want to get/cache the actual threadpool here since we want to do a JNDI lookup each time.
- jndiLocation = batchConfig.getConfigProperties().getProperty(THREADPOOL_JNDI_LOCATION, DEFAULT_JNDI_LOCATION);
- }
-
- public void shutdown() throws BatchContainerServiceException {
- String method = "shutdown";
- if(logger.isLoggable(Level.FINER)) { logger.entering(sourceClass, method); }
-
- // We don't want to be responsible for cleaning up.
-
- if(logger.isLoggable(Level.FINER)) { logger.exiting(sourceClass, method); }
- }
-
- public void executeTask(Runnable work, Object config) {
- String method = "executeTask";
- if(logger.isLoggable(Level.FINER)) { logger.entering(sourceClass, method); }
-
- try {
- Context ctx = new InitialContext();
- ExecutorService delegateService = (ExecutorService)ctx.lookup(jndiLocation);
- delegateService.execute(work);
- } catch (NamingException e) {
- logger.severe("Lookup failed for JNDI name: " + jndiLocation);
- throw new BatchContainerServiceException(e);
- }
-
- if(logger.isLoggable(Level.FINER)) { logger.exiting(sourceClass, method); }
- }
-
- public ParallelTaskResult executeParallelTask(Runnable work, Object config) {
- String method = "executeParallelTask";
- ParallelTaskResult taskResult = null;
- if(logger.isLoggable(Level.FINER)) { logger.entering(sourceClass, method); }
-
- try {
- Context ctx = new InitialContext();
- ExecutorService delegateService = (ExecutorService)ctx.lookup(jndiLocation);
- Future result = delegateService.submit(work);
- taskResult = new JSEResultAdapter(result);
- } catch (NamingException e) {
- logger.severe("Lookup failed for JNDI name: " + jndiLocation);
- throw new BatchContainerServiceException(e);
- }
-
- if(logger.isLoggable(Level.FINER)) { logger.exiting(sourceClass, method); }
-
- return taskResult;
- }
-
-
-}