You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2006/12/22 00:27:13 UTC
svn commit: r489498 -
/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java
Author: mszefler
Date: Thu Dec 21 15:27:12 2006
New Revision: 489498
URL: http://svn.apache.org/viewvc?view=rev&rev=489498
Log:
Update the PMAPI to rely more on ProcessStore rather than the runtime db.
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java?view=diff&rev=489498&r1=489497&r2=489498
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java Thu Dec 21 15:27:12 2006
@@ -19,9 +19,11 @@
package org.apache.ode.bpel.engine;
+import org.apache.commons.collections.comparators.ComparatorChain;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.common.BpelEventFilter;
+import org.apache.ode.bpel.common.Filter;
import org.apache.ode.bpel.common.InstanceFilter;
import org.apache.ode.bpel.common.ProcessFilter;
import org.apache.ode.bpel.dao.*;
@@ -37,58 +39,69 @@
import org.apache.ode.bpel.o.OPartnerLink;
import org.apache.ode.bpel.o.OProcess;
import org.apache.ode.bpel.pmapi.*;
+import org.apache.ode.daohib.bpel.hobj.HProcess;
import org.apache.ode.utils.ISO8601DateParser;
import org.apache.ode.utils.msg.MessageBundle;
import org.apache.ode.utils.stl.CollectionsX;
+import org.apache.ode.utils.stl.MemberOfFunction;
import org.apache.ode.utils.stl.UnaryFunction;
+import org.hibernate.Criteria;
+import org.hibernate.Session;
+import org.hibernate.criterion.Example;
+import org.hibernate.criterion.Property;
+import org.hibernate.criterion.Restrictions;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import javax.xml.namespace.QName;
import java.io.File;
+import java.text.ParseException;
import java.util.*;
+import java.util.regex.Pattern;
/**
* Implentation of the Process and InstanceManagement APIs.
*
- * TODO Pull up IM/PM methods from BpelManagementFacadeImpl
+ * @todo Move this out of the engine, it no longer belongs here.
*/
-public class ProcessAndInstanceManagementImpl
- implements InstanceManagement, ProcessManagement {
+public class ProcessAndInstanceManagementImpl implements InstanceManagement, ProcessManagement {
protected static final Messages __msgs = MessageBundle.getMessages(Messages.class);
+
protected static Log __log = LogFactory.getLog(BpelManagementFacadeImpl.class);
+
protected static final ProcessStatusConverter __psc = new ProcessStatusConverter();
+
protected BpelDatabase _db;
+
protected ProcessStore _store;
- protected Calendar _calendar = Calendar.getInstance(); // Calendar can be expensive to initialize so we cache and clone it
+
+ protected Calendar _calendar = Calendar.getInstance(); // Calendar can be
+
+ // expensive to
+ // initialize so we
+ // cache and clone
+ // it
protected BpelServerImpl _server;
- public ProcessAndInstanceManagementImpl(BpelServer server,
- ProcessStore store) {
+ public ProcessAndInstanceManagementImpl(BpelServer server, ProcessStore store) {
_server = (BpelServerImpl) server;
_db = _server._db;
_store = store;
}
- public ProcessInfoListDocument listProcessesCustom(String filter, String orderKeys, final ProcessInfoCustomizer custom) {
+ public ProcessInfoListDocument listProcessesCustom(String filter, String orderKeys,
+ final ProcessInfoCustomizer custom) {
ProcessInfoListDocument ret = ProcessInfoListDocument.Factory.newInstance();
final TProcessInfoList procInfoList = ret.addNewProcessInfoList();
final ProcessFilter processFilter = new ProcessFilter(filter, orderKeys);
try {
- _db.exec(new BpelDatabase.Callable<Object>() {
- public Object run(BpelDAOConnection conn) {
- Collection<ProcessDAO> processes = conn.processQuery(processFilter);
- for (ProcessDAO proc : processes) {
- fillProcessInfo(procInfoList.addNewProcessInfo(), proc, custom);
- }
- return null;
- }
- });
+ for (ProcessConf pconf : processQuery(processFilter))
+ fillProcessInfo(procInfoList.addNewProcessInfo(), pconf, custom);
} catch (Exception e) {
- throw new ProcessingException("Exception while listing processes",e);
+ throw new ProcessingException("Exception while listing processes", e);
}
return ret;
@@ -106,19 +119,20 @@
return genProcessInfoDocument(pid, custom);
}
-
public ProcessInfoDocument getProcessInfo(QName pid) {
return getProcessInfoCustom(pid, ProcessInfoCustomizer.ALL);
}
public ProcessInfoDocument activate(QName pid) {
- // TODO: Figure out how to deal with activation/retirement.
+ try {
+ _store.setState(pid, org.apache.ode.bpel.iapi.ProcessState.ACTIVE);
+ } catch (Exception ex) {
+ throw new ManagementException("Error setting process state.", ex);
+ }
return genProcessInfoDocument(pid, ProcessInfoCustomizer.NONE);
}
-
- public ProcessInfoDocument setRetired(final QName pid, final boolean retired)
- throws ManagementException {
+ public ProcessInfoDocument setRetired(final QName pid, final boolean retired) throws ManagementException {
try {
_store.setState(pid, retired ? ProcessState.RETIRED : ProcessState.ACTIVE);
} catch (BpelEngineException e) {
@@ -132,20 +146,26 @@
ProcessInfoDocument ret = ProcessInfoDocument.Factory.newInstance();
final TProcessInfo pi = ret.addNewProcessInfo();
try {
- _db.exec(new BpelDatabase.Callable<Object>() {
- public Object run(BpelDAOConnection conn) throws Exception {
- ProcessDAO proc = conn.getProcess(pid);
- if (proc == null)
- throw new ProcessNotFoundException("ProcessNotFound:" + pid);
- _store.setProperty(pid, propertyName, value);
- fillProcessInfo(pi, proc, new ProcessInfoCustomizer(ProcessInfoCustomizer.Item.PROPERTIES));
- return null;
- }
- });
+ try {
+ _store.setProperty(pid, propertyName, value);
+ } catch (Exception ex) {
+ // Likely the process no longer exists in the store.
+ __log.debug("Error setting property value for " + pid + "; " + propertyName);
+ }
+
+ // We have to do this after we set the property, since the
+ // ProcessConf object
+ // is immutable.
+ ProcessConf proc = _store.getProcessConfiguration(pid);
+ if (proc == null)
+ throw new ProcessNotFoundException("ProcessNotFound:" + pid);
+
+ fillProcessInfo(pi, proc, new ProcessInfoCustomizer(ProcessInfoCustomizer.Item.PROPERTIES));
+
} catch (ManagementException me) {
throw me;
} catch (Exception e) {
- throw new ProcessingException("Exception while setting process property",e);
+ throw new ProcessingException("Exception while setting process property", e);
}
return ret;
@@ -156,20 +176,26 @@
ProcessInfoDocument ret = ProcessInfoDocument.Factory.newInstance();
final TProcessInfo pi = ret.addNewProcessInfo();
try {
- _db.exec(new BpelDatabase.Callable<Object>() {
- public Object run(BpelDAOConnection conn) throws Exception {
- ProcessDAO proc = conn.getProcess(pid);
- if (proc == null)
- throw new ProcessNotFoundException("ProcessNotFound:" + pid);
- _store.setProperty(pid, propertyName, value);
- fillProcessInfo(pi, proc, new ProcessInfoCustomizer(ProcessInfoCustomizer.Item.PROPERTIES));
- return null;
- }
- });
+ try {
+ _store.setProperty(pid, propertyName, value);
+ } catch (Exception ex) {
+ // Likely the process no longer exists in the store.
+ __log.debug("Error setting property value for " + pid + "; " + propertyName);
+ }
+
+ // We have to do this after we set the property, since the
+ // ProcessConf object
+ // is immutable.
+ ProcessConf proc = _store.getProcessConfiguration(pid);
+ if (proc == null)
+ throw new ProcessNotFoundException("ProcessNotFound:" + pid);
+
+ fillProcessInfo(pi, proc, new ProcessInfoCustomizer(ProcessInfoCustomizer.Item.PROPERTIES));
+
} catch (ManagementException me) {
throw me;
} catch (Exception e) {
- throw new ProcessingException("Exception while setting process property",e);
+ throw new ProcessingException("Exception while setting process property", e);
}
return ret;
@@ -191,7 +217,7 @@
}
});
} catch (Exception e) {
- throw new ProcessingException("Exception while listing instances",e);
+ throw new ProcessingException("Exception while listing instances", e);
}
return ret;
@@ -209,8 +235,6 @@
return genInstanceInfoDocument(iid);
}
-
-
public ScopeInfoDocument getScopeInfo(String siid) {
return getScopeInfoWithActivity(siid, false);
}
@@ -219,12 +243,11 @@
return genScopeInfoDocument(siid, includeActivityInfo);
}
- public VariableInfoDocument getVariableInfo(final String scopeId, final String varName)
- throws ManagementException {
+ public VariableInfoDocument getVariableInfo(final String scopeId, final String varName) throws ManagementException {
VariableInfoDocument ret = VariableInfoDocument.Factory.newInstance();
final TVariableInfo vinf = ret.addNewVariableInfo();
final TVariableRef sref = vinf.addNewSelf();
- dbexec(new BpelDatabase.Callable<Object>() {
+ dbexec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection session) throws Exception {
ScopeDAO scope = session.getScope(new Long(scopeId));
if (scope == null) {
@@ -233,7 +256,7 @@
sref.setSiid(scopeId);
sref.setIid(scope.getProcessInstance().getInstanceId().toString());
- sref.setName(varName );
+ sref.setName(varName);
XmlDataDAO var = scope.getVariable(varName);
if (var == null) {
@@ -243,8 +266,7 @@
Node nval = var.get();
if (nval != null) {
TVariableInfo.Value val = vinf.addNewValue();
- val.getDomNode().appendChild(
- val.getDomNode().getOwnerDocument().importNode(nval,true));
+ val.getDomNode().appendChild(val.getDomNode().getOwnerDocument().importNode(nval, true));
}
return null;
}
@@ -260,7 +282,6 @@
return genInstanceInfoDocument(iid);
}
-
public InstanceInfoDocument resume(final Long iid) {
// We need debugger support in order to resume (since we have to force
// a reduction. If one is not available the getDebugger() method should
@@ -270,8 +291,7 @@
return genInstanceInfoDocument(iid);
}
- public InstanceInfoDocument suspend(final Long iid)
- throws ManagementException {
+ public InstanceInfoDocument suspend(final Long iid) throws ManagementException {
DebuggerSupport debugSupport = getDebugger(iid);
assert debugSupport != null : "getDebugger(Long) returned NULL!";
debugSupport.suspend(iid);
@@ -294,9 +314,10 @@
ProcessInstanceDAO instance = conn.getInstance(iid);
if (instance == null)
return null;
- for (ActivityRecoveryDAO recovery: instance.getActivityRecoveries()) {
+ for (ActivityRecoveryDAO recovery : instance.getActivityRecoveries()) {
if (recovery.getActivityId() == aid) {
- BpelProcess process = _server._engine._activeProcesses.get(instance.getProcess().getProcessId());
+ BpelProcess process = _server._engine._activeProcesses.get(instance.getProcess()
+ .getProcessId());
if (process != null) {
process.recoverActivity(instance, recovery.getChannel(), aid, action, null);
break;
@@ -321,14 +342,14 @@
_db.exec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection conn) {
Collection<ProcessInstanceDAO> instances = conn.instanceQuery(instanceFilter);
- for (ProcessInstanceDAO instance :instances) {
+ for (ProcessInstanceDAO instance : instances) {
instance.delete();
}
return null;
}
});
} catch (Exception e) {
- throw new ProcessingException("Exception during instance deletion",e);
+ throw new ProcessingException("Exception during instance deletion", e);
}
return ret;
@@ -338,17 +359,17 @@
// EVENT RETRIEVAL
//
public List<String> getEventTimeline(String instanceFilter, String eventFilter) {
- final InstanceFilter ifilter = new InstanceFilter(instanceFilter,null,0);
- final BpelEventFilter efilter = new BpelEventFilter(eventFilter,0);
+ final InstanceFilter ifilter = new InstanceFilter(instanceFilter, null, 0);
+ final BpelEventFilter efilter = new BpelEventFilter(eventFilter, 0);
- List<Date> tline = dbexec(new BpelDatabase.Callable<List<Date>>() {
+ List<Date> tline = dbexec(new BpelDatabase.Callable<List<Date>>() {
public List<Date> run(BpelDAOConnection session) throws Exception {
return session.bpelEventTimelineQuery(ifilter, efilter);
}
});
ArrayList<String> ret = new ArrayList<String>(tline.size());
- CollectionsX.transform(ret,tline,new UnaryFunction<Date,String>() {
+ CollectionsX.transform(ret, tline, new UnaryFunction<Date, String>() {
public String apply(Date x) {
return ISO8601DateParser.format(x);
}
@@ -357,11 +378,11 @@
}
public EventInfoListDocument listEvents(String instanceFilter, String eventFilter, int maxCount) {
- final InstanceFilter ifilter = new InstanceFilter(instanceFilter,null,0);
- final BpelEventFilter efilter = new BpelEventFilter(eventFilter,maxCount);
+ final InstanceFilter ifilter = new InstanceFilter(instanceFilter, null, 0);
+ final BpelEventFilter efilter = new BpelEventFilter(eventFilter, maxCount);
EventInfoListDocument eid = EventInfoListDocument.Factory.newInstance();
final TEventInfoList eil = eid.addNewEventInfoList();
- dbexec(new BpelDatabase.Callable<Object>() {
+ dbexec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection session) throws Exception {
List<BpelEvent> events = session.bpelEventQuery(ifilter, efilter);
for (BpelEvent event : events) {
@@ -384,11 +405,11 @@
if (obase != null && obase.debugInfo != null && obase.debugInfo.extensibilityElements != null) {
for (Map.Entry<QName, Object> entry : obase.debugInfo.extensibilityElements.entrySet()) {
TActivityExtInfo taei = taeil.addNewActivityExtInfo();
- taei.setAiid(""+aid);
+ taei.setAiid("" + aid);
Object extValue = entry.getValue();
if (extValue instanceof Element)
- taei.getDomNode().appendChild(taei.getDomNode()
- .getOwnerDocument().importNode((Element) extValue, true));
+ taei.getDomNode().appendChild(
+ taei.getDomNode().getOwnerDocument().importNode((Element) extValue, true));
else if (extValue instanceof String) {
Element valueElmt = taei.getDomNode().getOwnerDocument().createElementNS(
entry.getKey().getNamespaceURI(), entry.getKey().getLocalPart());
@@ -402,10 +423,12 @@
}
/**
- * Get the {@link DebuggerSupport} object for the given process identifier. Debugger
- * support is required for operations that resume execution in some way or manipulate
- * the breakpoints.
- * @param procid process identifier
+ * Get the {@link DebuggerSupport} object for the given process identifier.
+ * Debugger support is required for operations that resume execution in some
+ * way or manipulate the breakpoints.
+ *
+ * @param procid
+ * process identifier
* @return associated debugger support object
* @throws ManagementException
*/
@@ -413,17 +436,18 @@
BpelProcess process = _server._engine._activeProcesses.get(procid);
if (process == null)
- throw new InvalidRequestException("The process \"" + procid + "\" is available." );
+ throw new InvalidRequestException("The process \"" + procid + "\" is available.");
return process._debugger;
}
-
/**
- * Get the {@link DebuggerSupport} object for the given instance identifier. Debugger
- * support is required for operations that resume execution in some way or manipulate
- * the breakpoints.
- * @param iid instance identifier
+ * Get the {@link DebuggerSupport} object for the given instance identifier.
+ * Debugger support is required for operations that resume execution in some
+ * way or manipulate the breakpoints.
+ *
+ * @param iid
+ * instance identifier
* @return associated debugger support object
* @throws ManagementException
*/
@@ -445,8 +469,11 @@
}
/**
- * Execute a database transaction, unwrapping nested {@link ManagementException}s.
- * @param runnable action to run
+ * Execute a database transaction, unwrapping nested
+ * {@link ManagementException}s.
+ *
+ * @param runnable
+ * action to run
* @return
* @throws ManagementException
*/
@@ -461,8 +488,11 @@
}
/**
- * Execute a database transaction, unwrapping nested {@link ManagementException}s.
- * @param callable action to run
+ * Execute a database transaction, unwrapping nested
+ * {@link ManagementException}s.
+ *
+ * @param callable
+ * action to run
* @return
* @throws ManagementException
*/
@@ -473,7 +503,7 @@
// Passthrough.
throw me;
} catch (Exception ex) {
- throw new ManagementException("Exception during database operation",ex);
+ throw new ManagementException("Exception during database operation", ex);
}
}
@@ -482,19 +512,14 @@
ProcessInfoDocument ret = ProcessInfoDocument.Factory.newInstance();
final TProcessInfo pi = ret.addNewProcessInfo();
try {
- _db.exec(new BpelDatabase.Callable<Object>() {
- public Object run(BpelDAOConnection conn) {
- ProcessDAO proc = conn.getProcess(procid);
- if (proc == null)
- throw new InvalidRequestException("ProcessNotFound:" + procid);
- fillProcessInfo(pi, proc, custom);
- return null;
- }
- });
+ ProcessConf pconf = _store.getProcessConfiguration(procid);
+ if (pconf == null)
+ throw new ProcessNotFoundException("ProcessNotFound:" + procid);
+ fillProcessInfo(pi, pconf, custom);
} catch (ManagementException me) {
throw me;
} catch (Exception e) {
- throw new ProcessingException("Exception while retrieving process information",e);
+ throw new ProcessingException("Exception while retrieving process information", e);
}
return ret;
@@ -503,7 +528,9 @@
/**
* Generate a {@link InstanceInfoDocument} for a given instance. This
* document contains general information about the instance.
- * @param iid instance identifier
+ *
+ * @param iid
+ * instance identifier
* @return generated document
*/
private InstanceInfoDocument genInstanceInfoDocument(final Long iid) {
@@ -521,18 +548,19 @@
if (instance == null)
throw new InstanceNotFoundException("" + iid);
// TODO: deal with "ERROR" state information.
- fillInstanceInfo(ii,instance);
+ fillInstanceInfo(ii, instance);
return null;
}
});
-
return ret;
}
/**
* Generate a {@link ScopeInfoDocument} for a given scope instance.
- * @param siid scope instance identifier
+ *
+ * @param siid
+ * scope instance identifier
* @param includeActivityInfo
* @return generated document
*/
@@ -557,7 +585,7 @@
if (instance == null)
throw new InvalidRequestException("Scope not found: " + siidl);
// TODO: deal with "ERROR" state information.
- fillScopeInfo(ii,instance,includeActivityInfo);
+ fillScopeInfo(ii, instance, includeActivityInfo);
return null;
}
});
@@ -566,36 +594,45 @@
/**
* Fill in the <code>process-info</code> element of the transfer object.
- * @param info destination XMLBean
- * @param proc source DAO object
- * @param custom used to customize the quantity of information produced in the info
- */
- private void fillProcessInfo(TProcessInfo info, ProcessDAO proc, ProcessInfoCustomizer custom) {
- ProcessConf pconf = _store.getProcessConfiguration(proc.getProcessId());
- info.setPid(proc.getProcessId().toString());
+ *
+ * @param info
+ * destination XMLBean
+ * @param pconf
+ * process configuration object (from store)
+ * @param proc
+ * source DAO object
+ * @param custom
+ * used to customize the quantity of information produced in the
+ * info
+ */
+ private void fillProcessInfo(TProcessInfo info, ProcessConf pconf, ProcessInfoCustomizer custom) {
+ if (pconf == null)
+ throw new IllegalArgumentException("Null pconf.");
+
+ info.setPid(pconf.getProcessId().toString());
// TODO: ACTIVE and RETIRED should be used separately.
- //Active process may be retired at the same time
- if(pconf.getState() == ProcessState.RETIRED) {
+ // Active process may be retired at the same time
+ if (pconf.getState() == ProcessState.RETIRED) {
info.setStatus(TProcessStatus.RETIRED);
} else {
info.setStatus(TProcessStatus.ACTIVE);
}
TDefinitionInfo definfo = info.addNewDefinitionInfo();
- definfo.setProcessName(proc.getType());
+ definfo.setProcessName(pconf.getType());
TDeploymentInfo depinfo = info.addNewDeploymentInfo();
depinfo.setDeployDate(toCalendar(pconf.getDeployDate()));
depinfo.setDeployer(pconf.getDeployer());
if (custom.includeInstanceSummary()) {
TInstanceSummary isum = info.addNewInstanceSummary();
- genInstanceSummaryEntry(isum.addNewInstances(),TInstanceStatus.ACTIVE, proc);
- genInstanceSummaryEntry(isum.addNewInstances(),TInstanceStatus.COMPLETED, proc);
- genInstanceSummaryEntry(isum.addNewInstances(),TInstanceStatus.ERROR, proc);
- genInstanceSummaryEntry(isum.addNewInstances(),TInstanceStatus.FAILED, proc);
- genInstanceSummaryEntry(isum.addNewInstances(),TInstanceStatus.SUSPENDED, proc);
- genInstanceSummaryEntry(isum.addNewInstances(),TInstanceStatus.TERMINATED, proc);
- getInstanceSummaryActivityFailure(isum, proc);
+ genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.ACTIVE, pconf);
+ genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.COMPLETED, pconf);
+ genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.ERROR, pconf);
+ genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.FAILED, pconf);
+ genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.SUSPENDED, pconf);
+ genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.TERMINATED, pconf);
+ getInstanceSummaryActivityFailure(isum, pconf);
}
TProcessInfo.Documents docinfo = info.addNewDocuments();
@@ -617,15 +654,16 @@
}
}
- OProcess oprocess = _server._engine.getOProcess(proc.getProcessId());
+ OProcess oprocess = _server._engine.getOProcess(pconf.getProcessId());
if (custom.includeEndpoints() && oprocess != null) {
TEndpointReferences eprs = info.addNewEndpoints();
for (OPartnerLink oplink : oprocess.getAllPartnerLinks()) {
if (oplink.hasPartnerRole() && oplink.initializePartnerRole) {
- EndpointReference pepr = _server._engine._activeProcesses.get(proc.getProcessId())
+ // TODO: this is very uncool.
+ EndpointReference pepr = _server._engine._activeProcesses.get(pconf.getProcessId())
.getInitialPartnerRoleEPR(oplink);
-
- if (pepr!= null) {
+
+ if (pepr != null) {
TEndpointReferences.EndpointRef epr = eprs.addNewEndpointRef();
Document eprNodeDoc = epr.getDomNode().getOwnerDocument();
epr.getDomNode().appendChild(eprNodeDoc.importNode(pepr.toXML().getDocumentElement(), true));
@@ -634,16 +672,20 @@
}
}
- //TODO: add documents to the above data structure.
+ // TODO: add documents to the above data structure.
}
/**
* Generate document information elements for a set of files.
- * @param docinfo target element
- * @param files files
- * @param recurse recurse down directories?
+ *
+ * @param docinfo
+ * target element
+ * @param files
+ * files
+ * @param recurse
+ * recurse down directories?
*/
- private void genDocumentInfo(TProcessInfo.Documents docinfo, File[] files,boolean recurse) {
+ private void genDocumentInfo(TProcessInfo.Documents docinfo, File[] files, boolean recurse) {
if (files == null)
return;
for (File f : files) {
@@ -654,7 +696,7 @@
if (recurse)
genDocumentInfo(docinfo, f.listFiles(), true);
} else if (f.isFile()) {
- genDocumentInfo(docinfo,f);
+ genDocumentInfo(docinfo, f);
}
}
}
@@ -670,42 +712,54 @@
}
}
- private void genInstanceSummaryEntry(TInstanceSummary.Instances instances, TInstanceStatus.Enum state, ProcessDAO proc) {
+ private void genInstanceSummaryEntry(TInstanceSummary.Instances instances, TInstanceStatus.Enum state,
+ ProcessConf pconf) {
instances.setState(state);
String queryStatus = InstanceFilter.StatusKeys.valueOf(state.toString()).toString().toLowerCase();
- InstanceFilter instanceFilter = new InstanceFilter("status=" + queryStatus
- + " name=" + proc.getType().getLocalPart()
- + " namespace=" + proc.getType().getNamespaceURI());
- int count = _db.getConnection().instanceQuery(instanceFilter).size();
+ final InstanceFilter instanceFilter = new InstanceFilter("status=" + queryStatus + " name="
+ + pconf.getProcessId().getLocalPart() + " namespace=" + pconf.getProcessId().getNamespaceURI());
+ int count = dbexec(new BpelDatabase.Callable<Integer>() {
+
+ public Integer run(BpelDAOConnection conn) throws Exception {
+ return conn.instanceQuery(instanceFilter).size();
+ }
+ });
instances.setCount(count);
}
- private void getInstanceSummaryActivityFailure(TInstanceSummary summary, ProcessDAO proc) {
- String queryStatus = InstanceFilter.StatusKeys.valueOf(TInstanceStatus.ACTIVE.toString()).toString().toLowerCase();
- InstanceFilter instanceFilter = new InstanceFilter("status=" + queryStatus
- + " name=" + proc.getType().getLocalPart()
- + " namespace=" + proc.getType().getNamespaceURI());
- int failureInstances = 0;
- Date lastFailureDt = null;
- for (ProcessInstanceDAO instance : _db.getConnection().instanceQuery(instanceFilter)) {
- int count = instance.getActivityFailureCount();
- if (count > 0) {
- ++failureInstances;
- Date failureDt = instance.getActivityFailureDateTime();
- if (lastFailureDt == null || lastFailureDt.before(failureDt))
- lastFailureDt = failureDt;
- }
- }
- if (failureInstances > 0) {
- TFailuresInfo failures = summary.addNewFailures();
- failures.setDtFailure(toCalendar(lastFailureDt));
- failures.setCount(failureInstances);
- }
+ private void getInstanceSummaryActivityFailure(final TInstanceSummary summary, ProcessConf pconf) {
+ String queryStatus = InstanceFilter.StatusKeys.valueOf(TInstanceStatus.ACTIVE.toString()).toString()
+ .toLowerCase();
+ final InstanceFilter instanceFilter = new InstanceFilter("status=" + queryStatus + " name="
+ + pconf.getProcessId().getLocalPart() + " namespace=" + pconf.getProcessId().getNamespaceURI());
+ dbexec(new BpelDatabase.Callable<Void>() {
+
+ public Void run(BpelDAOConnection conn) throws Exception {
+ Date lastFailureDt = null;
+ int failureInstances = 0;
+ for (ProcessInstanceDAO instance : conn.instanceQuery(instanceFilter)) {
+ int count = instance.getActivityFailureCount();
+ if (count > 0) {
+ ++failureInstances;
+ Date failureDt = instance.getActivityFailureDateTime();
+ if (lastFailureDt == null || lastFailureDt.before(failureDt))
+ lastFailureDt = failureDt;
+ }
+ }
+ if (failureInstances > 0) {
+ TFailuresInfo failures = summary.addNewFailures();
+ failures.setDtFailure(toCalendar(lastFailureDt));
+ failures.setCount(failureInstances);
+ }
+
+ return null;
+ }
+
+ });
}
private void fillInstanceInfo(TInstanceInfo info, ProcessInstanceDAO instance) {
info.setIid("" + instance.getInstanceId());
- // TODO: add process QName to instance-info schema
ProcessDAO processDAO = instance.getProcess();
info.setPid(processDAO.getProcessId().toString());
info.setProcessName(processDAO.getType());
@@ -731,7 +785,7 @@
for (CorrelationSetDAO correlationSetDAO : instance.getCorrelationSets()) {
for (Map.Entry<QName, String> property : correlationSetDAO.getProperties().entrySet()) {
TCorrelationProperty tproperty = corrProperties.addNewCorrelationProperty();
- tproperty.setCsetid(""+correlationSetDAO.getCorrelationSetId());
+ tproperty.setCsetid("" + correlationSetDAO.getCorrelationSetId());
tproperty.setPropertyName(property.getKey());
tproperty.setStringValue(property.getValue());
}
@@ -743,9 +797,9 @@
eventInfo.setCount(flc.count);
if (instance.getActivityFailureCount() > 0) {
- TFailuresInfo failures = info.addNewFailures();
- failures.setDtFailure(toCalendar(instance.getActivityFailureDateTime()));
- failures.setCount(instance.getActivityFailureCount());
+ TFailuresInfo failures = info.addNewFailures();
+ failures.setDtFailure(toCalendar(instance.getActivityFailureDateTime()));
+ failures.setCount(instance.getActivityFailureCount());
}
}
@@ -769,13 +823,12 @@
if (!scope.getCorrelationSets().isEmpty()) {
TScopeInfo.CorrelationSets correlationSets = scopeInfo.addNewCorrelationSets();
for (CorrelationSetDAO correlationSetDAO : scope.getCorrelationSets()) {
- TScopeInfo.CorrelationSets.CorrelationSet correlationSet =
- correlationSets.addNewCorrelationSet();
- correlationSet.setCsetid(""+correlationSetDAO.getCorrelationSetId());
+ TScopeInfo.CorrelationSets.CorrelationSet correlationSet = correlationSets.addNewCorrelationSet();
+ correlationSet.setCsetid("" + correlationSetDAO.getCorrelationSetId());
correlationSet.setName(correlationSetDAO.getName());
for (Map.Entry<QName, String> property : correlationSetDAO.getProperties().entrySet()) {
TCorrelationProperty tproperty = correlationSet.addNewCorrelationProperty();
- tproperty.setCsetid(""+correlationSetDAO.getCorrelationSetId());
+ tproperty.setCsetid("" + correlationSetDAO.getCorrelationSetId());
tproperty.setPropertyName(property.getKey());
tproperty.setStringValue(property.getValue());
}
@@ -784,22 +837,23 @@
}
if (includeActivityInfo) {
- Collection<ActivityRecoveryDAO> recoveries = scope.getProcessInstance().getActivityRecoveries();
+ Collection<ActivityRecoveryDAO> recoveries = scope.getProcessInstance().getActivityRecoveries();
TScopeInfo.Activities activities = scopeInfo.addNewActivities();
List<BpelEvent> events = scope.listEvents(null);
ActivityStateDocumentBuilder b = new ActivityStateDocumentBuilder();
- for (BpelEvent e : events) b.onEvent(e);
+ for (BpelEvent e : events)
+ b.onEvent(e);
for (ActivityInfoDocument ai : b.getActivities()) {
for (ActivityRecoveryDAO recovery : recoveries) {
- if (String.valueOf(recovery.getActivityId()).equals(ai.getActivityInfo().getAiid())) {
- TFailureInfo failure = ai.getActivityInfo().addNewFailure();
- failure.setReason(recovery.getReason());
- failure.setDtFailure(toCalendar(recovery.getDateTime()));
- failure.setActions(recovery.getActions());
- failure.setRetries(recovery.getRetries());
- ai.getActivityInfo().setStatus(TActivityStatus.FAILURE);
- }
+ if (String.valueOf(recovery.getActivityId()).equals(ai.getActivityInfo().getAiid())) {
+ TFailureInfo failure = ai.getActivityInfo().addNewFailure();
+ failure.setReason(recovery.getReason());
+ failure.setDtFailure(toCalendar(recovery.getDateTime()));
+ failure.setActions(recovery.getActions());
+ failure.setRetries(recovery.getRetries());
+ ai.getActivityInfo().setStatus(TActivityStatus.FAILURE);
+ }
}
activities.addNewActivityInfo().set(ai.getActivityInfo());
}
@@ -847,87 +901,89 @@
info.setLineNumber(event.getLineNo());
info.setTimestamp(toCalendar(event.getTimestamp()));
if (event instanceof ActivityEvent) {
- info.setActivityName(((ActivityEvent)event).getActivityName());
- info.setActivityId(((ActivityEvent)event).getActivityId());
- info.setActivityType(((ActivityEvent)event).getActivityType());
- info.setActivityDefinitionId(((ActivityEvent)event).getActivityDeclarationId());
+ info.setActivityName(((ActivityEvent) event).getActivityName());
+ info.setActivityId(((ActivityEvent) event).getActivityId());
+ info.setActivityType(((ActivityEvent) event).getActivityType());
+ info.setActivityDefinitionId(((ActivityEvent) event).getActivityDeclarationId());
}
if (event instanceof CorrelationEvent) {
- info.setPortType(((CorrelationEvent)event).getPortType());
- info.setOperation(((CorrelationEvent)event).getOperation());
- info.setMexId(((CorrelationEvent)event).getMessageExchangeId());
+ info.setPortType(((CorrelationEvent) event).getPortType());
+ info.setOperation(((CorrelationEvent) event).getOperation());
+ info.setMexId(((CorrelationEvent) event).getMessageExchangeId());
}
if (event instanceof CorrelationMatchEvent) {
- info.setPortType(((CorrelationMatchEvent)event).getPortType());
+ info.setPortType(((CorrelationMatchEvent) event).getPortType());
}
if (event instanceof CorrelationSetEvent) {
- info.setCorrelationSet(((CorrelationSetEvent)event).getCorrelationSetName());
+ info.setCorrelationSet(((CorrelationSetEvent) event).getCorrelationSetName());
}
if (event instanceof CorrelationSetWriteEvent) {
- info.setCorrelationKey(((CorrelationSetWriteEvent)event).getCorrelationSetName());
+ info.setCorrelationKey(((CorrelationSetWriteEvent) event).getCorrelationSetName());
}
if (event instanceof ExpressionEvaluationEvent) {
- info.setExpression(((ExpressionEvaluationEvent)event).getExpression());
+ info.setExpression(((ExpressionEvaluationEvent) event).getExpression());
}
if (event instanceof ExpressionEvaluationFailedEvent) {
- info.setFault(((ExpressionEvaluationFailedEvent)event).getFault());
+ info.setFault(((ExpressionEvaluationFailedEvent) event).getFault());
}
if (event instanceof NewProcessInstanceEvent) {
- if ((((NewProcessInstanceEvent)event).getRootScopeId()) != null)
- info.setRootScopeId(((NewProcessInstanceEvent)event).getRootScopeId());
- info.setScopeDefinitionId(((NewProcessInstanceEvent)event).getScopeDeclarationId());
+ if ((((NewProcessInstanceEvent) event).getRootScopeId()) != null)
+ info.setRootScopeId(((NewProcessInstanceEvent) event).getRootScopeId());
+ info.setScopeDefinitionId(((NewProcessInstanceEvent) event).getScopeDeclarationId());
}
if (event instanceof PartnerLinkEvent) {
- info.setPartnerLinkName(((PartnerLinkEvent)event).getpLinkName());
+ info.setPartnerLinkName(((PartnerLinkEvent) event).getpLinkName());
}
if (event instanceof ProcessCompletionEvent) {
- info.setFault(((ProcessCompletionEvent)event).getFault());
+ info.setFault(((ProcessCompletionEvent) event).getFault());
}
if (event instanceof ProcessEvent) {
- info.setProcessId(((ProcessEvent)event).getProcessId());
- info.setProcessType(((ProcessEvent)event).getProcessName());
+ info.setProcessId(((ProcessEvent) event).getProcessId());
+ info.setProcessType(((ProcessEvent) event).getProcessName());
}
if (event instanceof ProcessInstanceEvent) {
- info.setInstanceId(((ProcessInstanceEvent)event).getProcessInstanceId());
+ info.setInstanceId(((ProcessInstanceEvent) event).getProcessInstanceId());
}
if (event instanceof ProcessInstanceStartedEvent) {
- info.setRootScopeId(((ProcessInstanceStartedEvent)event).getRootScopeId());
- info.setRootScopeDeclarationId(((ProcessInstanceStartedEvent)event).getScopeDeclarationId());
+ info.setRootScopeId(((ProcessInstanceStartedEvent) event).getRootScopeId());
+ info.setRootScopeDeclarationId(((ProcessInstanceStartedEvent) event).getScopeDeclarationId());
}
if (event instanceof ProcessInstanceStateChangeEvent) {
- info.setOldState(((ProcessInstanceStateChangeEvent)event).getOldState());
- info.setNewState(((ProcessInstanceStateChangeEvent)event).getNewState());
+ info.setOldState(((ProcessInstanceStateChangeEvent) event).getOldState());
+ info.setNewState(((ProcessInstanceStateChangeEvent) event).getNewState());
}
if (event instanceof ProcessMessageExchangeEvent) {
- info.setPortType(((ProcessMessageExchangeEvent)event).getPortType());
- info.setOperation(((ProcessMessageExchangeEvent)event).getOperation());
- info.setMexId(((ProcessMessageExchangeEvent)event).getMessageExchangeId());
+ info.setPortType(((ProcessMessageExchangeEvent) event).getPortType());
+ info.setOperation(((ProcessMessageExchangeEvent) event).getOperation());
+ info.setMexId(((ProcessMessageExchangeEvent) event).getMessageExchangeId());
}
if (event instanceof ScopeCompletionEvent) {
- info.setSuccess(((ScopeCompletionEvent)event).isSuccess());
- info.setFault(((ScopeCompletionEvent)event).getFault());
+ info.setSuccess(((ScopeCompletionEvent) event).isSuccess());
+ info.setFault(((ScopeCompletionEvent) event).getFault());
}
if (event instanceof ScopeEvent) {
- info.setScopeId(((ScopeEvent)event).getScopeId());
- if (((ScopeEvent)event).getParentScopeId() != null)
- info.setParentScopeId(((ScopeEvent)event).getParentScopeId());
- if (((ScopeEvent)event).getScopeName() != null)
- info.setScopeName(((ScopeEvent)event).getScopeName());
- info.setScopeDefinitionId(((ScopeEvent)event).getScopeDeclarationId());
+ info.setScopeId(((ScopeEvent) event).getScopeId());
+ if (((ScopeEvent) event).getParentScopeId() != null)
+ info.setParentScopeId(((ScopeEvent) event).getParentScopeId());
+ if (((ScopeEvent) event).getScopeName() != null)
+ info.setScopeName(((ScopeEvent) event).getScopeName());
+ info.setScopeDefinitionId(((ScopeEvent) event).getScopeDeclarationId());
}
if (event instanceof ScopeFaultEvent) {
- info.setFault(((ScopeFaultEvent)event).getFaultType());
- info.setFaultLineNumber(((ScopeFaultEvent)event).getFaultLineNo());
- info.setExplanation(((ScopeFaultEvent)event).getExplanation());
+ info.setFault(((ScopeFaultEvent) event).getFaultType());
+ info.setFaultLineNumber(((ScopeFaultEvent) event).getFaultLineNo());
+ info.setExplanation(((ScopeFaultEvent) event).getExplanation());
}
if (event instanceof VariableEvent) {
- info.setVariableName(((VariableEvent)event).getVarName());
+ info.setVariableName(((VariableEvent) event).getVarName());
}
}
/**
* Convert a {@link Date} to a {@link Calendar}.
- * @param dtime a {@link Date}
+ *
+ * @param dtime
+ * a {@link Date}
* @return a {@link Calendar}
*/
private Calendar toCalendar(Date dtime) {
@@ -939,7 +995,6 @@
return c;
}
-
/**
* @see org.apache.ode.bpel.pmapi.InstanceManagement#queryInstances(java.lang.String)
*/
@@ -958,10 +1013,155 @@
}
});
} catch (Exception e) {
- throw new ProcessingException("Exception while querying instances",e);
+ throw new ProcessingException("Exception while querying instances", e);
}
return ret;
}
+ /**
+ * Query processes based on a {@link ProcessFilter} criteria. This is
+ * implemented in memory rather than via database calls since the processes
+ * are managed by the {@link ProcessStore} object and we don't want to make
+ * this needlessly complicated.
+ *
+ * @param filter
+ * @return
+ */
+ @SuppressWarnings("unchecked")
+ Collection<ProcessConf> processQuery(ProcessFilter filter) {
+
+ List<QName> pids = _store.getProcesses();
+
+ // Name filter can be implemented using only the PIDs.
+ if (filter != null && filter.getNameFilter() != null) {
+ final Pattern pattern = Pattern.compile(filter.getNameFilter());
+ CollectionsX.remove_if(pids, new MemberOfFunction<QName>() {
+ @Override
+ public boolean isMember(QName o) {
+ return !pattern.matcher(o.getLocalPart()).matches();
+ }
+ });
+ }
+
+ if (filter != null && filter.getNamespaceFilter() != null) {
+ final Pattern pattern = Pattern.compile(filter.getNameFilter());
+ CollectionsX.remove_if(pids, new MemberOfFunction<QName>() {
+ @Override
+ public boolean isMember(QName o) {
+ String ns = o.getNamespaceURI() == null ? "" : o.getNamespaceURI();
+ return !pattern.matcher(ns).matches();
+ }
+
+ });
+ }
+
+ // Now we need the process conf objects, we need to be
+ // careful since someone could have deleted them by now
+ List<ProcessConf> confs = new LinkedList<ProcessConf>();
+ for (QName pid : pids) {
+ ProcessConf pconf = _store.getProcessConfiguration(pid);
+ confs.add(pconf);
+ }
+
+ if (filter != null) {
+ // TODO Implement process status filtering when status will exist
+ // Specific filter for deployment date.
+ if (filter.getDeployedDateFilter() != null) {
+ for (final String ddf : filter.getDeployedDateFilter()) {
+ final Date dd;
+ try {
+ dd = ISO8601DateParser.parse(Filter.getDateWithoutOp(ddf));
+ } catch (ParseException e) {
+ // Should never happen.
+ throw new RuntimeException(e);
+ }
+
+ CollectionsX.remove_if(confs, new MemberOfFunction<ProcessConf>() {
+ @Override
+ public boolean isMember(ProcessConf o) {
+
+ if (ddf.startsWith("="))
+ return !o.getDeployDate().equals(dd);
+
+ if (ddf.startsWith("<="))
+ return o.getDeployDate().getTime() > dd.getTime();
+
+ if (ddf.startsWith(">="))
+ return o.getDeployDate().getTime() < dd.getTime();
+
+ if (ddf.startsWith("<"))
+ return o.getDeployDate().getTime() >= dd.getTime();
+
+ if (ddf.startsWith(">"))
+ return o.getDeployDate().getTime() <= dd.getTime();
+
+ return false;
+ }
+
+ });
+
+ }
+ }
+
+ // Ordering
+ if (filter.getOrders() != null) {
+ ComparatorChain cchain = new ComparatorChain();
+ for (String key : filter.getOrders()) {
+ boolean ascending = true;
+ String orderKey = key;
+ if (key.startsWith("+") || key.startsWith("-")) {
+ orderKey = key.substring(1, key.length());
+ if (key.startsWith("-"))
+ ascending = false;
+ }
+
+ Comparator c;
+ if ("name".equals(orderKey))
+ c = new Comparator<ProcessConf>() {
+ public int compare(ProcessConf o1, ProcessConf o2) {
+ return o1.getProcessId().getLocalPart().compareTo(o2.getProcessId().getLocalPart());
+ }
+ };
+ else if ("namespace".equals(orderKey))
+ c = new Comparator<ProcessConf>() {
+ public int compare(ProcessConf o1, ProcessConf o2) {
+ String ns1 = o1.getProcessId().getNamespaceURI() == null ? "" : o1.getProcessId()
+ .getNamespaceURI();
+ String ns2 = o2.getProcessId().getNamespaceURI() == null ? "" : o2.getProcessId()
+ .getNamespaceURI();
+ return ns1.compareTo(ns2);
+ }
+ };
+ else if ("version".equals(orderKey))
+ c = new Comparator<ProcessConf>() {
+ public int compare(ProcessConf o1, ProcessConf o2) {
+ // TODO: implement version comparisons.
+ return 0;
+ }
+ };
+ else if ("deployed".equals(orderKey))
+ c = new Comparator<ProcessConf>() {
+ public int compare(ProcessConf o1, ProcessConf o2) {
+ return o1.getDeployDate().compareTo(o2.getDeployDate());
+ }
+
+ };
+
+ else {
+ // unrecognized
+ __log.debug("unrecognized order key" + orderKey);
+ continue;
+ }
+
+ cchain.addComparator(c, !ascending);
+ }
+
+ Collections.sort(confs, cchain);
+ }
+
+ }
+
+ return confs;
+ }
}