You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by st...@apache.org on 2014/03/30 12:35:49 UTC
[6/6] git commit: unify PersistenceManagerSercie impl names
unify PersistenceManagerSercie impl names
Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/67e63c08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/67e63c08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/67e63c08
Branch: refs/heads/master
Commit: 67e63c0814b7c5a196017f0d0ff604c03dc331ab
Parents: c47427d
Author: Mark Struberg <st...@apache.org>
Authored: Sun Mar 30 12:35:23 2014 +0200
Committer: Mark Struberg <st...@apache.org>
Committed: Sun Mar 30 12:35:23 2014 +0200
----------------------------------------------------------------------
jbatch/pom.xml | 6 +-
.../container/services/ServicesManager.java | 4 +-
.../persistence/JDBCPersistenceManager.java | 1617 ------------------
.../JDBCPersistenceManagerService.java | 1617 ++++++++++++++++++
.../JPAPersistenceManagerService.java | 849 +++++++++
.../persistence/JPAPersistenceService.java | 849 ---------
.../persistence/MemoryPersistenceManager.java | 666 --------
.../MemoryPersistenceManagerService.java | 666 ++++++++
.../org/apache/batchee/test/StepLauncher.java | 4 +-
9 files changed, 3139 insertions(+), 3139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/67e63c08/jbatch/pom.xml
----------------------------------------------------------------------
diff --git a/jbatch/pom.xml b/jbatch/pom.xml
index c91b0ee..5de5065 100644
--- a/jbatch/pom.xml
+++ b/jbatch/pom.xml
@@ -159,7 +159,7 @@
<configuration>
<skip>${maven.test.skip}</skip>
<systemProperties>
- <PersistenceManagerService>org.apache.batchee.container.services.persistence.MemoryPersistenceManager</PersistenceManagerService>
+ <PersistenceManagerService>org.apache.batchee.container.services.persistence.MemoryPersistenceManagerService</PersistenceManagerService>
</systemProperties>
<suiteXmlFiles>
<suiteXmlFile>${tck.suite}</suiteXmlFile>
@@ -178,7 +178,7 @@
<configuration>
<skip>${maven.test.skip}</skip>
<systemProperties>
- <PersistenceManagerService>org.apache.batchee.container.services.persistence.JDBCPersistenceManager</PersistenceManagerService>
+ <PersistenceManagerService>org.apache.batchee.container.services.persistence.JDBCPersistenceManagerService</PersistenceManagerService>
</systemProperties>
<suiteXmlFiles>
<suiteXmlFile>${tck.suite}</suiteXmlFile>
@@ -197,7 +197,7 @@
<configuration>
<skip>${maven.test.skip}</skip>
<systemProperties>
- <PersistenceManagerService>org.apache.batchee.container.services.persistence.JPAPersistenceService</PersistenceManagerService>
+ <PersistenceManagerService>org.apache.batchee.container.services.persistence.JPAPersistenceManagerService</PersistenceManagerService>
</systemProperties>
<suiteXmlFiles>
<suiteXmlFile>${tck.suite}</suiteXmlFile>
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/67e63c08/jbatch/src/main/java/org/apache/batchee/container/services/ServicesManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/ServicesManager.java b/jbatch/src/main/java/org/apache/batchee/container/services/ServicesManager.java
index 888a9d2..f094129 100755
--- a/jbatch/src/main/java/org/apache/batchee/container/services/ServicesManager.java
+++ b/jbatch/src/main/java/org/apache/batchee/container/services/ServicesManager.java
@@ -26,7 +26,7 @@ import org.apache.batchee.container.services.factory.DefaultBatchArtifactFactory
import org.apache.batchee.container.services.kernel.DefaultBatchKernel;
import org.apache.batchee.container.services.loader.DefaultJobXMLLoaderService;
import org.apache.batchee.container.services.locator.SingletonLocator;
-import org.apache.batchee.container.services.persistence.MemoryPersistenceManager;
+import org.apache.batchee.container.services.persistence.MemoryPersistenceManagerService;
import org.apache.batchee.container.services.security.DefaultSecurityService;
import org.apache.batchee.container.services.status.DefaultJobStatusManager;
import org.apache.batchee.container.services.transaction.DefaultBatchTransactionService;
@@ -57,7 +57,7 @@ public class ServicesManager implements BatchContainerConstants {
private static final Map<String, String> SERVICE_IMPL_CLASS_NAMES = new ConcurrentHashMap<String, String>();
static {
SERVICE_IMPL_CLASS_NAMES.put(TransactionManagementService.class.getName(), DefaultBatchTransactionService.class.getName());
- SERVICE_IMPL_CLASS_NAMES.put(PersistenceManagerService.class.getName(), MemoryPersistenceManager.class.getName());
+ SERVICE_IMPL_CLASS_NAMES.put(PersistenceManagerService.class.getName(), MemoryPersistenceManagerService.class.getName());
SERVICE_IMPL_CLASS_NAMES.put(JobStatusManagerService.class.getName(), DefaultJobStatusManager.class.getName());
SERVICE_IMPL_CLASS_NAMES.put(BatchThreadPoolService.class.getName(), DefaultThreadPoolService.class.getName());
SERVICE_IMPL_CLASS_NAMES.put(BatchKernelService.class.getName(), DefaultBatchKernel.class.getName());
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/67e63c08/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManager.java
----------------------------------------------------------------------
diff --git a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManager.java b/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManager.java
deleted file mode 100755
index 6b8aeb2..0000000
--- a/jbatch/src/main/java/org/apache/batchee/container/services/persistence/JDBCPersistenceManager.java
+++ /dev/null
@@ -1,1617 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.batchee.container.services.persistence;
-
-import org.apache.batchee.container.exception.BatchContainerServiceException;
-import org.apache.batchee.container.exception.PersistenceException;
-import org.apache.batchee.container.impl.JobExecutionImpl;
-import org.apache.batchee.container.impl.JobInstanceImpl;
-import org.apache.batchee.container.impl.MetricImpl;
-import org.apache.batchee.container.impl.StepContextImpl;
-import org.apache.batchee.container.impl.StepExecutionImpl;
-import org.apache.batchee.container.impl.controller.chunk.CheckpointData;
-import org.apache.batchee.container.impl.controller.chunk.CheckpointDataKey;
-import org.apache.batchee.container.impl.controller.chunk.PersistentDataWrapper;
-import org.apache.batchee.container.impl.jobinstance.RuntimeFlowInSplitExecution;
-import org.apache.batchee.container.impl.jobinstance.RuntimeJobExecution;
-import org.apache.batchee.container.services.InternalJobExecution;
-import org.apache.batchee.container.services.persistence.jdbc.Dictionary;
-import org.apache.batchee.container.services.persistence.jdbc.database.Database;
-import org.apache.batchee.container.services.persistence.jdbc.database.DerbyDatabase;
-import org.apache.batchee.container.status.JobStatus;
-import org.apache.batchee.container.status.StepStatus;
-import org.apache.batchee.container.util.TCCLObjectInputStream;
-import org.apache.batchee.spi.PersistenceManagerService;
-
-import javax.batch.operations.NoSuchJobExecutionException;
-import javax.batch.runtime.BatchStatus;
-import javax.batch.runtime.JobInstance;
-import javax.batch.runtime.Metric;
-import javax.batch.runtime.StepExecution;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import javax.sql.DataSource;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.io.StringWriter;
-import java.nio.charset.Charset;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.apache.batchee.container.util.Serializations.deserialize;
-import static org.apache.batchee.container.util.Serializations.serialize;
-
-public class JDBCPersistenceManager implements PersistenceManagerService {
- private static final Charset UTF_8 = Charset.forName("UTF-8");
-
- static interface Defaults {
- final String JDBC_DRIVER = "org.apache.derby.jdbc.EmbeddedDriver";
- final String JDBC_URL = "jdbc:derby:memory:batchee;create=true";
- final String JDBC_USER = "app";
- final String JDBC_PASSWORD = "app";
- final String SCHEMA = "BATCHEE";
- }
-
- private Dictionary dictionary;
-
- protected DataSource dataSource = null;
- protected String jndiName = null;
-
- protected String driver = "";
- protected String schema = "";
- protected String url = "";
- protected String user = "";
- protected String pwd = "";
-
- @Override
- public void init(final Properties batchConfig) throws BatchContainerServiceException {
- final boolean hasSchema = "true".equalsIgnoreCase(batchConfig.getProperty("persistence.database.has-schema", "true"));
- if (hasSchema) {
- schema = batchConfig.getProperty("persistence.database.schema", Defaults.SCHEMA);
- } else {
- schema = null;
- }
-
- if (batchConfig.containsKey("persistence.database.jndi")) {
- jndiName = batchConfig.getProperty("persistence.database.jndi", "");
- if (jndiName.isEmpty()) {
- throw new BatchContainerServiceException("JNDI name is not defined.");
- }
-
- try {
- final Context ctx = new InitialContext();
- dataSource = DataSource.class.cast(ctx.lookup(jndiName));
- } catch (final NamingException e) {
- throw new BatchContainerServiceException(e);
- }
-
- } else {
- driver = batchConfig.getProperty("persistence.database.driver", Defaults.JDBC_DRIVER);
- url = batchConfig.getProperty("persistence.database.url", Defaults.JDBC_URL);
- user = batchConfig.getProperty("persistence.database.user", Defaults.JDBC_USER);
- pwd = batchConfig.getProperty("persistence.database.password", Defaults.JDBC_PASSWORD);
- }
-
- try {
- initDictionary(batchConfig);
-
- if ("create".equalsIgnoreCase(batchConfig.getProperty("persistence.database.ddl", "create"))) {
- if (hasSchema && !isSchemaValid()) {
- createSchema();
- }
- checkAllTables();
- }
- } catch (final SQLException e) {
- throw new BatchContainerServiceException(e);
- }
- }
-
- private void initDictionary(final Properties batchConfig) throws BatchContainerServiceException, SQLException {
- final String type = batchConfig.getProperty("persistence.database.db-dictionary", guessDictionary());
- if (type == null) {
- throw new IllegalArgumentException("You need to provide a db dictionary for this database");
- }
-
- try {
- final Database database = Database.class.cast(Thread.currentThread().getContextClassLoader().loadClass(type).newInstance());
- dictionary = new Dictionary(
- batchConfig.getProperty("persistence.database.tables.checkpoint", "checkpointentity"),
- batchConfig.getProperty("persistence.database.tables.job-instance", "jobinstanceentity"),
- batchConfig.getProperty("persistence.database.tables.job-execution", "jobexecutionentity"),
- batchConfig.getProperty("persistence.database.tables.step-execution", "stepexecutionentity"),
- database);
- } catch (final Exception e) {
- throw new BatchContainerServiceException(e);
- }
- }
-
- /**
- * Checks if the default schema JBATCH or the schema defined in batch-config exists.
- *
- * @return true if the schema exists, false otherwise.
- * @throws SQLException
- */
- private boolean isSchemaValid() throws SQLException {
- final Connection conn = getConnectionToDefaultSchema();
- final DatabaseMetaData dbmd = conn.getMetaData();
- final ResultSet rs = dbmd.getSchemas();
- while (rs.next()) {
- if (schema.equalsIgnoreCase(rs.getString("TABLE_SCHEM"))) {
- cleanupConnection(conn, rs, null);
- return true;
- }
- }
- cleanupConnection(conn, rs, null);
- return false;
- }
-
- private String guessDictionary() throws SQLException {
- final Connection conn = getConnectionToDefaultSchema();
- final String pn;
- try {
- final DatabaseMetaData dbmd = conn.getMetaData();
- pn = dbmd.getDatabaseProductName().toLowerCase();
- } finally {
- conn.close();
- }
-
- if (pn.contains("derby")) {
- return DerbyDatabase.class.getName();
- }
- /* TODO
- if (pn.contains("mysql")) {
- return "mysql";
- }
- if (pn.contains("oracle")) {
- return "oracle";
- }
- if (pn.contains("hsql")) {
- return "hsql";
- }
- */
- return null;
- }
-
- private void createSchema() throws SQLException {
- final Connection conn = getConnectionToDefaultSchema();
- final PreparedStatement ps = conn.prepareStatement("CREATE SCHEMA " + schema);
- ps.execute();
- cleanupConnection(conn, null, ps);
- }
-
- /**
- * Checks if all the runtime batch table exists. If not, it creates them.
- *
- * @throws SQLException
- */
- private void checkAllTables() throws SQLException {
- createIfNotExists(dictionary.getCheckpointTable(), dictionary.getCreateCheckpointTable());
- createIfNotExists(dictionary.getJobExecutionTable(), dictionary.getCreateJobExecutionTable());
- createIfNotExists(dictionary.getJobInstanceTable(), dictionary.getCreateJobInstanceTable());
- createIfNotExists(dictionary.getStepExecutionTable(), dictionary.getCreateStepExecutionTable());
- }
-
- protected Connection getConnection() throws SQLException {
- final Connection connection;
- if (dataSource != null) {
- connection = dataSource.getConnection();
- } else {
- try {
- Class.forName(driver);
- } catch (ClassNotFoundException e) {
- throw new PersistenceException(e);
- }
- connection = DriverManager.getConnection(url, user, pwd);
- }
- setSchemaOnConnection(connection);
-
- return connection;
- }
-
- private void createIfNotExists(final String tableName, final String createTableStatement) throws SQLException {
- final Connection conn = getConnection();
- final DatabaseMetaData dbmd = conn.getMetaData();
- final ResultSet rs = dbmd.getTables(null, schema, tableName, null);
-
- PreparedStatement ps = null;
- if (!rs.next()) {
- ps = conn.prepareStatement(createTableStatement);
- ps.executeUpdate();
- }
-
- cleanupConnection(conn, rs, ps);
- }
-
- public void createCheckpointData(final CheckpointDataKey key, final CheckpointData value) {
- Connection conn = null;
- PreparedStatement statement = null;
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getInsertCheckpoint());
-
- statement.setBytes(1, value.getRestartToken());
- statement.setString(2, key.getStepName());
- statement.setString(3, key.getType().name());
- statement.setLong(4, key.getJobInstanceId());
- statement.executeUpdate();
- if (!conn.getAutoCommit()) {
- conn.commit();
- }
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, statement);
- }
- }
-
- @Override
- public CheckpointData getCheckpointData(final CheckpointDataKey key) {
- return queryCheckpointData(key);
- }
-
- @Override
- public void setCheckpointData(final CheckpointDataKey key, final CheckpointData value) {
- if (queryCheckpointData(key) != null) {
- updateCheckpointData(key, value);
- } else {
- createCheckpointData(key, value);
- }
- }
-
- /**
- * @return the database connection. The schema is set to whatever default its used by the underlying database.
- * @throws SQLException
- */
- protected Connection getConnectionToDefaultSchema() throws SQLException {
- final Connection connection;
- if (dataSource != null) {
- try {
- connection = dataSource.getConnection();
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- }
- } else {
- try {
- Class.forName(driver);
- } catch (final ClassNotFoundException e) {
- final StringWriter sw = new StringWriter();
- final PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- throw new PersistenceException(e);
- }
- try {
- connection = DriverManager.getConnection(url, user, pwd);
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- }
- }
- return connection;
- }
-
- private void setSchemaOnConnection(final Connection connection) throws SQLException {
- if (schema == null) {
- return;
- }
-
- final PreparedStatement ps = connection.prepareStatement("set schema ?");
- ps.setString(1, schema);
- ps.executeUpdate();
- ps.close();
- }
-
- /**
- * select data from DB table
- *
- * @param key - the IPersistenceDataKey object
- * @return List of serializable objects store in the DB table
- * <p/>
- * Ex. select id, obj from tablename where id = ?
- */
- private CheckpointData queryCheckpointData(final CheckpointDataKey key) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getSelectCheckpoint());
- statement.setLong(1, key.getJobInstanceId());
- statement.setString(2, key.getType().name());
- statement.setString(3, key.getStepName());
- rs = statement.executeQuery();
- if (rs.next()) {
- final CheckpointData data = new CheckpointData(key.getJobInstanceId(), key.getStepName(), key.getType());
- data.setRestartToken(rs.getBytes(dictionary.checkpointColumn(1)));
- return data;
- }
- return null;
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- }
-
- /**
- * update data in DB table
- *
- * @param value - serializable object to store
- * @param key - the IPersistenceDataKey object
- * <p/>
- * Ex. update tablename set obj = ? where id = ?
- */
- private void updateCheckpointData(final CheckpointDataKey key, final CheckpointData value) {
- Connection conn = null;
- PreparedStatement statement = null;
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getUpdateCheckpoint());
- statement.setBytes(1, value.getRestartToken());
- statement.setLong(2, key.getJobInstanceId());
- statement.setString(3, key.getType().name());
- statement.setString(4, key.getStepName());
- statement.executeUpdate();
- if (!conn.getAutoCommit()) {
- conn.commit();
- }
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, statement);
- }
- }
-
-
- /**
- * closes connection, result set and statement
- *
- * @param conn - connection object to close
- * @param rs - result set object to close
- * @param statement - statement object to close
- */
- private void cleanupConnection(final Connection conn, final ResultSet rs, final PreparedStatement statement) {
- if (statement != null) {
- try {
- statement.close();
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- }
- }
-
- if (rs != null) {
- try {
- rs.close();
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- }
- }
-
- if (conn != null) {
- Exception thrown = null;
- try {
- conn.close();
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- try {
- conn.close();
- } catch (final SQLException e) {
- thrown = e;
- }
- }
- if (thrown != null) {
- throw new PersistenceException(thrown);
- }
- }
- }
-
- @Override
- public int jobOperatorGetJobInstanceCount(final String jobName, final String appTag) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getCountJobInstanceByNameAndTag());
- statement.setString(1, jobName);
- statement.setString(2, appTag);
- rs = statement.executeQuery();
- rs.next();
- return rs.getInt("jobinstancecount"); // not a column name so ok
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- }
-
- @Override
- public int jobOperatorGetJobInstanceCount(final String jobName) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getCountJobInstanceByName());
- statement.setString(1, jobName);
- rs = statement.executeQuery();
- rs.next();
- return rs.getInt("jobinstancecount"); // not a column name so ok
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- }
-
-
- @Override
- public List<Long> jobOperatorGetJobInstanceIds(final String jobName, final String appTag, final int start, final int count) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- final List<Long> data = new ArrayList<Long>();
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFindJoBInstanceIds());
- statement.setObject(1, jobName);
- statement.setObject(2, appTag);
- rs = statement.executeQuery();
- while (rs.next()) {
- data.add(rs.getLong(dictionary.jobInstanceColumns(0)));
- }
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
-
- if (!data.isEmpty()) {
- try {
- return data.subList(start, start + count);
- } catch (final IndexOutOfBoundsException oobEx) {
- return data.subList(start, data.size());
- }
- }
- return data;
- }
-
- @Override
- public List<Long> jobOperatorGetJobInstanceIds(final String jobName, final int start, final int count) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- final List<Long> data = new ArrayList<Long>();
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFindJobInstanceIdsByName());
- statement.setObject(1, jobName);
- rs = statement.executeQuery();
- while (rs.next()) {
- data.add(rs.getLong(dictionary.jobInstanceColumns(0)));
- }
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
-
- if (data.size() > 0) {
- try {
- return data.subList(start, start + count);
- } catch (final IndexOutOfBoundsException oobEx) {
- return data.subList(start, data.size());
- }
- }
- return data;
- }
-
- @Override
- public Map<Long, String> jobOperatorGetExternalJobInstanceData() {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- final Map<Long, String> data = new HashMap<Long, String>();
-
- try {
- conn = getConnection();
-
- // Filter out 'subjob' parallel execution entries which start with the special character
- statement = conn.prepareStatement(dictionary.getFindExternalJobInstances());
- rs = statement.executeQuery();
- while (rs.next()) {
- data.put(rs.getLong(dictionary.jobInstanceColumns(0)), rs.getString(dictionary.jobInstanceColumns(3)));
- }
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
-
- return data;
- }
-
- @Override
- public Timestamp jobOperatorQueryJobExecutionTimestamp(final long key, final TimestampType timestampType) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- Timestamp createTimestamp = null;
- Timestamp endTimestamp = null;
- Timestamp updateTimestamp = null;
- Timestamp startTimestamp = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFindJobExecutionTimestamps());
- statement.setLong(1, key);
- rs = statement.executeQuery();
- while (rs.next()) {
- createTimestamp = rs.getTimestamp(1);
- endTimestamp = rs.getTimestamp(2);
- updateTimestamp = rs.getTimestamp(3);
- startTimestamp = rs.getTimestamp(4);
- }
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
-
- if (timestampType.equals(TimestampType.CREATE)) {
- return createTimestamp;
- } else if (timestampType.equals(TimestampType.END)) {
- return endTimestamp;
- } else if (timestampType.equals(TimestampType.LAST_UPDATED)) {
- return updateTimestamp;
- } else if (timestampType.equals(TimestampType.STARTED)) {
- return startTimestamp;
- }
- throw new IllegalArgumentException("Unexpected enum value.");
- }
-
- @Override
- public String jobOperatorQueryJobExecutionBatchStatus(final long key) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFindJobExecutionBatchStatus());
- statement.setLong(1, key);
- rs = statement.executeQuery();
- if (rs.next()) {
- return rs.getString(1);
- }
- return null;
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- }
-
-
- @Override
- public String jobOperatorQueryJobExecutionExitStatus(final long key) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFindJobExecutionExitStatus());
- statement.setLong(1, key);
- rs = statement.executeQuery();
- if (rs.next()) {
- return rs.getString(1);
- }
- return null;
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- }
-
- @Override
- public Properties getParameters(final long executionId) throws NoSuchJobExecutionException {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFindJobExecutionJobProperties());
- statement.setLong(1, executionId);
- rs = statement.executeQuery();
-
- if (rs.next()) {
- final byte[] buf = rs.getBytes(dictionary.jobExecutionColumns(5));
- return Properties.class.cast(deserialize(buf));
- }
- throw new NoSuchJobExecutionException("Did not find table entry for executionID =" + executionId);
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } catch (final IOException e) {
- throw new PersistenceException(e);
- } catch (final ClassNotFoundException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- }
-
- @Override
- public List<StepExecution> getStepExecutionsForJobExecution(final long execid) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- long jobexecid;
- long stepexecid;
- String stepname;
- String batchstatus;
- String exitstatus;
- long readCount;
- long writeCount;
- long commitCount;
- long rollbackCount;
- long readSkipCount;
- long processSkipCount;
- long filterCount;
- long writeSkipCount;
- Timestamp startTS;
- Timestamp endTS;
- StepExecutionImpl stepEx;
- ObjectInputStream objectIn;
-
- final List<StepExecution> data = new ArrayList<StepExecution>();
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFinStepExecutionFromJobExecution());
- statement.setLong(1, execid);
- rs = statement.executeQuery();
-
- while (rs.next()) {
- jobexecid = rs.getLong(dictionary.stepExecutionColumns(18));
- stepexecid = rs.getLong(dictionary.stepExecutionColumns(0));
- stepname = rs.getString(dictionary.stepExecutionColumns(15));
- batchstatus = rs.getString(dictionary.stepExecutionColumns(1));
- exitstatus = rs.getString(dictionary.stepExecutionColumns(4));
- readCount = rs.getLong(dictionary.stepExecutionColumns(10));
- writeCount = rs.getLong(dictionary.stepExecutionColumns(16));
- commitCount = rs.getLong(dictionary.stepExecutionColumns(2));
- rollbackCount = rs.getLong(dictionary.stepExecutionColumns(12));
- readSkipCount = rs.getLong(dictionary.stepExecutionColumns(11));
- processSkipCount = rs.getLong(dictionary.stepExecutionColumns(9));
- filterCount = rs.getLong(dictionary.stepExecutionColumns(5));
- writeSkipCount = rs.getLong(dictionary.stepExecutionColumns(17));
- startTS = rs.getTimestamp(dictionary.stepExecutionColumns(14));
- endTS = rs.getTimestamp(dictionary.stepExecutionColumns(3));
-
- // get the object based data
- Serializable persistentData = null;
- final byte[] pDataBytes = rs.getBytes(dictionary.stepExecutionColumns(8));
- if (pDataBytes != null) {
- objectIn = new TCCLObjectInputStream(new ByteArrayInputStream(pDataBytes));
- persistentData = (Serializable) objectIn.readObject();
- }
-
- stepEx = new StepExecutionImpl(jobexecid, stepexecid);
-
- stepEx.setBatchStatus(BatchStatus.valueOf(batchstatus));
- stepEx.setExitStatus(exitstatus);
- stepEx.setStepName(stepname);
- stepEx.setReadCount(readCount);
- stepEx.setWriteCount(writeCount);
- stepEx.setCommitCount(commitCount);
- stepEx.setRollbackCount(rollbackCount);
- stepEx.setReadSkipCount(readSkipCount);
- stepEx.setProcessSkipCount(processSkipCount);
- stepEx.setFilterCount(filterCount);
- stepEx.setWriteSkipCount(writeSkipCount);
- stepEx.setStartTime(startTS);
- stepEx.setEndTime(endTS);
- stepEx.setPersistentUserData(persistentData);
-
- data.add(stepEx);
- }
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } catch (final IOException e) {
- throw new PersistenceException(e);
- } catch (final ClassNotFoundException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
-
- return data;
- }
-
-
- @Override
- public StepExecution getStepExecutionByStepExecutionId(final long stepExecId) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFindStepExecutionFromId());
- statement.setLong(1, stepExecId);
- rs = statement.executeQuery();
- if (rs.next()) {
- final long jobexecid = rs.getLong(dictionary.stepExecutionColumns(18));
- final long stepExecutionId = rs.getLong(dictionary.stepExecutionColumns(0));
- final String stepName = rs.getString(dictionary.stepExecutionColumns(15));
- final String batchstatus = rs.getString(dictionary.stepExecutionColumns(1));
- final String exitstatus = rs.getString(dictionary.stepExecutionColumns(4));
- final long readCount = rs.getLong(dictionary.stepExecutionColumns(10));
- final long writeCount = rs.getLong(dictionary.stepExecutionColumns(16));
- final long commitCount = rs.getLong(dictionary.stepExecutionColumns(2));
- final long rollbackCount = rs.getLong(dictionary.stepExecutionColumns(12));
- final long readSkipCount = rs.getLong(dictionary.stepExecutionColumns(11));
- final long processSkipCount = rs.getLong(dictionary.stepExecutionColumns(9));
- final long filterCount = rs.getLong(dictionary.stepExecutionColumns(5));
- final long writeSkipCount = rs.getLong(dictionary.stepExecutionColumns(17));
- final Timestamp startTS = rs.getTimestamp(dictionary.stepExecutionColumns(14));
- final Timestamp endTS = rs.getTimestamp(dictionary.stepExecutionColumns(3));
-
- // get the object based data
- Serializable persistentData = null;
- final byte[] pDataBytes = rs.getBytes(dictionary.stepExecutionColumns(8));
- if (pDataBytes != null) {
- persistentData = Serializable.class.cast(new TCCLObjectInputStream(new ByteArrayInputStream(pDataBytes)).readObject());
- }
-
- final StepExecutionImpl stepEx = new StepExecutionImpl(jobexecid, stepExecutionId);
-
- stepEx.setBatchStatus(BatchStatus.valueOf(batchstatus));
- stepEx.setExitStatus(exitstatus);
- stepEx.setStepName(stepName);
- stepEx.setReadCount(readCount);
- stepEx.setWriteCount(writeCount);
- stepEx.setCommitCount(commitCount);
- stepEx.setRollbackCount(rollbackCount);
- stepEx.setReadSkipCount(readSkipCount);
- stepEx.setProcessSkipCount(processSkipCount);
- stepEx.setFilterCount(filterCount);
- stepEx.setWriteSkipCount(writeSkipCount);
- stepEx.setStartTime(startTS);
- stepEx.setEndTime(endTS);
- stepEx.setPersistentUserData(persistentData);
- return stepEx;
- }
- return null;
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } catch (IOException e) {
- throw new PersistenceException(e);
- } catch (ClassNotFoundException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- }
-
- @Override
- public void updateBatchStatusOnly(final long key, final BatchStatus batchStatus, final Timestamp updatets) {
- Connection conn = null;
- PreparedStatement statement = null;
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getUpdateJobExecution());
- statement.setString(1, batchStatus.name());
- statement.setTimestamp(2, updatets);
- statement.setLong(3, key);
- statement.executeUpdate();
- if (!conn.getAutoCommit()) {
- conn.commit();
- }
- } catch (final SQLException e) {
- e.printStackTrace();
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, statement);
- }
- }
-
- @Override
- public void updateWithFinalExecutionStatusesAndTimestamps(final long key, final BatchStatus batchStatus,
- final String exitStatus, final Timestamp updatets) {
- Connection conn = null;
- PreparedStatement statement = null;
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getSetJobExecutionFinalData());
-
- statement.setString(1, batchStatus.name());
- statement.setString(2, exitStatus);
- statement.setTimestamp(3, updatets);
- statement.setTimestamp(4, updatets);
- statement.setLong(5, key);
-
- statement.executeUpdate();
- if (!conn.getAutoCommit()) {
- conn.commit();
- }
- } catch (final SQLException e) {
- e.printStackTrace();
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, statement);
- }
- }
-
- public void markJobStarted(final long key, final Timestamp startTS) {
- Connection conn = null;
- PreparedStatement statement = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getUpdateStartedJobExecution());
-
- statement.setString(1, BatchStatus.STARTED.name());
- statement.setTimestamp(2, startTS);
- statement.setTimestamp(3, startTS);
- statement.setLong(4, key);
-
- statement.executeUpdate();
- if (!conn.getAutoCommit()) {
- conn.commit();
- }
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, statement);
- }
- }
-
-
- @Override
- public InternalJobExecution jobOperatorGetJobExecution(long jobExecutionId) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFindJobExecutionById());
- statement.setLong(1, jobExecutionId);
- rs = statement.executeQuery();
- if (rs.next()) {
- final Timestamp createtime = rs.getTimestamp(dictionary.jobExecutionColumns(2));
- final Timestamp starttime = rs.getTimestamp(dictionary.jobExecutionColumns(6));
- final Timestamp endtime = rs.getTimestamp(dictionary.jobExecutionColumns(3));
- final Timestamp updatetime = rs.getTimestamp(dictionary.jobExecutionColumns(7));
- final long instanceId = rs.getLong(dictionary.jobExecutionColumns(8));
-
- // get the object based data
- final String batchStatus = rs.getString(dictionary.jobExecutionColumns(1));
- final String exitStatus = rs.getString(dictionary.jobExecutionColumns(4));
-
- // get the object based data
- final byte[] buf = rs.getBytes(dictionary.jobExecutionColumns(5));
- final Properties params = Properties.class.cast(deserialize(buf));
-
- final String jobName = rs.getString(dictionary.jobInstanceColumns(3));
-
- final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, instanceId, this);
- jobEx.setCreateTime(createtime);
- jobEx.setStartTime(starttime);
- jobEx.setEndTime(endtime);
- jobEx.setJobParameters(params);
- jobEx.setLastUpdateTime(updatetime);
- jobEx.setBatchStatus(batchStatus);
- jobEx.setExitStatus(exitStatus);
- jobEx.setJobName(jobName);
- return jobEx;
- }
- return null;
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } catch (final IOException e) {
- throw new PersistenceException(e);
- } catch (final ClassNotFoundException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- }
-
- @Override
- public List<InternalJobExecution> jobOperatorGetJobExecutions(final long jobInstanceId) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- final List<InternalJobExecution> data = new ArrayList<InternalJobExecution>();
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFindJobExecutionByInstance());
- statement.setLong(1, jobInstanceId);
- rs = statement.executeQuery();
- while (rs.next()) {
- final long jobExecutionId = rs.getLong(dictionary.jobExecutionColumns(0));
- final Timestamp createtime = rs.getTimestamp(dictionary.jobExecutionColumns(2));
- final Timestamp starttime = rs.getTimestamp(dictionary.jobExecutionColumns(6));
- final Timestamp endtime = rs.getTimestamp(dictionary.jobExecutionColumns(3));
- final Timestamp updatetime = rs.getTimestamp(dictionary.jobExecutionColumns(7));
- final String batchStatus = rs.getString(dictionary.jobExecutionColumns(1));
- final String exitStatus = rs.getString(dictionary.jobExecutionColumns(4));
- final String jobName = rs.getString(dictionary.jobInstanceColumns(3));
-
- final JobExecutionImpl jobEx = new JobExecutionImpl(jobExecutionId, jobInstanceId, this);
- jobEx.setCreateTime(createtime);
- jobEx.setStartTime(starttime);
- jobEx.setEndTime(endtime);
- jobEx.setLastUpdateTime(updatetime);
- jobEx.setBatchStatus(batchStatus);
- jobEx.setExitStatus(exitStatus);
- jobEx.setJobName(jobName);
-
- data.add(jobEx);
- }
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- return data;
- }
-
- @Override
- public Set<Long> jobOperatorGetRunningExecutions(String jobName) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- final Set<Long> executionIds = new HashSet<Long>();
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFindRunningJobExecutions());
- statement.setString(1, BatchStatus.STARTED.name());
- statement.setString(2, BatchStatus.STARTING.name());
- statement.setString(3, BatchStatus.STOPPING.name());
- statement.setString(4, jobName);
- rs = statement.executeQuery();
- while (rs.next()) {
- executionIds.add(rs.getLong(dictionary.jobExecutionColumns(0)));
- }
-
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- return executionIds;
- }
-
- @Override
- public JobStatus getJobStatusFromExecution(final long executionId) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFindJobStatus());
- statement.setLong(1, executionId);
- rs = statement.executeQuery();
- if (rs.next()) {
- final long jobInstanceId = rs.getLong(dictionary.jobExecutionColumns(8));
- final String batchStatus = rs.getString(dictionary.jobExecutionColumns(1));
- final byte[] jobXmls = rs.getBytes(dictionary.jobInstanceColumns(4));
- final JobInstanceImpl jobInstance;
- if (jobXmls != null) {
- jobInstance = new JobInstanceImpl(jobInstanceId, new String(jobXmls, UTF_8));
- } else {
- jobInstance = new JobInstanceImpl(jobInstanceId);
- }
- jobInstance.setJobName(rs.getString(dictionary.jobInstanceColumns(3)));
-
- final JobStatus status = new JobStatus(jobInstanceId);
- status.setExitStatus(rs.getString(dictionary.jobInstanceColumns(2)));
- status.setLatestExecutionId(rs.getLong(dictionary.jobInstanceColumns(5)));
- status.setRestartOn(rs.getString(dictionary.jobInstanceColumns(6)));
- status.setCurrentStepId(rs.getString(dictionary.jobInstanceColumns(7)));
- status.setJobInstance(jobInstance);
- if (batchStatus != null) {
- status.setBatchStatus(BatchStatus.valueOf(batchStatus));
- }
-
- return status;
- }
- return null;
- } catch (final Exception e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- }
-
- public long getJobInstanceIdByExecutionId(final long executionId) throws NoSuchJobExecutionException {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFindJobInstanceFromJobExecution());
- statement.setObject(1, executionId);
- rs = statement.executeQuery();
- if (!rs.next()) {
- throw new NoSuchJobExecutionException("Did not find job instance associated with executionID =" + executionId);
- }
- return rs.getLong(dictionary.jobExecutionColumns(8));
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- }
-
- @Override
- public JobInstance createSubJobInstance(final String name, final String apptag) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- JobInstanceImpl jobInstance = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getCreateJobInstance(), Statement.RETURN_GENERATED_KEYS);
- statement.setString(1, name);
- statement.setString(2, apptag);
- statement.executeUpdate();
- if (!conn.getAutoCommit()) {
- conn.commit();
- }
- rs = statement.getGeneratedKeys();
- if (rs.next()) {
- final long jobInstanceID = rs.getLong(1);
- jobInstance = new JobInstanceImpl(jobInstanceID);
- jobInstance.setJobName(name);
- }
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- return jobInstance;
- }
-
- @Override
- public JobInstance createJobInstance(final String name, final String apptag, final String jobXml) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getCreateJobInstanceWithJobXml(), Statement.RETURN_GENERATED_KEYS);
- statement.setString(1, name);
- statement.setString(2, apptag);
- statement.setBytes(3, jobXml.getBytes(UTF_8));
- statement.executeUpdate();
- if (!conn.getAutoCommit()) {
- conn.commit();
- }
- rs = statement.getGeneratedKeys();
- if (rs.next()) {
- long jobInstanceID = rs.getLong(1);
- final JobInstanceImpl jobInstance = new JobInstanceImpl(jobInstanceID, jobXml);
- jobInstance.setJobName(name);
- return jobInstance;
- }
- return null;
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- }
-
- @Override
- public RuntimeJobExecution createJobExecution(final JobInstance jobInstance, final Properties jobParameters, final BatchStatus batchStatus) {
- final Timestamp now = new Timestamp(System.currentTimeMillis());
- final long newExecutionId = createRuntimeJobExecutionEntry(jobInstance, jobParameters, batchStatus, now);
- final RuntimeJobExecution jobExecution = new RuntimeJobExecution(jobInstance, newExecutionId, this);
- jobExecution.setBatchStatus(batchStatus.name());
- jobExecution.setCreateTime(now);
- jobExecution.setLastUpdateTime(now);
- return jobExecution;
- }
-
- private long createRuntimeJobExecutionEntry(final JobInstance jobInstance, final Properties jobParameters, final BatchStatus batchStatus,
- final Timestamp timestamp) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getCreateJobExecution(), Statement.RETURN_GENERATED_KEYS);
- statement.setLong(1, jobInstance.getInstanceId());
- statement.setTimestamp(2, timestamp);
- statement.setTimestamp(3, timestamp);
- statement.setString(4, batchStatus.name());
- statement.setObject(5, serialize(jobParameters));
- statement.executeUpdate();
- if (!conn.getAutoCommit()) {
- conn.commit();
- }
- rs = statement.getGeneratedKeys();
- if (rs.next()) {
- return rs.getLong(1);
- }
- return -1;
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } catch (final IOException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- }
-
- @Override
- public RuntimeFlowInSplitExecution createFlowInSplitExecution(final JobInstance jobInstance, final BatchStatus batchStatus) {
- final Timestamp now = new Timestamp(System.currentTimeMillis());
- final long newExecutionId = createRuntimeJobExecutionEntry(jobInstance, null, batchStatus, now);
- final RuntimeFlowInSplitExecution flowExecution = new RuntimeFlowInSplitExecution(jobInstance, newExecutionId, this);
- flowExecution.setBatchStatus(batchStatus.name());
- flowExecution.setCreateTime(now);
- flowExecution.setLastUpdateTime(now);
- return flowExecution;
- }
-
- @Override
- public StepExecutionImpl createStepExecution(final long rootJobExecId, final StepContextImpl stepContext) {
- final String batchStatus = stepContext.getBatchStatus() == null ? BatchStatus.STARTING.name() : stepContext.getBatchStatus().name();
- final String exitStatus = stepContext.getExitStatus();
- final String stepName = stepContext.getStepName();
-
- long readCount = 0;
- long writeCount = 0;
- long commitCount = 0;
- long rollbackCount = 0;
- long readSkipCount = 0;
- long processSkipCount = 0;
- long filterCount = 0;
- long writeSkipCount = 0;
- Timestamp startTime = stepContext.getStartTimeTS();
- Timestamp endTime = stepContext.getEndTimeTS();
-
- final Metric[] metrics = stepContext.getMetrics();
- for (final Metric metric : metrics) {
- if (metric.getType().equals(MetricImpl.MetricType.READ_COUNT)) {
- readCount = metric.getValue();
- } else if (metric.getType().equals(MetricImpl.MetricType.WRITE_COUNT)) {
- writeCount = metric.getValue();
- } else if (metric.getType().equals(MetricImpl.MetricType.PROCESS_SKIP_COUNT)) {
- processSkipCount = metric.getValue();
- } else if (metric.getType().equals(MetricImpl.MetricType.COMMIT_COUNT)) {
- commitCount = metric.getValue();
- } else if (metric.getType().equals(MetricImpl.MetricType.ROLLBACK_COUNT)) {
- rollbackCount = metric.getValue();
- } else if (metric.getType().equals(MetricImpl.MetricType.READ_SKIP_COUNT)) {
- readSkipCount = metric.getValue();
- } else if (metric.getType().equals(MetricImpl.MetricType.FILTER_COUNT)) {
- filterCount = metric.getValue();
- } else if (metric.getType().equals(MetricImpl.MetricType.WRITE_SKIP_COUNT)) {
- writeSkipCount = metric.getValue();
- }
- }
- final Serializable persistentData = stepContext.getPersistentUserData();
-
- return createStepExecution(rootJobExecId, batchStatus, exitStatus, stepName, readCount,
- writeCount, commitCount, rollbackCount, readSkipCount, processSkipCount, filterCount, writeSkipCount, startTime,
- endTime, persistentData);
- }
-
-
-//CHECKSTYLE:OFF
- private StepExecutionImpl createStepExecution(final long rootJobExecId, final String batchStatus, final String exitStatus, final String stepName,
- final long readCount, final long writeCount, final long commitCount, final long rollbackCount, final long readSkipCount,
- final long processSkipCount, final long filterCount, final long writeSkipCount, final Timestamp startTime,
- final Timestamp endTime, final Serializable persistentData) {
-//CHECKSTYLE:ON
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getCreateStepExecution(), Statement.RETURN_GENERATED_KEYS);
- statement.setLong(1, rootJobExecId);
- statement.setString(2, batchStatus);
- statement.setString(3, exitStatus);
- statement.setString(4, stepName);
- statement.setLong(5, readCount);
- statement.setLong(6, writeCount);
- statement.setLong(7, commitCount);
- statement.setLong(8, rollbackCount);
- statement.setLong(9, readSkipCount);
- statement.setLong(10, processSkipCount);
- statement.setLong(11, filterCount);
- statement.setLong(12, writeSkipCount);
- statement.setTimestamp(13, startTime);
- statement.setTimestamp(14, endTime);
- statement.setObject(15, serialize(persistentData));
-
- statement.executeUpdate();
- if (!conn.getAutoCommit()) {
- conn.commit();
- }
-
- rs = statement.getGeneratedKeys();
- if (rs.next()) {
- final long stepExecutionId = rs.getLong(1);
- final StepExecutionImpl stepExecution = new StepExecutionImpl(rootJobExecId, stepExecutionId);
- stepExecution.setStepName(stepName);
- return stepExecution;
- }
- return null;
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } catch (final IOException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, statement);
- }
- }
-
- @Override
- public void updateStepExecution(final long rootJobExecId, final StepContextImpl stepContext) {
- final long stepExecutionId = stepContext.getStepInternalExecID();
- final String batchStatus = stepContext.getBatchStatus() == null ? BatchStatus.STARTING.name() : stepContext.getBatchStatus().name();
- final String exitStatus = stepContext.getExitStatus();
- final String stepName = stepContext.getStepName();
- final Timestamp startTime = stepContext.getStartTimeTS();
- final Timestamp endTime = stepContext.getEndTimeTS();
-
- long readCount = 0;
- long writeCount = 0;
- long commitCount = 0;
- long rollbackCount = 0;
- long readSkipCount = 0;
- long processSkipCount = 0;
- long filterCount = 0;
- long writeSkipCount = 0;
-
- final Metric[] metrics = stepContext.getMetrics();
- for (final Metric metric : metrics) {
- if (metric.getType().equals(MetricImpl.MetricType.READ_COUNT)) {
- readCount = metric.getValue();
- } else if (metric.getType().equals(MetricImpl.MetricType.WRITE_COUNT)) {
- writeCount = metric.getValue();
- } else if (metric.getType().equals(MetricImpl.MetricType.PROCESS_SKIP_COUNT)) {
- processSkipCount = metric.getValue();
- } else if (metric.getType().equals(MetricImpl.MetricType.COMMIT_COUNT)) {
- commitCount = metric.getValue();
- } else if (metric.getType().equals(MetricImpl.MetricType.ROLLBACK_COUNT)) {
- rollbackCount = metric.getValue();
- } else if (metric.getType().equals(MetricImpl.MetricType.READ_SKIP_COUNT)) {
- readSkipCount = metric.getValue();
- } else if (metric.getType().equals(MetricImpl.MetricType.FILTER_COUNT)) {
- filterCount = metric.getValue();
- } else if (metric.getType().equals(MetricImpl.MetricType.WRITE_SKIP_COUNT)) {
- writeSkipCount = metric.getValue();
- }
- }
- final Serializable persistentData = stepContext.getPersistentUserData();
-
- updateStepExecution(stepExecutionId, rootJobExecId, batchStatus, exitStatus, stepName, readCount,
- writeCount, commitCount, rollbackCount, readSkipCount, processSkipCount, filterCount,
- writeSkipCount, startTime, endTime, persistentData);
-
- }
-
-
-//CHECKSTYLE:OFF
- private void updateStepExecution(final long stepExecutionId, final long jobExecId, final String batchStatus, final String exitStatus, final String stepName,
- final long readCount, final long writeCount, final long commitCount, final long rollbackCount, final long readSkipCount,
- final long processSkipCount, final long filterCount, final long writeSkipCount, final Timestamp startTime,
- final Timestamp endTime, final Serializable persistentData) {
-//CHECKSTYLE:ON
- Connection conn = null;
- PreparedStatement statement = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getUpdateStepExecution());
- statement.setLong(1, jobExecId);
- statement.setString(2, batchStatus);
- statement.setString(3, exitStatus);
- statement.setString(4, stepName);
- statement.setLong(5, readCount);
- statement.setLong(6, writeCount);
- statement.setLong(7, commitCount);
- statement.setLong(8, rollbackCount);
- statement.setLong(9, readSkipCount);
- statement.setLong(10, processSkipCount);
- statement.setLong(11, filterCount);
- statement.setLong(12, writeSkipCount);
- statement.setTimestamp(13, startTime);
- statement.setTimestamp(14, endTime);
- statement.setObject(15, serialize(persistentData));
- statement.setLong(16, stepExecutionId);
- statement.executeUpdate();
- if (!conn.getAutoCommit()) {
- conn.commit();
- }
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } catch (final IOException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, statement);
- }
- }
-
- @Override
- public JobStatus createJobStatus(final long jobInstanceId) {
- return new JobStatus(jobInstanceId); // instance already created
- }
-
- @Override
- public JobStatus getJobStatus(final long instanceId) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFindJobInstance());
- statement.setLong(1, instanceId);
- rs = statement.executeQuery();
- if (rs.next()) {
- final JobStatus status = new JobStatus(instanceId);
- status.setCurrentStepId(dictionary.jobExecutionColumns(7));
- status.setExitStatus(dictionary.jobExecutionColumns(2));
-
- final byte[] jobXmls = rs.getBytes(dictionary.jobInstanceColumns(4));
- final JobInstanceImpl instance;
- if (jobXmls != null) {
- instance = new JobInstanceImpl(instanceId, new String(jobXmls, UTF_8));
- } else {
- instance = new JobInstanceImpl(instanceId);
- }
- instance.setJobName(rs.getString(dictionary.jobInstanceColumns(3)));
-
- status.setJobInstance(instance);
-
- status.setLatestExecutionId(rs.getLong(dictionary.jobInstanceColumns(5)));
- status.setRestartOn(rs.getString(dictionary.jobInstanceColumns(6)));
-
- final String batchStatus = rs.getString(dictionary.stepExecutionColumns(1));
- if (batchStatus != null) {
- status.setBatchStatus(BatchStatus.valueOf(batchStatus));
- }
-
- return status;
- }
- return null;
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.batchee.spi.PersistenceManagerService#updateJobStatus(long, org.apache.batchee.container.status.JobStatus)
- */
- @Override
- public void updateJobStatus(final long instanceId, final JobStatus jobStatus) {
- Connection conn = null;
- PreparedStatement statement = null;
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getUpdateJobInstanceStatus());
- if (jobStatus.getBatchStatus() != null) {
- statement.setString(1, jobStatus.getBatchStatus().name());
- } else {
- statement.setString(1, null);
- }
- statement.setString(2, jobStatus.getExitStatus());
- statement.setLong(3, jobStatus.getLatestExecutionId());
- statement.setString(4, jobStatus.getRestartOn());
- statement.setString(5, jobStatus.getCurrentStepId());
- if (jobStatus.getJobInstance() != null) {
- statement.setString(6, jobStatus.getJobInstance().getJobName());
- } else {
- statement.setString(6, null);
- }
- statement.setLong(7, instanceId);
- statement.executeUpdate();
- if (!conn.getAutoCommit()) {
- conn.commit();
- }
- } catch (SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, statement);
- }
- }
-
- @Override
- public StepStatus createStepStatus(final long stepExecId) {
- return new StepStatus(stepExecId); // instance already created
- }
-
- @Override
- public StepStatus getStepStatus(final long instanceId, final String stepName) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFindStepExecutionByJobInstanceAndStepName());
- statement.setLong(1, instanceId);
- statement.setString(2, stepName);
- rs = statement.executeQuery();
- if (rs.next()) {
- final int startCount = rs.getInt(dictionary.stepExecutionColumns(13));
- final long id = rs.getLong(dictionary.stepExecutionColumns(0));
- final StepStatus stepStatus = new StepStatus(id, startCount);
- final int numPartitions = rs.getInt(dictionary.stepExecutionColumns(7));
- final byte[] persistentDatas = rs.getBytes(dictionary.stepExecutionColumns(8));
- if (numPartitions >= 0) {
- stepStatus.setNumPartitions(numPartitions);
- }
- if (persistentDatas != null) {
- stepStatus.setPersistentUserData(new PersistentDataWrapper(persistentDatas));
- }
- stepStatus.setBatchStatus(BatchStatus.valueOf(rs.getString(dictionary.stepExecutionColumns(1))));
- stepStatus.setExitStatus(rs.getString(dictionary.stepExecutionColumns(4)));
- stepStatus.setLastRunStepExecutionId(rs.getLong(dictionary.stepExecutionColumns(6)));
- stepStatus.setStepExecutionId(id);
- return stepStatus;
- }
- return null;
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- }
-
- @Override
- public void updateStepStatus(final long stepExecutionId, final StepStatus stepStatus) {
- Connection conn = null;
- PreparedStatement statement = null;
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getUpdateStepExecutionStatus());
- statement.setBytes(1, stepStatus.getRawPersistentUserData());
- if (stepStatus.getBatchStatus() != null) {
- statement.setString(2, stepStatus.getBatchStatus().name());
- } else {
- statement.setString(2, null);
- }
- statement.setString(3, stepStatus.getExitStatus());
- statement.setLong(4, stepStatus.getLastRunStepExecutionId());
- if (stepStatus.getNumPartitions() != null) {
- statement.setInt(5, stepStatus.getNumPartitions());
- } else {
- statement.setInt(5, -1);
- }
- statement.setInt(6, stepStatus.getStartCount());
- statement.setLong(7, stepStatus.getStepExecutionId());
- statement.executeUpdate();
- if (!conn.getAutoCommit()) {
- conn.commit();
- }
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, statement);
- }
- }
-
- @Override
- public long getMostRecentExecutionId(final long jobInstanceId) {
- Connection conn = null;
- PreparedStatement statement = null;
- ResultSet rs = null;
-
- try {
- conn = getConnection();
- statement = conn.prepareStatement(dictionary.getFindMostRecentJobExecution());
- statement.setLong(1, jobInstanceId);
- rs = statement.executeQuery();
- if (rs.next()) {
- return rs.getLong(1);
- }
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, rs, statement);
- }
- return -1;
- }
-
- @Override
- public void cleanUp(final long instanceId) {
- Connection conn = null;
- try {
- conn = getConnection();
- deleteFromInstanceId(instanceId, conn, dictionary.getDeleteStepExecution());
- deleteFromInstanceId(instanceId, conn, dictionary.getDeleteCheckpoint());
- deleteFromInstanceId(instanceId, conn, dictionary.getDeleteJobExecution());
- deleteFromInstanceId(instanceId, conn, dictionary.getDeleteJobInstance());
- if (!conn.getAutoCommit()) {
- conn.commit();
- }
- } catch (final SQLException e) {
- throw new PersistenceException(e);
- } finally {
- cleanupConnection(conn, null, null);
- }
- }
-
- private static void deleteFromInstanceId(final long instanceId, final Connection conn, final String delete) throws SQLException {
- final PreparedStatement statement = conn.prepareStatement(delete);
- statement.setLong(1, instanceId);
- statement.executeUpdate();
- statement.close();
- }
-
- @Override
- public String toString() {
- try {
- return "JDBCPersistenceManager{" +
- "dictionary=" + dictionary +
- ", dataSource=" + dataSource +
- ", jndiName='" + jndiName + '\'' +
- ", driver='" + driver + '\'' +
- ", schema='" + schema + '\'' +
- ", url='" + url + '\'' +
- ", user='" + user + '\'' +
- ", pwd='" + pwd + '\'' +
- '}';
- } catch (final Throwable th) {
- return "JDBCPersistenceManager{dictionary=" + dictionary + '}';
- }
- }
-}