You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@batchee.apache.org by rm...@apache.org on 2013/11/05 08:39:31 UTC
[54/62] importing batchee from GitHub - a fork from the IBM RI
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/context/impl/StepContextImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/context/impl/StepContextImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/context/impl/StepContextImpl.java
deleted file mode 100755
index 2473747..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/context/impl/StepContextImpl.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package com.ibm.jbatch.container.context.impl;
-
-import java.io.Serializable;
-import java.sql.Timestamp;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.batch.runtime.BatchStatus;
-import javax.batch.runtime.Metric;
-import javax.batch.runtime.context.StepContext;
-
-public class StepContextImpl implements StepContext {
-
- private final static String sourceClass = StepContextImpl.class.getName();
- private final static Logger logger = Logger.getLogger(sourceClass);
-
- private String stepId = null;
- private BatchStatus batchStatus = null;
- private String exitStatus = null;
- private Object transientUserData = null;
- private Serializable persistentUserData = null;
- private Exception exception = null;
- Timestamp starttime = null;
- Timestamp endtime = null;
-
- private long stepExecID = 0;
-
- private Properties properties = new Properties();
-
- private String batchletProcessRetVal = null;
-
-
-
- private ConcurrentHashMap<String, Metric> metrics = new ConcurrentHashMap<String, Metric>();
-
- public StepContextImpl(String stepId) {
- this.stepId = stepId;
- }
-
- @Override
- public BatchStatus getBatchStatus() {
- return batchStatus;
- }
-
- @Override
- public Exception getException() {
- // TODO Auto-generated method stub
- return exception;
- }
-
- public void setException(Exception exception){
- this.exception = exception;
- }
-
- @Override
- public String getExitStatus() {
- return exitStatus;
- }
-
- @Override
- public String getStepName() {
- return stepId;
- }
-
- @Override
- public Metric[] getMetrics() {
- return metrics.values().toArray(new Metric[0]);
- }
-
- public MetricImpl getMetric(MetricImpl.MetricType metricType) {
- return (MetricImpl)metrics.get(metricType.name());
- }
-
- public void addMetric(MetricImpl.MetricType metricType, long value) {
- metrics.putIfAbsent(metricType.name(), new MetricImpl(metricType, value));
- }
-
- @Override
- public Serializable getPersistentUserData() {
- return persistentUserData;
- }
-
- @Override
- public Properties getProperties() {
- return properties;
- }
-
- public Object getTransientUserData() {
- return transientUserData;
- }
-
- @Override
- public void setExitStatus(String status) {
- this.exitStatus = status;
- if (logger.isLoggable(Level.FINER)) {
- logger.log(Level.FINER, "Exit status set to: " + status + " for step id:" + getStepName());
- }
- }
-
- public void setBatchStatus(BatchStatus status) {
- this.batchStatus = status;
- if (logger.isLoggable(Level.FINE)) {
- logger.log(Level.FINE, "Batch status set to: " + status + " for step id:" + getStepName());
- }
- }
-
- @Override
- public void setPersistentUserData(Serializable data) {
- persistentUserData = data;
-
- }
-
- public void setTransientUserData(Object data) {
- transientUserData = data;
- }
-
- @Override
- public String toString() {
- StringBuffer buf = new StringBuffer();
- buf.append(" stepId: " + stepId);
- buf.append(", batchStatus: " + batchStatus);
- buf.append(", exitStatus: " + exitStatus);
- buf.append(", batchletProcessRetVal: " + batchletProcessRetVal);
- buf.append(", transientUserData: " + transientUserData);
- buf.append(", persistentUserData: " + persistentUserData);
- return buf.toString();
- }
-
- @Override
- public long getStepExecutionId() {
- return stepExecID;
- }
-
-
- public void setStepExecutionId(long stepExecutionId){
- stepExecID = stepExecutionId;
- }
-
- public void setStartTime(Timestamp startTS) {
- starttime = startTS;
-
- }
-
- public void setEndTime(Timestamp endTS) {
- endtime = endTS;
-
- }
-
- public Timestamp getStartTimeTS(){
- return starttime;
- }
-
- public Timestamp getEndTimeTS(){
- return endtime;
- }
-
- public String getBatchletProcessRetVal() {
- return batchletProcessRetVal;
- }
-
- public void setBatchletProcessRetVal(String batchletProcessRetVal) {
- this.batchletProcessRetVal = batchletProcessRetVal;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/exception/BatchContainerRuntimeException.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/exception/BatchContainerRuntimeException.java b/JSR352.Runtime/src/com/ibm/jbatch/container/exception/BatchContainerRuntimeException.java
deleted file mode 100755
index 093d123..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/exception/BatchContainerRuntimeException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package com.ibm.jbatch.container.exception;
-
-public class BatchContainerRuntimeException extends RuntimeException {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public BatchContainerRuntimeException() {
- // TODO Auto-generated constructor stub
- }
-
- public BatchContainerRuntimeException(String message) {
- super(message);
- // TODO Auto-generated constructor stub
- }
-
- public BatchContainerRuntimeException(Throwable cause) {
- super(cause);
- // TODO Auto-generated constructor stub
- }
-
- public BatchContainerRuntimeException(String message, Throwable cause) {
- super(message, cause);
- // TODO Auto-generated constructor stub
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/exception/BatchContainerServiceException.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/exception/BatchContainerServiceException.java b/JSR352.Runtime/src/com/ibm/jbatch/container/exception/BatchContainerServiceException.java
deleted file mode 100755
index 19853f4..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/exception/BatchContainerServiceException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package com.ibm.jbatch.container.exception;
-
-public class BatchContainerServiceException extends BatchContainerRuntimeException {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public BatchContainerServiceException() {
- // TODO Auto-generated constructor stub
- }
-
- public BatchContainerServiceException(String message) {
- super(message);
- // TODO Auto-generated constructor stub
- }
-
- public BatchContainerServiceException(Throwable cause) {
- super(cause);
- // TODO Auto-generated constructor stub
- }
-
- public BatchContainerServiceException(String message, Throwable cause) {
- super(message, cause);
- // TODO Auto-generated constructor stub
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/exception/IllegalBatchPropertyException.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/exception/IllegalBatchPropertyException.java b/JSR352.Runtime/src/com/ibm/jbatch/container/exception/IllegalBatchPropertyException.java
deleted file mode 100755
index 503283a..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/exception/IllegalBatchPropertyException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package com.ibm.jbatch.container.exception;
-
-public class IllegalBatchPropertyException extends BatchContainerRuntimeException {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public IllegalBatchPropertyException() {
- // TODO Auto-generated constructor stub
- }
-
- public IllegalBatchPropertyException(String message) {
- super(message);
- // TODO Auto-generated constructor stub
- }
-
- public IllegalBatchPropertyException(Throwable cause) {
- super(cause);
- // TODO Auto-generated constructor stub
- }
-
- public IllegalBatchPropertyException(String message, Throwable cause) {
- super(message, cause);
- // TODO Auto-generated constructor stub
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/exception/PersistenceException.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/exception/PersistenceException.java b/JSR352.Runtime/src/com/ibm/jbatch/container/exception/PersistenceException.java
deleted file mode 100755
index eb01ec9..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/exception/PersistenceException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package com.ibm.jbatch.container.exception;
-
-public class PersistenceException extends BatchContainerRuntimeException {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public PersistenceException() {
- // TODO Auto-generated constructor stub
- }
-
- public PersistenceException(String message) {
- super(message);
- // TODO Auto-generated constructor stub
- }
-
- public PersistenceException(Throwable cause) {
- super(cause);
- // TODO Auto-generated constructor stub
- }
-
- public PersistenceException(String message, Throwable cause) {
- super(message, cause);
- // TODO Auto-generated constructor stub
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/exception/TransactionManagementException.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/exception/TransactionManagementException.java b/JSR352.Runtime/src/com/ibm/jbatch/container/exception/TransactionManagementException.java
deleted file mode 100755
index a0aec50..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/exception/TransactionManagementException.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package com.ibm.jbatch.container.exception;
-
-public class TransactionManagementException extends BatchContainerRuntimeException {
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- public TransactionManagementException() {
- super();
- // TODO Auto-generated constructor stub
- }
-
- public TransactionManagementException(String message, Throwable cause) {
- super(message, cause);
- // TODO Auto-generated constructor stub
- }
-
- public TransactionManagementException(String message) {
- super(message);
- // TODO Auto-generated constructor stub
- }
-
- public TransactionManagementException(Throwable cause) {
- super(cause);
- // TODO Auto-generated constructor stub
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BaseStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BaseStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BaseStepControllerImpl.java
deleted file mode 100755
index e7df91a..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BaseStepControllerImpl.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.batch.operations.JobExecutionAlreadyCompleteException;
-import javax.batch.operations.JobExecutionNotMostRecentException;
-import javax.batch.operations.JobRestartException;
-import javax.batch.operations.JobStartException;
-import javax.batch.runtime.BatchStatus;
-import javax.batch.runtime.JobInstance;
-
-import com.ibm.jbatch.container.IExecutionElementController;
-import com.ibm.jbatch.container.context.impl.MetricImpl;
-import com.ibm.jbatch.container.context.impl.StepContextImpl;
-import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
-import com.ibm.jbatch.container.exception.BatchContainerServiceException;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.jobinstance.StepExecutionImpl;
-import com.ibm.jbatch.container.persistence.PersistentDataWrapper;
-import com.ibm.jbatch.container.services.IBatchKernelService;
-import com.ibm.jbatch.container.services.IJobStatusManagerService;
-import com.ibm.jbatch.container.services.IPersistenceManagerService;
-import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
-import com.ibm.jbatch.container.status.ExecutionStatus;
-import com.ibm.jbatch.container.status.ExtendedBatchStatus;
-import com.ibm.jbatch.container.status.StepStatus;
-import com.ibm.jbatch.container.util.PartitionDataWrapper;
-import com.ibm.jbatch.jsl.model.JSLProperties;
-import com.ibm.jbatch.jsl.model.Property;
-import com.ibm.jbatch.jsl.model.Step;
-import com.ibm.jbatch.spi.services.ITransactionManagementService;
-import com.ibm.jbatch.spi.services.TransactionManagerAdapter;
-
-/** Change the name of this class to something else!! Or change BaseStepControllerImpl. */
-public abstract class BaseStepControllerImpl implements IExecutionElementController {
-
- private final static String sourceClass = BatchletStepControllerImpl.class.getName();
- private final static Logger logger = Logger.getLogger(sourceClass);
-
- protected RuntimeJobExecution jobExecutionImpl;
- protected JobInstance jobInstance;
-
- protected StepContextImpl stepContext;
- protected Step step;
- protected StepStatus stepStatus;
-
- protected BlockingQueue<PartitionDataWrapper> analyzerStatusQueue = null;
-
- protected long rootJobExecutionId;
-
- protected static IBatchKernelService batchKernel = ServicesManagerImpl.getInstance().getBatchKernelService();
-
- protected TransactionManagerAdapter transactionManager = null;
-
- private static IPersistenceManagerService _persistenceManagementService = ServicesManagerImpl.getInstance().getPersistenceManagerService();
-
- private static IJobStatusManagerService _jobStatusService = (IJobStatusManagerService) ServicesManagerImpl.getInstance().getJobStatusManagerService();
-
- protected BaseStepControllerImpl(RuntimeJobExecution jobExecution, Step step, StepContextImpl stepContext, long rootJobExecutionId) {
- this.jobExecutionImpl = jobExecution;
- this.jobInstance = jobExecution.getJobInstance();
- this.stepContext = stepContext;
- this.rootJobExecutionId = rootJobExecutionId;
- if (step == null) {
- throw new IllegalArgumentException("Step parameter to ctor cannot be null.");
- }
- this.step = step;
- }
-
- protected BaseStepControllerImpl(RuntimeJobExecution jobExecution, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
- this(jobExecution, step, stepContext, rootJobExecutionId);
- this.analyzerStatusQueue = analyzerStatusQueue;
- }
-
- ///////////////////////////
- // ABSTRACT METHODS ARE HERE
- ///////////////////////////
- protected abstract void invokeCoreStep() throws JobRestartException, JobStartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException;
-
- protected abstract void setupStepArtifacts();
-
- protected abstract void invokePreStepArtifacts();
-
- protected abstract void invokePostStepArtifacts();
-
- // This is only useful from the partition threads
- protected abstract void sendStatusFromPartitionToAnalyzerIfPresent();
-
- @Override
- public ExecutionStatus execute() {
-
- // Here we're just setting up to decide if we're going to run the step or not (if it's already complete and
- // allow-start-if-complete=false).
- try {
- boolean executeStep = shouldStepBeExecuted();
- if (!executeStep) {
- logger.fine("Not going to run this step. Returning previous exit status of: " + stepStatus.getExitStatus());
- return new ExecutionStatus(ExtendedBatchStatus.DO_NOT_RUN, stepStatus.getExitStatus());
- }
- } catch (Throwable t) {
- // Treat an error at this point as unrecoverable, so fail job too.
- markJobAndStepFailed();
- rethrowWithWarning("Caught throwable while determining if step should be executed. Failing job.", t);
- }
-
- // At this point we have a StepExecution. Setup so that we're ready to invoke artifacts.
- try {
- startStep();
- } catch (Throwable t) {
- // Treat an error at this point as unrecoverable, so fail job too.
- markJobAndStepFailed();
- rethrowWithWarning("Caught throwable while starting step. Failing job.", t);
- }
-
- // At this point artifacts are in the picture so we want to try to invoke afterStep() on a failure.
- try {
- invokePreStepArtifacts(); //Call PartitionReducer and StepListener(s)
- invokeCoreStep();
- } catch (Exception e) {
- // We're going to continue on so that we can execute the afterStep() and analyzer
- try {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- logger.warning("Caught exception executing step: " + sw.toString());
- markStepFailed();
- } catch(Throwable t) {
- // Since the first one is the original first failure, let's rethrow t1 and not the second error,
- // but we'll log a severe error pointing out that the failure didn't get persisted..
- // We won't try to call the afterStep() in this case either.
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- t.printStackTrace(pw);
- rethrowWithSevere("ERROR. PERSISTING BATCH STATUS FAILED. STEP EXECUTION STATUS TABLES MIGHT HAVE CONSISTENCY ISSUES" +
- "AND/OR UNEXPECTED ENTRIES.", t);
- }
- } catch (Throwable t) {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- t.printStackTrace(pw);
- logger.warning("Failing both step AND job after catching error executing step: " + sw.toString());
- markJobAndStepFailed();
- }
-
- //
- // At this point we may have already failed the step, but we still try to invoke the end of step artifacts.
- //
- try {
- //Call PartitionAnalyzer, PartitionReducer and StepListener(s)
- invokePostStepArtifacts();
- } catch (Throwable t) {
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- t.printStackTrace(pw);
- logger.warning("Error invoking end of step artifacts. Stack trace: " + sw.toString());
- markStepFailed();
- }
-
- //
- // No more application code is on the path from here on out (excluding the call to the PartitionAnalyzer
- // analyzeStatus()). If an exception bubbles up and leaves the statuses inconsistent or incorrect then so be it;
- // maybe there's a runtime bug that will need to be fixed.
- //
- try {
- // Now that all step-level artifacts have had a chance to run,
- // we set the exit status to one of the defaults if it is still unset.
-
- // This is going to be the very last sequence of calls from the step running on the main thread,
- // since the call back to the partition analyzer only happens on the partition threads.
- // On the partition threads, then, we harden the status at the partition level before we
- // send it back to the main thread.
- persistUserData();
- transitionToFinalBatchStatus();
- defaultExitStatusIfNecessary();
- persistExitStatusAndEndTimestamp();
- } catch (Throwable t) {
- // Don't let an exception caught here prevent us from persisting the failed batch status.
- markJobAndStepFailed();
- rethrowWithWarning("Failure ending step execution", t);
- }
-
- //
- // Only happens on main thread.
- //
- sendStatusFromPartitionToAnalyzerIfPresent();
-
- logger.finer("Returning step batchStatus: " + stepStatus.getBatchStatus() +
- ", exitStatus: " + stepStatus.getExitStatus());
-
- if (stepStatus.getBatchStatus().equals(BatchStatus.FAILED)) {
- return new ExecutionStatus(ExtendedBatchStatus.EXCEPTION_THROWN, stepStatus.getExitStatus());
- } else {
- return new ExecutionStatus(ExtendedBatchStatus.NORMAL_COMPLETION, stepStatus.getExitStatus());
- }
- }
-
- private void defaultExitStatusIfNecessary() {
- String stepExitStatus = stepContext.getExitStatus();
- String processRetVal = stepContext.getBatchletProcessRetVal();
- if (stepExitStatus != null) {
- logger.fine("Returning with user-set exit status: " + stepExitStatus);
- } else if (processRetVal != null) {
- logger.fine("Returning with exit status from batchlet.process(): " + processRetVal);
- stepContext.setExitStatus(processRetVal);
- } else {
- logger.fine("Returning with default exit status");
- stepContext.setExitStatus(stepContext.getBatchStatus().name());
- }
- }
-
- private void markStepFailed() {
- updateBatchStatus(BatchStatus.FAILED);
- }
-
- protected void markJobAndStepFailed() {
- jobExecutionImpl.getJobContext().setBatchStatus(BatchStatus.FAILED);
- markStepFailed();
- }
-
- private void startStep() {
- // Update status
- statusStarting();
- //Set Step context properties
- setContextProperties();
- //Set up step artifacts like step listeners, partition reducers
- setupStepArtifacts();
- // Move batch status to started.
- updateBatchStatus(BatchStatus.STARTED);
-
- long time = System.currentTimeMillis();
- Timestamp startTS = new Timestamp(time);
- stepContext.setStartTime(startTS);
-
- _persistenceManagementService.updateStepExecution(rootJobExecutionId, stepContext);
- }
-
-
- /**
- * The only valid states at this point are STARTED, STOPPING, or FAILED.
- * The step cannot have reached STOPPED or COMPLETED yet at this point in the code.
- */
- private void transitionToFinalBatchStatus() {
- BatchStatus currentBatchStatus = stepContext.getBatchStatus();
- if (currentBatchStatus.equals(BatchStatus.STARTED)) {
- updateBatchStatus(BatchStatus.COMPLETED);
- } else if (currentBatchStatus.equals(BatchStatus.STOPPING)) {
- updateBatchStatus(BatchStatus.STOPPED);
- } else if (currentBatchStatus.equals(BatchStatus.FAILED)) {
- updateBatchStatus(BatchStatus.FAILED); // Should have already been done but maybe better for possible code refactoring to have it here.
- } else {
- throw new IllegalStateException("Step batch status should not be in a " + currentBatchStatus.name() + " state");
- }
- }
-
- protected void updateBatchStatus(BatchStatus updatedBatchStatus) {
- logger.fine("Updating batch status from : " + stepStatus.getBatchStatus() + ", to: " + updatedBatchStatus);
- stepStatus.setBatchStatus(updatedBatchStatus);
- _jobStatusService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
- stepContext.setBatchStatus(updatedBatchStatus);
- }
-
- protected boolean shouldStepBeExecuted() {
-
- if (logger.isLoggable(Level.FINER)) {
- logger.finer("In shouldStepBeExecuted() with stepContext = " + this.stepContext);
- }
-
- this.stepStatus = _jobStatusService.getStepStatus(jobInstance.getInstanceId(), step.getId());
- if (stepStatus == null) {
- logger.finer("No existing step status found. Create new step execution and proceed to execution.");
- // create new step execution
- StepExecutionImpl stepExecution = getNewStepExecution(rootJobExecutionId, stepContext);
- // create new step status for this run
- stepStatus = _jobStatusService.createStepStatus(stepExecution.getStepExecutionId());
- ((StepContextImpl) stepContext).setStepExecutionId(stepExecution.getStepExecutionId());
- return true;
- } else {
- logger.finer("Existing step status found.");
- // If a step status already exists for this instance id, it means this
- // is a restart and we need to get the previously persisted data.
- ((StepContextImpl) stepContext).setPersistentUserData(stepStatus.getPersistentUserData());
- if (shouldStepBeExecutedOnRestart()) {
- // Seems better to let the start count get incremented without getting a step execution than
- // vice versa (in an unexpected error case).
- stepStatus.incrementStartCount();
- // create new step execution
- StepExecutionImpl stepExecution = getNewStepExecution(rootJobExecutionId, stepContext);
- this.stepStatus.setLastRunStepExecutionId(stepExecution.getStepExecutionId());
- ((StepContextImpl) stepContext).setStepExecutionId(stepExecution.getStepExecutionId());
- return true;
- } else {
- return false;
- }
- }
- }
-
- private boolean shouldStepBeExecutedOnRestart() {
- BatchStatus stepBatchStatus = stepStatus.getBatchStatus();
- if (stepBatchStatus.equals(BatchStatus.COMPLETED)) {
- // A bit of parsing involved since the model gives us a String not a
- // boolean, but it should default to 'false', which is the spec'd default.
- if (!Boolean.parseBoolean(step.getAllowStartIfComplete())) {
- logger.fine("Step: " + step.getId() + " already has batch status of COMPLETED, so won't be run again since it does not allow start if complete.");
- return false;
- } else {
- logger.fine("Step: " + step.getId() + " already has batch status of COMPLETED, and allow-start-if-complete is set to 'true'");
- }
- }
-
- // The spec default is '0', which we get by initializing to '0' in the next line
- int startLimit = 0;
- String startLimitString = step.getStartLimit();
- if (startLimitString != null) {
- try {
- startLimit = Integer.parseInt(startLimitString);
- } catch (NumberFormatException e) {
- throw new IllegalArgumentException("Could not parse start limit value. Received NumberFormatException for start-limit value: " + startLimitString
- + " for stepId: " + step.getId() + ", with start-limit=" + step.getStartLimit());
- }
- }
-
- if (startLimit < 0) {
- throw new IllegalArgumentException("Found negative start-limit of " + startLimit + "for stepId: " + step.getId());
- }
-
- if (startLimit > 0) {
- int newStepStartCount = stepStatus.getStartCount() + 1;
- if (newStepStartCount > startLimit) {
- throw new IllegalStateException("For stepId: " + step.getId() + ", tried to start step for the " + newStepStartCount
- + " time, but startLimit = " + startLimit);
- } else {
- logger.fine("Starting (possibly restarting) step: " + step.getId() + ", since newStepStartCount = " + newStepStartCount
- + " and startLimit=" + startLimit);
- }
- }
- return true;
- }
-
-
- protected void statusStarting() {
- stepStatus.setBatchStatus(BatchStatus.STARTING);
- _jobStatusService.updateJobCurrentStep(jobInstance.getInstanceId(), step.getId());
- _jobStatusService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
- stepContext.setBatchStatus(BatchStatus.STARTING);
- }
-
- protected void persistUserData() {
- ByteArrayOutputStream persistentBAOS = new ByteArrayOutputStream();
- ObjectOutputStream persistentDataOOS = null;
-
- try {
- persistentDataOOS = new ObjectOutputStream(persistentBAOS);
- persistentDataOOS.writeObject(stepContext.getPersistentUserData());
- persistentDataOOS.close();
- } catch (Exception e) {
- throw new BatchContainerServiceException("Cannot persist the persistent user data for the step.", e);
- }
-
- stepStatus.setPersistentUserData(new PersistentDataWrapper(persistentBAOS.toByteArray()));
- _jobStatusService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
- }
-
- protected void persistExitStatusAndEndTimestamp() {
- stepStatus.setExitStatus(stepContext.getExitStatus());
- _jobStatusService.updateStepStatus(stepStatus.getStepExecutionId(), stepStatus);
-
- // set the end time metric before flushing
- long time = System.currentTimeMillis();
- Timestamp endTS = new Timestamp(time);
- stepContext.setEndTime(endTS);
-
- _persistenceManagementService.updateStepExecution(rootJobExecutionId, stepContext);
- }
-
- private StepExecutionImpl getNewStepExecution(long rootJobExecutionId, StepContextImpl stepContext) {
- return _persistenceManagementService.createStepExecution(rootJobExecutionId, stepContext);
- }
-
- private void setContextProperties() {
- JSLProperties jslProps = step.getProperties();
-
- if (jslProps != null) {
- for (Property property : jslProps.getPropertyList()) {
- Properties contextProps = stepContext.getProperties();
- contextProps.setProperty(property.getName(), property.getValue());
- }
- }
-
- // set up metrics
- stepContext.addMetric(MetricImpl.MetricType.READ_COUNT, 0);
- stepContext.addMetric(MetricImpl.MetricType.WRITE_COUNT, 0);
- stepContext.addMetric(MetricImpl.MetricType.READ_SKIP_COUNT, 0);
- stepContext.addMetric(MetricImpl.MetricType.PROCESS_SKIP_COUNT, 0);
- stepContext.addMetric(MetricImpl.MetricType.WRITE_SKIP_COUNT, 0);
- stepContext.addMetric(MetricImpl.MetricType.FILTER_COUNT, 0);
- stepContext.addMetric(MetricImpl.MetricType.COMMIT_COUNT, 0);
- stepContext.addMetric(MetricImpl.MetricType.ROLLBACK_COUNT, 0);
-
- ITransactionManagementService transMgr = ServicesManagerImpl.getInstance().getTransactionManagementService();
- transactionManager = transMgr.getTransactionManager(stepContext);
- }
-
- public void setStepContext(StepContextImpl stepContext) {
- this.stepContext = stepContext;
- }
-
- protected BlockingQueue<PartitionDataWrapper> getAnalyzerQueue() {
- return analyzerStatusQueue;
- }
-
- public void setAnalyzerQueue(BlockingQueue<PartitionDataWrapper> analyzerQueue) {
- this.analyzerStatusQueue = analyzerQueue;
- }
-
- @Override
- public List<Long> getLastRunStepExecutions() {
-
- List<Long> stepExecIdList = new ArrayList<Long>(1);
- stepExecIdList.add(this.stepStatus.getLastRunStepExecutionId());
-
- return stepExecIdList;
- }
-
- private void rethrowWithMsg(String msgBeginning, Throwable t, Level level) {
- String errorMsg = msgBeginning + " ; Caught exception/error: " + t.getLocalizedMessage();
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- t.printStackTrace(pw);
- logger.log(level, errorMsg + " : Stack trace: " + sw.toString());
- throw new BatchContainerRuntimeException(errorMsg, t);
- }
-
- private void rethrowWithWarning(String msgBeginning, Throwable t) {
- rethrowWithMsg(msgBeginning, t, Level.WARNING);
- }
-
- private void rethrowWithSevere(String msgBeginning, Throwable t) {
- rethrowWithMsg(msgBeginning, t, Level.SEVERE);
- }
-
- public String toString() {
- return "BaseStepControllerImpl for step = " + step.getId();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchConfigImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchConfigImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchConfigImpl.java
deleted file mode 100755
index 72b92a3..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchConfigImpl.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package com.ibm.jbatch.container.impl;
-
-import java.util.Properties;
-
-import com.ibm.jbatch.spi.DatabaseConfigurationBean;
-import com.ibm.jbatch.spi.services.IBatchConfig;
-
-public class BatchConfigImpl implements IBatchConfig {
-
- protected boolean j2seMode = false;
- protected DatabaseConfigurationBean databaseConfigBean = null;
- protected Properties configProperties = null;
-
- @Override
- public boolean isJ2seMode() {
- return j2seMode;
- }
-
- public void setJ2seMode(boolean j2seMode) {
- this.j2seMode = j2seMode;
- }
-
- @Override
- public DatabaseConfigurationBean getDatabaseConfigurationBean() {
- return databaseConfigBean;
- }
-
- public void setDatabaseConfigurationBean(DatabaseConfigurationBean databaseConfigBean) {
- this.databaseConfigBean = databaseConfigBean;
- }
-
- @Override
- public Properties getConfigProperties() {
- return configProperties;
- }
-
- public void setConfigProperties(Properties configProperties) {
- this.configProperties = configProperties;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchKernelImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchKernelImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchKernelImpl.java
deleted file mode 100755
index 98d9fc9..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchKernelImpl.java
+++ /dev/null
@@ -1,459 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.batch.operations.JobExecutionAlreadyCompleteException;
-import javax.batch.operations.JobExecutionNotMostRecentException;
-import javax.batch.operations.JobExecutionNotRunningException;
-import javax.batch.operations.JobRestartException;
-import javax.batch.operations.JobStartException;
-import javax.batch.operations.NoSuchJobExecutionException;
-import javax.batch.runtime.JobInstance;
-
-import com.ibm.jbatch.container.IThreadRootController;
-import com.ibm.jbatch.container.callback.IJobEndCallbackService;
-import com.ibm.jbatch.container.exception.BatchContainerServiceException;
-import com.ibm.jbatch.container.jobinstance.JobExecutionHelper;
-import com.ibm.jbatch.container.jobinstance.RuntimeFlowInSplitExecution;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.services.IBatchKernelService;
-import com.ibm.jbatch.container.services.IJobExecution;
-import com.ibm.jbatch.container.services.IPersistenceManagerService;
-import com.ibm.jbatch.container.services.impl.NoOpBatchSecurityHelper;
-import com.ibm.jbatch.container.services.impl.RuntimeBatchJobUtil;
-import com.ibm.jbatch.container.servicesmanager.ServicesManager;
-import com.ibm.jbatch.container.servicesmanager.ServicesManagerImpl;
-import com.ibm.jbatch.container.util.BatchFlowInSplitWorkUnit;
-import com.ibm.jbatch.container.util.BatchPartitionWorkUnit;
-import com.ibm.jbatch.container.util.BatchWorkUnit;
-import com.ibm.jbatch.container.util.FlowInSplitBuilderConfig;
-import com.ibm.jbatch.container.util.PartitionsBuilderConfig;
-import com.ibm.jbatch.jsl.model.JSLJob;
-import com.ibm.jbatch.spi.BatchJobUtil;
-import com.ibm.jbatch.spi.BatchSPIManager;
-import com.ibm.jbatch.spi.BatchSecurityHelper;
-import com.ibm.jbatch.spi.services.IBatchConfig;
-import com.ibm.jbatch.spi.services.IBatchThreadPoolService;
-import com.ibm.jbatch.spi.services.ParallelTaskResult;
-
-public class BatchKernelImpl implements IBatchKernelService {
-
- private final static String sourceClass = BatchKernelImpl.class.getName();
- private final static Logger logger = Logger.getLogger(sourceClass);
-
- private Map<Long, IThreadRootController> executionId2jobControllerMap = new ConcurrentHashMap<Long, IThreadRootController>();
- private Set<Long> instanceIdExecutingSet = new HashSet<Long>();
-
- ServicesManager servicesManager = ServicesManagerImpl.getInstance();
-
- private IBatchThreadPoolService executorService = null;
-
- private IJobEndCallbackService callbackService = null;
-
- private IPersistenceManagerService persistenceService = null;
-
- private BatchSecurityHelper batchSecurity = null;
-
- private BatchJobUtil batchJobUtil = null;
-
- public BatchKernelImpl() {
- executorService = servicesManager.getThreadPoolService();
- callbackService = servicesManager.getJobCallbackService();
- persistenceService = servicesManager.getPersistenceManagerService();
-
- // registering our implementation of the util class used to purge by apptag
- batchJobUtil = new RuntimeBatchJobUtil();
- BatchSPIManager.getInstance().registerBatchJobUtil(batchJobUtil);
- }
-
- public BatchSecurityHelper getBatchSecurityHelper() {
- batchSecurity = BatchSPIManager.getInstance().getBatchSecurityHelper();
- if (batchSecurity == null) {
- batchSecurity = new NoOpBatchSecurityHelper();
- }
- return batchSecurity;
- }
-
- public void init(IBatchConfig pgcConfig) throws BatchContainerServiceException {
- }
-
- @Override
- public void shutdown() throws BatchContainerServiceException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public IJobExecution startJob(String jobXML) throws JobStartException {
- return startJob(jobXML, null);
- }
-
- @Override
- public IJobExecution startJob(String jobXML, Properties jobParameters) throws JobStartException {
- String method = "startJob";
-
- if (logger.isLoggable(Level.FINER)) {
- logger.entering(sourceClass, method, new Object[] { jobXML, jobParameters != null ? jobParameters : "<null>" });
- }
-
- RuntimeJobExecution jobExecution = JobExecutionHelper.startJob(jobXML, jobParameters);
-
- // TODO - register with status manager
-
- if (logger.isLoggable(Level.FINE)) {
- logger.fine("JobExecution constructed: " + jobExecution);
- }
-
- BatchWorkUnit batchWork = new BatchWorkUnit(this, jobExecution);
- registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
-
- executorService.executeTask(batchWork, null);
-
- if (logger.isLoggable(Level.FINER)) {
- logger.exiting(sourceClass, method, jobExecution);
- }
-
- return jobExecution.getJobOperatorJobExecution();
- }
-
- @Override
- public void stopJob(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
-
- IThreadRootController controller = this.executionId2jobControllerMap.get(executionId);
- if (controller == null) {
- String msg = "JobExecution with execution id of " + executionId + "is not running.";
- logger.warning("stopJob(): " + msg);
- throw new JobExecutionNotRunningException(msg);
- }
- controller.stop();
- }
-
- @Override
- public IJobExecution restartJob(long executionId) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
- String method = "restartJob";
-
- if (logger.isLoggable(Level.FINER)) {
- logger.entering(sourceClass, method);
- }
-
- Properties dummyPropObj = new Properties();
- return this.restartJob(executionId, dummyPropObj);
- }
-
-
-
-
- @Override
- public IJobExecution restartJob(long executionId, Properties jobOverrideProps) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException, NoSuchJobExecutionException {
- String method = "restartJob";
-
- if (logger.isLoggable(Level.FINER)) {
- logger.entering(sourceClass, method);
- }
-
- RuntimeJobExecution jobExecution =
- JobExecutionHelper.restartJob(executionId, jobOverrideProps);
-
- if (logger.isLoggable(Level.FINE)) {
- logger.fine("JobExecution constructed: " + jobExecution);
- }
-
- BatchWorkUnit batchWork = new BatchWorkUnit(this, jobExecution);
-
- registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
-
- executorService.executeTask(batchWork, null);
-
- if (logger.isLoggable(Level.FINER)) {
- logger.exiting(sourceClass, method, jobExecution);
- }
-
- return jobExecution.getJobOperatorJobExecution();
- }
-
- @Override
- public void jobExecutionDone(RuntimeJobExecution jobExecution) {
-
- if (logger.isLoggable(Level.FINE)) {
- logger.fine("JobExecution done with batchStatus: " + jobExecution.getBatchStatus() + " , getting ready to invoke callbacks for JobExecution: " + jobExecution.getExecutionId());
- }
-
- callbackService.done(jobExecution.getExecutionId());
-
- if (logger.isLoggable(Level.FINE)) {
- logger.fine("Done invoking callbacks for JobExecution: " + jobExecution.getExecutionId());
- }
-
- // Remove from executionId, instanceId map,set after job is done
- this.executionId2jobControllerMap.remove(jobExecution.getExecutionId());
- this.instanceIdExecutingSet.remove(jobExecution.getInstanceId());
-
- // AJM: ah - purge jobExecution from map here and flush to DB?
- // edit: no long want a 2 tier for the jobexecution...do want it for step execution
- // renamed method to flushAndRemoveStepExecution
-
- }
-
- public IJobExecution getJobExecution(long executionId) throws NoSuchJobExecutionException {
- /*
- * Keep logging on finest for apps like TCK which do polling
- */
- logger.finest("Entering " + sourceClass + ".getJobExecution(), executionId = " + executionId);
- IJobExecution retVal = JobExecutionHelper.getPersistedJobOperatorJobExecution(executionId);
-
- logger.finest("Exiting " + sourceClass + ".getJobExecution(), retVal = " + retVal);
- return retVal;
- }
-
- @Override
- public void startGeneratedJob(BatchWorkUnit batchWork) {
- String method = "startGeneratedJob";
-
- if (logger.isLoggable(Level.FINER)) {
- logger.entering(sourceClass, method, new Object[] { batchWork });
- }
-
- //This call is non-blocking
- ParallelTaskResult result = executorService.executeParallelTask(batchWork, null);
-
- if (logger.isLoggable(Level.FINER)) {
- logger.exiting(sourceClass, method, new Object[] { batchWork });
- }
- }
-
- @Override
- public int getJobInstanceCount(String jobName) {
- int jobInstanceCount = 0;
-
- jobInstanceCount = persistenceService.jobOperatorGetJobInstanceCount(jobName);
-
- return jobInstanceCount;
- }
-
- @Override
- public JobInstance getJobInstance(long executionId){
- return JobExecutionHelper.getJobInstance(executionId);
- }
-
-
- /**
- * Build a list of batch work units and set them up in STARTING state but don't start them yet.
- */
-
- @Override
- public List<BatchPartitionWorkUnit> buildNewParallelPartitions(PartitionsBuilderConfig config)
- throws JobRestartException, JobStartException {
-
- List<JSLJob> jobModels = config.getJobModels();
- Properties[] partitionPropertiesArray = config.getPartitionProperties();
-
- List<BatchPartitionWorkUnit> batchWorkUnits = new ArrayList<BatchPartitionWorkUnit>(jobModels.size());
-
- int instance = 0;
- for (JSLJob parallelJob : jobModels){
- Properties partitionProps = (partitionPropertiesArray == null) ? null : partitionPropertiesArray[instance];
-
- if (logger.isLoggable(Level.FINER)) {
- logger.finer("Starting execution for jobModel = " + parallelJob.toString());
- }
- RuntimeJobExecution jobExecution = JobExecutionHelper.startPartition(parallelJob, partitionProps);
- jobExecution.setPartitionInstance(instance);
-
- if (logger.isLoggable(Level.FINE)) {
- logger.fine("JobExecution constructed: " + jobExecution);
- }
- BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(this, jobExecution, config);
-
- registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
-
- batchWorkUnits.add(batchWork);
- instance++;
- }
-
- return batchWorkUnits;
- }
-
- @Override
- public List<BatchPartitionWorkUnit> buildOnRestartParallelPartitions(PartitionsBuilderConfig config) throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
-
- List<JSLJob> jobModels = config.getJobModels();
- Properties[] partitionProperties = config.getPartitionProperties();
-
- List<BatchPartitionWorkUnit> batchWorkUnits = new ArrayList<BatchPartitionWorkUnit>(jobModels.size());
-
- //for now let always use a Properties array. We can add some more convenience methods later for null properties and what not
-
- int instance = 0;
- for (JSLJob parallelJob : jobModels){
-
- Properties partitionProps = (partitionProperties == null) ? null : partitionProperties[instance];
-
- try {
- long execId = getMostRecentExecutionId(parallelJob);
-
- RuntimeJobExecution jobExecution = null;
- try {
- jobExecution = JobExecutionHelper.restartPartition(execId, parallelJob, partitionProps);
- jobExecution.setPartitionInstance(instance);
- } catch (NoSuchJobExecutionException e) {
- String errorMsg = "Caught NoSuchJobExecutionException but this is an internal JobExecution so this shouldn't have happened: execId =" + execId;
- logger.severe(errorMsg);
- throw new IllegalStateException(errorMsg, e);
- }
-
- if (logger.isLoggable(Level.FINE)) {
- logger.fine("JobExecution constructed: " + jobExecution);
- }
-
- BatchPartitionWorkUnit batchWork = new BatchPartitionWorkUnit(this, jobExecution, config);
- registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
-
- batchWorkUnits.add(batchWork);
- } catch (JobExecutionAlreadyCompleteException e) {
- logger.fine("This execution already completed: " + parallelJob.getId());
- }
-
- instance++;
- }
-
- return batchWorkUnits;
- }
-
- @Override
- public void restartGeneratedJob(BatchWorkUnit batchWork) throws JobRestartException {
- String method = "restartGeneratedJob";
-
- if (logger.isLoggable(Level.FINER)) {
- logger.entering(sourceClass, method, new Object[] { batchWork });
- }
-
- //This call is non-blocking
- ParallelTaskResult result = executorService.executeParallelTask(batchWork, null);
-
- if (logger.isLoggable(Level.FINER)) {
- logger.exiting(sourceClass, method, batchWork);
- }
-
- }
-
- @Override
- public BatchFlowInSplitWorkUnit buildNewFlowInSplitWorkUnit(FlowInSplitBuilderConfig config) {
- JSLJob parallelJob = config.getJobModel();
-
- RuntimeFlowInSplitExecution execution = JobExecutionHelper.startFlowInSplit(parallelJob);
-
- if (logger.isLoggable(Level.FINE)) {
- logger.fine("JobExecution constructed: " + execution);
- }
- BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(this, execution, config);
-
- registerCurrentInstanceAndExecution(execution, batchWork.getController());
- return batchWork;
- }
-
- private long getMostRecentExecutionId(JSLJob jobModel) {
-
- //There can only be one instance associated with a subjob's id since it is generated from an unique
- //job instance id. So there should be no way to directly start a subjob with particular
- List<Long> instanceIds = persistenceService.jobOperatorGetJobInstanceIds(jobModel.getId(), 0, 2);
-
- // Maybe we should blow up on '0' too?
- if (instanceIds.size() > 1) {
- String errorMsg = "Found " + instanceIds.size() + " entries for instance id = " + jobModel.getId() + ", which should not have happened. Blowing up.";
- logger.severe(errorMsg);
- throw new IllegalStateException(errorMsg);
- }
-
- List<IJobExecution> partitionExecs = persistenceService.jobOperatorGetJobExecutions(instanceIds.get(0));
-
- Long execId = Long.MIN_VALUE;
-
- for (IJobExecution partitionExec : partitionExecs ) {
- if (partitionExec.getExecutionId() > execId ) {
- execId = partitionExec.getExecutionId();
- }
- }
- return execId;
- }
-
- @Override
- public BatchFlowInSplitWorkUnit buildOnRestartFlowInSplitWorkUnit(FlowInSplitBuilderConfig config)
- throws JobRestartException, JobExecutionAlreadyCompleteException, JobExecutionNotMostRecentException {
-
- String method = "buildOnRestartFlowInSplitWorkUnit";
-
- JSLJob jobModel = config.getJobModel();
-
- if (logger.isLoggable(Level.FINER)) {
- logger.entering(sourceClass, method, jobModel);
- }
-
- long execId = getMostRecentExecutionId(jobModel);
-
- RuntimeFlowInSplitExecution jobExecution = null;
- try {
- jobExecution = JobExecutionHelper.restartFlowInSplit(execId, jobModel);
- } catch (NoSuchJobExecutionException e) {
- String errorMsg = "Caught NoSuchJobExecutionException but this is an internal JobExecution so this shouldn't have happened: execId =" + execId;
- logger.severe(errorMsg);
- throw new IllegalStateException(errorMsg, e);
- }
-
- if (logger.isLoggable(Level.FINE)) {
- logger.fine("JobExecution constructed: " + jobExecution);
- }
-
- BatchFlowInSplitWorkUnit batchWork = new BatchFlowInSplitWorkUnit(this, jobExecution, config);
-
- registerCurrentInstanceAndExecution(jobExecution, batchWork.getController());
- return batchWork;
- }
-
- private void registerCurrentInstanceAndExecution(RuntimeJobExecution jobExecution, IThreadRootController controller) {
- long execId = jobExecution.getExecutionId();
- long instanceId = jobExecution.getInstanceId();
- String errorPrefix = "Tried to execute with Job executionId = " + execId + " and instanceId = " + instanceId + " ";
- if (executionId2jobControllerMap.get(execId) != null) {
- String errorMsg = errorPrefix + "but executionId is already currently executing.";
- logger.warning(errorMsg);
- throw new IllegalStateException(errorMsg);
- } else if (instanceIdExecutingSet.contains(instanceId)) {
- String errorMsg = errorPrefix + "but another execution with this instanceId is already currently executing.";
- logger.warning(errorMsg);
- throw new IllegalStateException(errorMsg);
- } else {
- instanceIdExecutingSet.add(instanceId);
- executionId2jobControllerMap.put(jobExecution.getExecutionId(), controller);
- }
- }
-
- @Override
- public boolean isExecutionRunning(long executionId) {
- return executionId2jobControllerMap.containsKey(executionId);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchletStepControllerImpl.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchletStepControllerImpl.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchletStepControllerImpl.java
deleted file mode 100755
index 2129c95..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/BatchletStepControllerImpl.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ibm.jbatch.container.impl;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.batch.runtime.BatchStatus;
-
-
-import com.ibm.jbatch.container.artifact.proxy.BatchletProxy;
-import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
-import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
-import com.ibm.jbatch.container.context.impl.StepContextImpl;
-import com.ibm.jbatch.container.exception.BatchContainerServiceException;
-import com.ibm.jbatch.container.jobinstance.RuntimeJobExecution;
-import com.ibm.jbatch.container.util.PartitionDataWrapper;
-import com.ibm.jbatch.container.validation.ArtifactValidationException;
-import com.ibm.jbatch.jsl.model.Batchlet;
-import com.ibm.jbatch.jsl.model.Partition;
-import com.ibm.jbatch.jsl.model.Property;
-import com.ibm.jbatch.jsl.model.Step;
-
-public class BatchletStepControllerImpl extends SingleThreadedStepControllerImpl {
-
- private final static String sourceClass = BatchletStepControllerImpl.class.getName();
- private final static Logger logger = Logger.getLogger(sourceClass);
-
- private BatchletProxy batchletProxy;
-
- public BatchletStepControllerImpl(RuntimeJobExecution jobExecutionImpl, Step step, StepContextImpl stepContext, long rootJobExecutionId, BlockingQueue<PartitionDataWrapper> analyzerStatusQueue) {
- super(jobExecutionImpl, step, stepContext, rootJobExecutionId, analyzerStatusQueue);
- }
-
- private void invokeBatchlet(Batchlet batchlet) throws BatchContainerServiceException {
-
- String batchletId = batchlet.getRef();
- List<Property> propList = (batchlet.getProperties() == null) ? null : batchlet.getProperties().getPropertyList();
-
- String sourceMethod = "invokeBatchlet";
- if (logger.isLoggable(Level.FINER)) {
- logger.entering(sourceClass, sourceMethod, batchletId);
- }
-
- String exitStatus = null;
-
- InjectionReferences injectionRef = new InjectionReferences(jobExecutionImpl.getJobContext(), stepContext,
- propList);
-
- try {
- batchletProxy = ProxyFactory.createBatchletProxy(batchletId, injectionRef, stepContext);
- } catch (ArtifactValidationException e) {
- throw new BatchContainerServiceException("Cannot create the batchlet [" + batchletId + "]", e);
- }
-
- if (logger.isLoggable(Level.FINE))
- logger.fine("Batchlet is loaded and validated: " + batchletProxy);
-
-
- if (wasStopIssued()) {
- logger.fine("Exit without executing batchlet since stop() request has been received.");
- } else {
- String processRetVal = batchletProxy.process();
-
- logger.fine("Set process() return value = " + processRetVal + " for possible use as exitStatus");
- stepContext.setBatchletProcessRetVal(processRetVal);
-
- logger.exiting(sourceClass, sourceMethod, exitStatus==null ? "<null>" : exitStatus);
- }
- }
-
- protected synchronized boolean wasStopIssued() {
- // Might only be set to stopping at the job level. Use the lock for this object on this
- // method along with the stop() method
- if (jobExecutionImpl.getJobContext().getBatchStatus().equals(BatchStatus.STOPPING)){
- stepContext.setBatchStatus(BatchStatus.STOPPING);
- return true;
- } else {
- return false;
- }
- }
- @Override
- protected void invokeCoreStep() throws BatchContainerServiceException {
-
- //TODO If this step is partitioned create partition artifacts
- Partition partition = step.getPartition();
- if (partition != null) {
- //partition.getConcurrencyElements();
- }
- try {
- invokeBatchlet(step.getBatchlet());
- } finally {
- invokeCollectorIfPresent();
- }
-
- }
-
- @Override
- public synchronized void stop() {
-
- // It is possible for stop() to be issued before process()
- if (BatchStatus.STARTING.equals(stepContext.getBatchStatus()) ||
- BatchStatus.STARTED.equals(stepContext.getBatchStatus())) {
-
- stepContext.setBatchStatus(BatchStatus.STOPPING);
-
- if (batchletProxy != null) {
- batchletProxy.stop();
- }
- } else {
- //TODO do we need to throw an error if the batchlet is already stopping/stopped
- //a stop gets issued twice
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/efa64877/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkHelper.java
----------------------------------------------------------------------
diff --git a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkHelper.java b/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkHelper.java
deleted file mode 100755
index 8cda7da..0000000
--- a/JSR352.Runtime/src/com/ibm/jbatch/container/impl/ChunkHelper.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2012 International Business Machines Corp.
- *
- * See the NOTICE file distributed with this work for additional information
- * regarding copyright ownership. Licensed under the Apache License,
- * Version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package com.ibm.jbatch.container.impl;
-
-import com.ibm.jbatch.jsl.model.Chunk;
-
-public class ChunkHelper {
-
- public static int getItemCount(Chunk chunk) {
- String chunkSizeStr = chunk.getItemCount();
- int size = 10;
-
- if (chunkSizeStr != null && !chunkSizeStr.isEmpty()) {
- size = Integer.valueOf(chunk.getItemCount());
- }
-
- chunk.setItemCount(Integer.toString(size));
- return size;
- }
-
- public static int getTimeLimit(Chunk chunk){
- String chunkTimeLimitStr = chunk.getTimeLimit();
- int timeLimit = 0; //default time limit = 0 seconds ie no timelimit
-
- if (chunkTimeLimitStr != null && !chunkTimeLimitStr.isEmpty()) {
- timeLimit = Integer.valueOf(chunk.getTimeLimit());
- }
-
- chunk.setTimeLimit(Integer.toString(timeLimit));
- return timeLimit;
- }
-
- public static String getCheckpointPolicy(Chunk chunk) {
- String checkpointPolicy = chunk.getCheckpointPolicy();
-
- if (checkpointPolicy != null && !checkpointPolicy.isEmpty()) {
- if (!(checkpointPolicy.equals("item") || checkpointPolicy.equals("custom"))) {
- throw new IllegalArgumentException("The only supported attributed values for 'checkpoint-policy' are 'item' and 'custom'.");
- }
- } else {
- checkpointPolicy = "item";
- }
-
- chunk.setCheckpointPolicy(checkpointPolicy);
- return checkpointPolicy;
- }
-
- public static int getSkipLimit(Chunk chunk) {
- return Integer.valueOf(chunk.getSkipLimit());
- }
-
- public static int getRetryLimit(Chunk chunk) {
- return Integer.valueOf(chunk.getRetryLimit());
- }
-
-}