You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2006/11/29 03:37:26 UTC
svn commit: r480345 - in /incubator/ode/trunk/bpel-runtime: ./
src/main/java/org/apache/ode/bpel/engine/
src/main/java/org/apache/ode/bpel/intercept/
src/main/java/org/apache/ode/bpel/memdao/
src/test/java/org/apache/ode/bpel/runtime/
Author: mszefler
Date: Tue Nov 28 18:37:25 2006
New Revision: 480345
URL: http://svn.apache.org/viewvc?view=rev&rev=480345
Log:
* Simplification of process-store DAO mechanism
* Remove in-memory store, replace with in Hibernate for testing.
* Event scheme to support distributed deployment.
* Re-factor of ProcesStore / BpelServer relationship.
* Thread safety.
* Some tests for processStore
* Consolidate all schema generation in bpel-schemas
* Maven fixes
Modified:
incubator/ode/trunk/bpel-runtime/pom.xml
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DocumentInfoGenerator.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InterceptorContextImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Messages.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java
incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
Modified: incubator/ode/trunk/bpel-runtime/pom.xml
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/pom.xml?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/pom.xml (original)
+++ incubator/ode/trunk/bpel-runtime/pom.xml Tue Nov 28 18:37:25 2006
@@ -37,6 +37,10 @@
</dependency>
<dependency>
<groupId>org.apache.ode</groupId>
+ <artifactId>ode-bpel-schemas</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ode</groupId>
<artifactId>ode-bpel-dao</artifactId>
</dependency>
<dependency>
@@ -129,11 +133,6 @@
<dependency>
<groupId>org.apache.ode</groupId>
<artifactId>ode-naming</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.ode</groupId>
- <artifactId>ode-bpel-schemas</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java Tue Nov 28 18:37:25 2006
@@ -23,6 +23,7 @@
import org.apache.ode.bpel.bdi.breaks.VariableModificationBreakpoint;
import org.apache.ode.bpel.common.CorrelationKey;
import org.apache.ode.bpel.dao.*;
+import org.apache.ode.bpel.iapi.BpelServer;
import org.apache.ode.bpel.iapi.ProcessStore;
import org.apache.ode.bpel.o.OProcess;
import org.apache.ode.bpel.pmapi.*;
@@ -40,12 +41,12 @@
* the methods necessary to support process debugging. It also implements all the methods in the
* newer Process/Instance Management interface (pmapi).
*/
-class BpelManagementFacadeImpl extends ProcessAndInstanceManagementImpl
+public class BpelManagementFacadeImpl extends ProcessAndInstanceManagementImpl
implements BpelManagementFacade {
private static UUIDGen _uuidGen = new UUIDGen();
- BpelManagementFacadeImpl(BpelDatabase db, BpelEngineImpl engine, BpelServerImpl server, ProcessStore store) {
- super(db, engine, server, store);
+ public BpelManagementFacadeImpl(BpelServer server, ProcessStore store) {
+ super(server, store);
}
public short getState(final Long iid) throws ManagementException {
@@ -182,10 +183,10 @@
* @param procid
*/
public OProcess getProcessDef(String procid) throws ManagementException {
- if (_engine == null)
+ if (_server._engine == null)
throw new ProcessingException("ServiceProvider required for debugger operation.");
- BpelProcess process = _engine._activeProcesses.get(QName.valueOf(procid));
+ BpelProcess process = _server._engine._activeProcesses.get(QName.valueOf(procid));
if (process == null)
throw new InvalidRequestException("The process \"" + procid + "\" is not available. Please make sure it is deployed and encompassing System is activated." );
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Tue Nov 28 18:37:25 2006
@@ -18,6 +18,22 @@
*/
package org.apache.ode.bpel.engine;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.wsdl.Message;
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.common.CorrelationKey;
@@ -29,14 +45,25 @@
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.evt.*;
import org.apache.ode.bpel.explang.EvaluationException;
-import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.iapi.ProcessState;
import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
-import org.apache.ode.bpel.o.*;
+import org.apache.ode.bpel.o.OBase;
+import org.apache.ode.bpel.o.OElementVarType;
+import org.apache.ode.bpel.o.OMessageVarType;
+import org.apache.ode.bpel.o.OPartnerLink;
+import org.apache.ode.bpel.o.OProcess;
+import org.apache.ode.bpel.o.OScope;
import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
import org.apache.ode.bpel.runtime.InvalidProcessException;
import org.apache.ode.bpel.runtime.PROCESS;
@@ -51,15 +78,6 @@
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
-import javax.wsdl.Message;
-import javax.wsdl.Operation;
-import javax.xml.namespace.QName;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.*;
-
/**
* Entry point into the runtime of a BPEL process.
*
@@ -93,24 +111,48 @@
/** {@link MessageExchangeInterceptor}s registered for this process. */
private final List<MessageExchangeInterceptor> _mexInterceptors = new ArrayList<MessageExchangeInterceptor>();
- private ProcessStore _store;
- private boolean _inMemory;
+ private final ProcessConf _pconf;
+
+ public BpelProcess(ProcessConf conf, OProcess oprocess, BpelEventListener debugger, ExpressionLanguageRuntimeRegistry expLangRuntimeRegistry) {
+ _pid = conf.getProcessId();
+ _pconf = conf;
- public BpelProcess(QName pid, OProcess oprocess, Map<OPartnerLink, Endpoint> myRoleEndpointNames,
- Map<OPartnerLink, Endpoint> initialPartners, BpelEventListener debugger,
- ExpressionLanguageRuntimeRegistry expLangRuntimeRegistry,
- List<MessageExchangeInterceptor> localMexInterceptors, ProcessStore store) {
- _pid = pid;
_replacementMap = new ReplacementMapImpl(oprocess);
_oprocess = oprocess;
_expLangRuntimeRegistry = expLangRuntimeRegistry;
- _mexInterceptors.addAll(localMexInterceptors);
- _store = store;
- _inMemory = store.getProcessConfiguration(_pid).isInMemory();
+
+ // Create myRole endpoint name mapping (from deployment descriptor)
+ HashMap<OPartnerLink, Endpoint> myRoleEndpoints = new HashMap<OPartnerLink, Endpoint>();
+ for (Map.Entry<String, Endpoint> provide : conf.getProvideEndpoints().entrySet()) {
+ OPartnerLink plink = oprocess.getPartnerLink(provide.getKey());
+ if (plink == null) {
+ String errmsg = "Error in deployment descriptor for process " + _pid + "; reference to unknown partner link "
+ + provide.getKey();
+ __log.error(errmsg);
+ throw new BpelEngineException(errmsg);
+ }
+ myRoleEndpoints.put(plink, provide.getValue());
+ }
+
+ // Create partnerRole initial value mapping
+ HashMap<OPartnerLink, Endpoint> partnerRoleIntialValues = new HashMap<OPartnerLink, Endpoint>();
+ for (Map.Entry<String, Endpoint> invoke : conf.getInvokeEndpoints().entrySet()) {
+ OPartnerLink plink = oprocess.getPartnerLink(invoke.getKey());
+ if (plink == null) {
+ String errmsg = "Error in deployment descriptor for process " + _pid + "; reference to unknown partner link "
+ + invoke.getKey();
+ __log.error(errmsg);
+ throw new BpelEngineException(errmsg);
+ }
+ __log.debug("Processing <invoke> element for process " + _pid + ": partnerlink " + invoke.getKey() + " --> "
+ + invoke.getValue());
+
+ partnerRoleIntialValues.put(plink, invoke.getValue());
+ }
for (OPartnerLink pl : _oprocess.getAllPartnerLinks()) {
if (pl.hasMyRole()) {
- Endpoint endpoint = myRoleEndpointNames.get(pl);
+ Endpoint endpoint = myRoleEndpoints.get(pl);
if (endpoint == null)
throw new IllegalArgumentException("No service name for myRole plink " + pl.getName());
PartnerLinkMyRoleImpl myRole = new PartnerLinkMyRoleImpl(pl, endpoint);
@@ -119,7 +161,7 @@
}
if (pl.hasPartnerRole()) {
- PartnerLinkPartnerRoleImpl partnerRole = new PartnerLinkPartnerRoleImpl(pl, initialPartners.get(pl));
+ PartnerLinkPartnerRoleImpl partnerRole = new PartnerLinkPartnerRoleImpl(pl, conf.getInvokeEndpoints().get(pl));
_partnerRoles.put(pl, partnerRole);
}
}
@@ -131,7 +173,7 @@
public void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, String action, FaultData fault) {
if (__log.isDebugEnabled())
- __log.debug("Recovering activity in process " + instanceDAO.getInstanceId() + " with action " + action );
+ __log.debug("Recovering activity in process " + instanceDAO.getInstanceId() + " with action " + action);
BpelRuntimeContextImpl processInstance = createRuntimeContext(instanceDAO, null, null);
processInstance.recoverActivity(channel, activityId, action, fault);
@@ -187,8 +229,7 @@
if (target != null) {
mex.setPortOp(target._plinkDef.myRolePortType, target._plinkDef.getMyRoleOperation(mex.getOperationName()));
} else {
- __log.warn("Couldn't find endpoint from service " + mex.getServiceName()
- + " when initializing a myRole mex.");
+ __log.warn("Couldn't find endpoint from service " + mex.getServiceName() + " when initializing a myRole mex.");
}
}
@@ -243,18 +284,15 @@
}
/**
- * Get the element name for a given WSDL part. If the part is an
- * <em>element</em> part, the name of that element is returned. If the
- * part is an XML schema typed part, then the name of the part is returned
- * in the null namespace.
+ * Get the element name for a given WSDL part. If the part is an <em>element</em> part, the name of that element is returned.
+ * If the part is an XML schema typed part, then the name of the part is returned in the null namespace.
*
* @param part
* WSDL {@link javax.wsdl.Part}
* @return name of element containing said part
*/
static QName getElementNameForPart(OMessageVarType.Part part) {
- return (part.type instanceof OElementVarType) ? ((OElementVarType) part.type).elementType : new QName(null,
- part.name);
+ return (part.type instanceof OElementVarType) ? ((OElementVarType) part.type).elementType : new QName(null, part.name);
}
/** Create a version-appropriate runtime context. */
@@ -268,12 +306,10 @@
*
* @param mex
* message exchange
- * @return <code>true</code> if execution should continue,
- * <code>false</code> otherwise
+ * @return <code>true</code> if execution should continue, <code>false</code> otherwise
*/
private boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) {
- InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(),
- getProcessDAO(), _engine._contexts.store);
+ InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO(),_pconf);
for (MessageExchangeInterceptor i : _mexInterceptors)
if (!mex.processInterceptor(i, mex, ictx, invoker))
@@ -287,8 +323,7 @@
}
/**
- * Replacement object for serializtation of the {@link OBase} (compiled
- * BPEL) objects in the JACOB VPU.
+ * Replacement object for serializtation of the {@link OBase} (compiled BPEL) objects in the JACOB VPU.
*/
public static final class OBaseReplacementImpl implements Externalizable {
private static final long serialVersionUID = 1L;
@@ -326,8 +361,9 @@
}
/**
- * Get the initial value of this role's EPR. This value is obtained from
- * the integration layer when the process is enabled on the server.
+ * Get the initial value of this role's EPR. This value is obtained from the integration layer when the process is enabled
+ * on the server.
+ *
* @return initial epr
*/
EndpointReference getInitialEPR() {
@@ -366,8 +402,7 @@
*/
public void invokeMyRole(MyRoleMessageExchangeImpl mex) {
if (__log.isTraceEnabled()) {
- __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] {
- "messageExchange", mex }));
+ __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] { "messageExchange", mex }));
}
Operation operation = getMyRoleOperation(mex.getOperationName());
@@ -411,9 +446,8 @@
String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
String partnerSessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys="
- + ArrayUtils.makeCollection(HashSet.class, keys) + " mySessionId=" + mySessionId
- + " partnerSessionId=" + partnerSessionId);
+ __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys=" + ArrayUtils.makeCollection(HashSet.class, keys)
+ + " mySessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
}
CorrelationKey matchedKey = null;
@@ -440,7 +474,8 @@
__log.debug("INPUTMSG: " + correlatorId + ": routing failed, CREATING NEW INSTANCE");
}
ProcessDAO processDAO = getProcessDAO();
- if (!_store.getProcessConfiguration(_pid).isActive()) {
+
+ if (_pconf.getState() == ProcessState.RETIRED) {
throw new InvalidProcessException("Process is retired.", InvalidProcessException.RETIRED_CAUSE_CODE);
}
@@ -450,12 +485,13 @@
}
ProcessInstanceDAO newInstance = processDAO.createInstance(correlator);
-
+
BpelRuntimeContextImpl instance = createRuntimeContext(newInstance, new PROCESS(_oprocess), mex);
// send process instance event
- NewProcessInstanceEvent evt = new NewProcessInstanceEvent(new QName(_oprocess.targetNamespace,
- _oprocess.getName()), getProcessDAO().getProcessId(), newInstance.getInstanceId());
+ NewProcessInstanceEvent evt = new NewProcessInstanceEvent(
+ new QName(_oprocess.targetNamespace, _oprocess.getName()), getProcessDAO().getProcessId(), newInstance
+ .getInstanceId());
evt.setPortType(mex.getPortType().getQName());
evt.setOperation(operation.getName());
evt.setMexId(mex.getMessageExchangeId());
@@ -471,7 +507,7 @@
+ messageRoute.getTargetInstance().getInstanceId());
}
- // Attempt to acquire an instance-level lock.
+ // Attempt to acquire an instance-level lock.
// _lockManager.lock(messageRoute.getTargetInstance().getInstanceId(),
// 60, TimeUnit.SECONDS);
@@ -488,8 +524,8 @@
correlator.removeRoutes(messageRoute.getGroupId(), instanceDao);
// send process instance event
- CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_oprocess.targetNamespace, _oprocess
- .getName()), getProcessDAO().getProcessId(), instanceDao.getInstanceId(), matchedKey);
+ CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_oprocess.targetNamespace, _oprocess.getName()),
+ getProcessDAO().getProcessId(), instanceDao.getInstanceId(), matchedKey);
evt.setPortType(mex.getPortType().getQName());
evt.setOperation(operation.getName());
evt.setMexId(mex.getMessageExchangeId());
@@ -497,9 +533,9 @@
_debugger.onEvent(evt);
// store event
saveEvent(evt, instanceDao);
-
+
// EXPERIMENTAL -- LOCK
- //instanceDao.lock();
+ // instanceDao.lock();
mex.setCorrelationStatus(CorrelationStatus.MATCHED);
mex.getDAO().setInstance(messageRoute.getTargetInstance());
@@ -514,8 +550,8 @@
} else {
// send event
- CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex
- .getOperation().getName(), mex.getMessageExchangeId(), keys);
+ CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex.getOperation()
+ .getName(), mex.getMessageExchangeId(), keys);
evt.setProcessId(getProcessDAO().getProcessId());
evt.setProcessName(new QName(_oprocess.targetNamespace, _oprocess.getName()));
@@ -529,8 +565,8 @@
}
}
- // Now we have to update our message exchange status. If the <reply> was not hit during the
- // invocation, then we will be in the "REQUEST" phase which means that either this was a one-way
+ // Now we have to update our message exchange status. If the <reply> was not hit during the
+ // invocation, then we will be in the "REQUEST" phase which means that either this was a one-way
// or a two-way that needs to delivery the reply asynchronously.
if (mex.getStatus() == Status.REQUEST) {
mex.setStatus(Status.ASYNC);
@@ -553,8 +589,7 @@
Set<OScope.CorrelationSet> csets = _plinkDef.getCorrelationSetsForOperation(operation);
for (OScope.CorrelationSet cset : csets) {
- CorrelationKey key = computeCorrelationKey(cset, _oprocess.messageTypes.get(msgDescription.getQName()),
- msg);
+ CorrelationKey key = computeCorrelationKey(cset, _oprocess.messageTypes.get(msgDescription.getQName()), msg);
keys.add(key);
}
@@ -566,8 +601,7 @@
return keys.toArray(new CorrelationKey[keys.size()]);
}
- private CorrelationKey computeCorrelationKey(OScope.CorrelationSet cset, OMessageVarType messagetype,
- Element msg) {
+ private CorrelationKey computeCorrelationKey(OScope.CorrelationSet cset, OMessageVarType messagetype, Element msg) {
String[] values = new String[cset.properties.size()];
int jIdx = 0;
@@ -578,8 +612,8 @@
if (alias == null) {
// TODO: Throw a real exception! And catch this at compile
// time.
- throw new IllegalArgumentException("No alias matching property '" + property.name
- + "' with message type '" + messagetype + "'");
+ throw new IllegalArgumentException("No alias matching property '" + property.name + "' with message type '"
+ + messagetype + "'");
}
String value;
@@ -614,8 +648,7 @@
__log.debug("Processing partner's response for partnerLink: " + messageExchange);
}
- BpelRuntimeContextImpl processInstance = createRuntimeContext(messageExchange.getDAO().getInstance(), null,
- null);
+ BpelRuntimeContextImpl processInstance = createRuntimeContext(messageExchange.getDAO().getInstance(), null, null);
processInstance.invocationResponse(messageExchange);
processInstance.execute();
}
@@ -676,8 +709,8 @@
}
ProcessDAO getProcessDAO() {
- if (_inMemory) return _engine._contexts.inMemDao.getConnection().getProcess(_pid);
- else return _engine._contexts.dao.getConnection().getProcess(_pid);
+ return _pconf.isTransient() ? _engine._contexts.inMemDao.getConnection().getProcess(_pid) : _engine._contexts.dao
+ .getConnection().getProcess(_pid);
}
static String genCorrelatorId(OPartnerLink plink, String opName) {
@@ -703,8 +736,7 @@
myrole._initialEPR = _engine._contexts.bindingContext.activateMyRoleEndpoint(_pid, myrole._endpoint,
myrole._plinkDef.myRolePortType);
- __log.debug("Activated " + _pid + " myrole " + myrole.getPartnerLinkName() + ": EPR is "
- + myrole._initialEPR);
+ __log.debug("Activated " + _pid + " myrole " + myrole.getPartnerLinkName() + ": EPR is " + myrole._initialEPR);
}
for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) {
@@ -716,8 +748,7 @@
prole._initialEPR = epr;
}
- __log.debug("Activated " + _pid + " partnerrole " + prole.getPartnerLinkName() + ": EPR is "
- + prole._initialEPR);
+ __log.debug("Activated " + _pid + " partnerrole " + prole.getPartnerLinkName() + ": EPR is " + prole._initialEPR);
}
@@ -762,17 +793,10 @@
boolean enabled = false;
List<String> scopeNames = null;
if (event instanceof ScopeEvent) {
- scopeNames = ((ScopeEvent)event).getParentScopesNames();
- }
- List<String> eventTypes = _store.getEventsSettings(_pid, scopeNames);
- if (eventTypes.size() == 1 && eventTypes.get(0).equals("all"))
- enabled = true;
- else {
- for (String eventType : eventTypes) {
- if (eventType.equals(event.getType().toString()))
- enabled = true;
- }
+ scopeNames = ((ScopeEvent) event).getParentScopesNames();
}
+
+ enabled = _pconf.isEventEnabled(scopeNames, event.getType());
if (enabled) {
ProcessInstanceDAO instanceDao = getProcessDAO().getInstance(event.getProcessInstanceId());
saveEvent(event, instanceDao);
@@ -784,6 +808,6 @@
}
public boolean isInMemory() {
- return _inMemory;
+ return _pconf.isTransient();
}
}
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Tue Nov 28 18:37:25 2006
@@ -18,6 +18,12 @@
*/
package org.apache.ode.bpel.engine;
+import java.io.InputStream;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.xml.namespace.QName;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.dao.BpelDAOConnection;
@@ -25,39 +31,42 @@
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.bpel.explang.ConfigurationException;
-import org.apache.ode.bpel.iapi.*;
+import org.apache.ode.bpel.iapi.BindingContext;
+import org.apache.ode.bpel.iapi.BpelEngine;
+import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.BpelEventListener;
+import org.apache.ode.bpel.iapi.BpelServer;
+import org.apache.ode.bpel.iapi.EndpointReferenceContext;
+import org.apache.ode.bpel.iapi.MessageExchangeContext;
+import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.o.OExpressionLanguage;
-import org.apache.ode.bpel.o.OPartnerLink;
import org.apache.ode.bpel.o.OProcess;
import org.apache.ode.bpel.o.Serializer;
-import org.apache.ode.bpel.pmapi.BpelManagementFacade;
import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
import org.apache.ode.utils.msg.MessageBundle;
-import javax.xml.namespace.QName;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
/**
- * The BPEL server implementation. This implementation is intended to be thread
+ * <p>
+ * The BPEL server implementation.
+ * </p>
+ *
+ * <p>This implementation is intended to be thread
* safe. The key concurrency mechanism is a "management" read/write lock that
* synchronizes all management operations (they require "write" access) and
* prevents concurrent management operations and processing (processing requires
* "read" access). Write access to the lock is scoped to the method, while read
* access is scoped to a transaction.
+ * </p>
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
* @author mriou <mriou at apache dot org>
*/
public class BpelServerImpl implements BpelServer {
private static final Log __log = LogFactory.getLog(BpelServer.class);
+
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
/**
@@ -66,43 +75,33 @@
* in progress.
*/
private ReadWriteLock _mngmtLock = new ReentrantReadWriteLock();
+
+ private enum State {
+ SHUTDOWN,
+ INIT,
+ RUNNING
+ }
+
+ private State _state = State.SHUTDOWN;
+
private Contexts _contexts = new Contexts();
+
BpelEngineImpl _engine;
- private boolean _started;
- private boolean _initialized;
- private BpelDatabase _db;
- public BpelServerImpl() {
- }
+ BpelDatabase _db;
public void start() {
_mngmtLock.writeLock().lock();
try {
- if (!_initialized) {
- String err = "start() called before init()!";
- __log.fatal(err);
- throw new IllegalStateException(err);
- }
-
- if (_started) {
- if (__log.isDebugEnabled()) __log.debug("start() ignored -- already started");
+ if (!checkState(State.INIT, State.RUNNING)) {
+ __log.debug("start() ignored -- already started");
return;
}
- if (__log.isDebugEnabled()) __log.debug("BPEL SERVER starting.");
+ __log.debug("BPEL SERVER starting.");
- _engine = new BpelEngineImpl(_contexts);
- Map<QName, byte[]> pids = _contexts.store.getActiveProcesses();
- for (Map.Entry<QName, byte[]> pid : pids.entrySet())
- try {
- doActivateProcess(pid.getKey(), pid.getValue());
- } catch (Exception ex) {
- String msg = __msgs.msgProcessActivationError(pid.getKey());
- __log.error(msg, ex);
- }
- // readState();
_contexts.scheduler.start();
- _started = true;
+ _state = State.RUNNING;
__log.info(__msgs.msgServerStarted());
} finally {
_mngmtLock.writeLock().unlock();
@@ -134,55 +133,22 @@
public void stop() {
_mngmtLock.writeLock().lock();
try {
- if (!_started) {
- if (__log.isDebugEnabled())
- __log.debug("stop() ignored -- already stopped");
+ if (!checkState(State.RUNNING, State.INIT)) {
+ __log.debug("stop() ignored -- already stopped");
return;
}
- if (__log.isDebugEnabled()) {
- __log.debug("BPEL SERVER STOPPING");
- }
+
+ __log.debug("BPEL SERVER STOPPING");
+
_contexts.scheduler.stop();
_engine = null;
- _started = false;
-
+ _state = State.INIT;
__log.info(__msgs.msgServerStopped());
} finally {
_mngmtLock.writeLock().unlock();
}
}
- /**
- * Load the parsed and compiled BPEL process definition from the database.
- *
- * @param processId
- * process identifier
- * @return process information from configuration database
- */
- private OProcess loadProcess(QName processId, byte[] serProc) {
- if (__log.isTraceEnabled()) {
- __log.trace("loadProcess: " + processId);
- }
- assert _initialized : "loadProcess() called before init()!";
-
- InputStream is = new ByteArrayInputStream(serProc);
- OProcess compiledProcess;
- try {
- Serializer ofh = new Serializer(is);
- compiledProcess = ofh.readOProcess();
- } catch (Exception e) {
- String errmsg = __msgs.msgProcessLoadError(processId);
- __log.error(errmsg, e);
- throw new BpelEngineException(errmsg, e);
- }
-
- return compiledProcess;
- }
-
- public BpelManagementFacade getBpelManagementFacade() {
- return new BpelManagementFacadeImpl(_db, _engine, this, _contexts.store);
- }
-
public void setMessageExchangeContext(MessageExchangeContext mexContext) throws BpelEngineException {
_contexts.mexContext = mexContext;
}
@@ -214,18 +180,19 @@
_contexts.bindingContext = bc;
}
+
public void init() throws BpelEngineException {
_mngmtLock.writeLock().lock();
try {
- if (_initialized)
- throw new IllegalStateException("init() called twice.");
-
- if (__log.isDebugEnabled()) {
- __log.debug("BPEL SERVER initializing ");
- }
+ if (!checkState(State.SHUTDOWN, State.INIT))
+ return;
+ __log.debug("BPEL SERVER initializing ");
+
_db = new BpelDatabase(_contexts.dao, _contexts.scheduler);
- _initialized = true;
+ _state = State.INIT;
+ _engine = new BpelEngineImpl(_contexts);
+
} finally {
_mngmtLock.writeLock().unlock();
}
@@ -234,10 +201,11 @@
public void shutdown() throws BpelEngineException {
_mngmtLock.writeLock().lock();
try {
- if (!_initialized)
- return;
-
stop();
+
+ _db = null;
+ _engine = null;
+ _state = State.SHUTDOWN;
} finally {
_mngmtLock.writeLock().unlock();
}
@@ -245,87 +213,46 @@
}
public BpelEngine getEngine() {
- // Acquire a readlock for the current thread / transaction and then
- // return
- // an engine instance.
-
- // First check if this thread has already requested the engine for this
- // transaction, if not, acquire the lock.
- // if (!_associated.get()) {
- // if (!_started) {
- // String errmsg = "call on getEngine() on server that has not been
- // started!";
- // __log.debug(errmsg);
- // throw new IllegalStateException(errmsg);
- // }
- //
- // // We need to schedule a task to release the lock.
- // // _contexts.scheduler.scheduleTransactionCallback();
- // _associated.set(Boolean.TRUE);
- // }
-
+ // TODO: acquire read lock and tie the release to the current transaction.
return _engine;
}
- public void load(final QName pid, boolean sticky) {
- if (__log.isTraceEnabled())
- __log.trace("load: " + pid);
+
+
+ public void register(ProcessConf conf) {
+ if (conf == null)
+ throw new NullPointerException("must specify non-null process configuration.");
+
+ __log.debug("register: " + conf.getProcessId());
+ // Load the compiled process.
+ OProcess compiledProcess;
try {
- _mngmtLock.writeLock().lockInterruptibly();
- } catch (InterruptedException ie) {
- __log.debug("load() interrupted.", ie);
- throw new BpelEngineException(__msgs.msgOperationInterrupted());
- }
- try {
- if (sticky) _contexts.store.markActive(pid, true);
- byte[] serProc = _contexts.store.getActiveProcesses().get(pid);
- doActivateProcess(pid, serProc);
- } finally {
- _mngmtLock.writeLock().unlock();
+ compiledProcess = deserializeCompiledProcess(conf.getCBPInputStream());
+ } catch (Exception e) {
+ String errmsg = __msgs.msgProcessLoadError(conf.getProcessId());
+ __log.error(errmsg, e);
+ throw new BpelEngineException(errmsg, e);
}
- }
-
- public void unload(QName pid, boolean sticky) throws BpelEngineException {
- if (__log.isTraceEnabled())
- __log.trace("unload " + pid);
+
+ // Ok, IO out of the way, we will mod the server state, so need to get a
+ // lock.
try {
_mngmtLock.writeLock().lockInterruptibly();
} catch (InterruptedException ie) {
- __log.debug("unload() interrupted.", ie);
+ __log.debug("register(...) interrupted.", ie);
throw new BpelEngineException(__msgs.msgOperationInterrupted());
}
try {
- if (sticky) _contexts.store.markActive(pid, false);
- unregisterProcess(pid);
- } finally {
- _mngmtLock.writeLock().unlock();
- }
- }
-
- /**
- * Activate the process in the engine.
- *
- * @param pid
- */
- private void doActivateProcess(final QName pid, byte[] serProcess) {
- _mngmtLock.writeLock().lock();
- try {
- // Load the compiled process.
- OProcess compiledProcess = loadProcess(pid, serProcess);
- // Check that process exist, otherwise creates it (lazy creation)
- checkProcessExistence(pid, compiledProcess);
-
// If the process is already active, do nothing.
- if (_engine.isProcessRegistered(pid)) {
- __log.debug("skipping doActivateProcess(" + pid + ") -- process is already active");
+ if (_engine.isProcessRegistered(conf.getProcessId())) {
+ __log.debug("skipping doRegister" + conf.getProcessId() + ") -- process is already registered");
return;
}
- if (__log.isDebugEnabled())
- __log.debug("Process " + pid + " is not active, creating new entry.");
+ __log.debug("Registering process " + conf.getProcessId() + " with server.");
// Create an expression language registry for this process
ExpressionLanguageRuntimeRegistry elangRegistry = new ExpressionLanguageRuntimeRegistry();
@@ -333,133 +260,93 @@
try {
elangRegistry.registerRuntime(elang);
} catch (ConfigurationException e) {
- String msg = "Expression language registration error.";
+ String msg = __msgs.msgExpLangRegistrationError(elang.expressionLanguageUri, elang.properties);
__log.error(msg, e);
throw new BpelEngineException(msg, e);
}
}
- // Create local message-exchange interceptors.
- List<MessageExchangeInterceptor> localMexInterceptors = new LinkedList<MessageExchangeInterceptor>();
- for (String mexclass : _contexts.store.getMexInterceptors(pid)) {
- try {
- Class cls = Class.forName(mexclass);
- localMexInterceptors.add((MessageExchangeInterceptor) cls.newInstance());
- } catch (Throwable t) {
- String errmsg = "Error instantiating message-exchange interceptor " + mexclass;
- __log.error(errmsg, t);
- }
- }
- // Create myRole endpoint name mapping (from deployment descriptor)
- HashMap<OPartnerLink, Endpoint> myRoleEndpoints = new HashMap<OPartnerLink, Endpoint>();
- for (Map.Entry<String, Endpoint> provide : _contexts.store.getProvideEndpoints(pid).entrySet()) {
- OPartnerLink plink = compiledProcess.getPartnerLink(provide.getKey());
- if (plink == null) {
- String errmsg = "Error in deployment descriptor for process " + pid
- + "; reference to unknown partner link " + provide.getKey();
- __log.error(errmsg);
- throw new BpelEngineException(errmsg);
- }
- myRoleEndpoints.put(plink, provide.getValue());
- }
+ // Create the processDAO if necessary.
+ createProcessDAO(conf.getProcessId(), compiledProcess);
- // Create partnerRole initial value mapping
- HashMap<OPartnerLink, Endpoint> partnerRoleIntialValues = new HashMap<OPartnerLink, Endpoint>();
- for (Map.Entry<String, Endpoint> invoke : _contexts.store.getInvokeEndpoints(pid).entrySet()) {
- OPartnerLink plink = compiledProcess.getPartnerLink(invoke.getKey());
- if (plink == null) {
- String errmsg = "Error in deployment descriptor for process " + pid
- + "; reference to unknown partner link " + invoke.getKey();
- __log.error(errmsg);
- throw new BpelEngineException(errmsg);
- }
- __log.debug("Processing <invoke> element for process " + pid + ": partnerlink " +
- invoke.getKey() + " --> " + invoke.getValue());
-
- partnerRoleIntialValues.put(plink, invoke.getValue());
- }
-
- BpelProcess process = new BpelProcess(pid, compiledProcess, myRoleEndpoints, partnerRoleIntialValues,
- null, elangRegistry, localMexInterceptors, _contexts.store);
+ BpelProcess process = new BpelProcess(conf, compiledProcess, null,elangRegistry);
_engine.registerProcess(process);
- __log.info(__msgs.msgProcessActivated(pid));
+ __log.info(__msgs.msgProcessRegistered(conf.getProcessId()));
} finally {
_mngmtLock.writeLock().unlock();
}
}
- private boolean checkProcessExistence(final QName pid, final OProcess oprocess) {
+ public void unregister(QName pid) throws BpelEngineException {
+ if (__log.isTraceEnabled())
+ __log.trace("unregister: " + pid);
+
try {
- boolean existed = true;
- if (_contexts.store.getProcessConfiguration(pid).isInMemory()) {
- existed = false;
- ProcessDAO newDao = _contexts.inMemDao.getConnection().createProcess(pid, oprocess.getQName());
- for (String correlator : oprocess.getCorrelators()) {
- newDao.addCorrelator(correlator);
- }
- } else {
- existed = _db.exec(new BpelDatabase.Callable<Boolean>() {
- public Boolean run(BpelDAOConnection conn) throws Exception {
- // Hack, but at least for now we need to ensure that we
- // are
- // the only process with this process id.
- ProcessDAO old = conn.getProcess(pid);
- if (old != null) return true;
-
- ProcessDAO newDao = conn.createProcess(pid, oprocess.getQName());
- for (String correlator : oprocess.getCorrelators()) {
- newDao.addCorrelator(correlator);
- }
- return false;
- }
- });
- }
- if (__log.isDebugEnabled()) {
- if (existed) __log.debug("Process runtime already exist (" + pid + "), no need to create.");
- else __log.debug("Created new process runtime " + pid);
- }
- return existed;
- } catch (BpelEngineException ex) {
- throw ex;
- } catch (Exception dce) {
- __log.error("", dce);
- throw new BpelEngineException("", dce);
+ _mngmtLock.writeLock().lockInterruptibly();
+ } catch (InterruptedException ie) {
+ __log.debug("unregister() interrupted.", ie);
+ throw new BpelEngineException(__msgs.msgOperationInterrupted());
}
- }
- private boolean unregisterProcess(final QName pid) {
try {
if (_engine != null)
_engine.unregisterProcess(pid);
- if (__log.isDebugEnabled())
- __log.debug("Unregistering process " + pid);
+ __log.info(__msgs.msgProcessUnregistered(pid));
- // Delete it from the database.
- boolean found = _db.exec(new BpelDatabase.Callable<Boolean>() {
- public Boolean run(BpelDAOConnection conn) throws Exception {
- ProcessDAO proc = conn.getProcess(pid);
- if (proc != null) {
- proc.delete();
- return true;
+ } catch (Exception ex) {
+ __log.error(__msgs.msgProcessUnregisterFailed(pid), ex);
+ throw new BpelEngineException(ex);
+ } finally {
+ _mngmtLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * If necessary, create an object in the data store to represent the
+ * process. We'll re-use an existing object if it already exists and matches
+ * the GUID.
+ *
+ * @param pid
+ * @param oprocess
+ * @return
+ */
+ private void createProcessDAO(final QName pid, final OProcess oprocess) {
+ __log.debug("Creating process DAO for " + pid + " (guid=" + oprocess.guid + ")");
+ try {
+ _db.exec(new BpelDatabase.Callable<Object>() {
+ public Object run(BpelDAOConnection conn) throws Exception {
+ ProcessDAO old = conn.getProcess(pid);
+ if (old != null) {
+ __log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid());
+
+ if (oprocess.guid != null)
+ if (!old.getGuid().equals(oprocess.guid)) {
+ // TODO: Versioning will need to handle this differently.
+ String errmsg = "ProcessDAO GUID " + old.getGuid() + " does not match " + oprocess.guid
+ + "; replacing.";
+ __log.warn(errmsg);
+ old.delete();
+ } else
+ return null;
+ return null;
}
- return false;
+
+ ProcessDAO newDao = conn.createProcess(pid, oprocess.getQName(), oprocess.guid);
+ for (String correlator : oprocess.getCorrelators()) {
+ newDao.addCorrelator(correlator);
+ }
+ return null;
}
});
-
- if (found) {
- __log.info(__msgs.msgProcessUnregistered(pid));
- return true;
- }
- return false;
- } catch (RuntimeException ex) {
+ } catch (BpelEngineException ex) {
throw ex;
- } catch (Exception ex) {
- __log.error(__msgs.msgProcessUnregisterFailed(pid), ex);
- throw new BpelEngineException(ex);
+ } catch (Exception dce) {
+ __log.error("DbError", dce);
+ throw new BpelEngineException("DbError", dce);
}
}
@@ -485,13 +372,56 @@
_contexts.globalIntereceptors.remove(interceptor);
}
+ /**
+ * Check a state transition from state "i" to state "j".
+ */
+ private final boolean checkState(State i, State j) {
+ if (_state == i)
+ return true;
+
+ if (_state == j)
+ return false;
+
+ throw new IllegalStateException("Unexpected state: " + i);
+
+ }
/**
- * Inject a ProcessStore implementation.
- * @param store a ProcessStore instance
+ * De-serialize the compiled process representation from a stream.
+ *
+ * @param is
+ * input stream
+ * @return process information from configuration database
*/
- public void setProcessStore(ProcessStore store) {
- _contexts.store = store;
+ private OProcess deserializeCompiledProcess(InputStream is) throws Exception {
+
+ OProcess compiledProcess;
+ Serializer ofh = new Serializer(is);
+ compiledProcess = ofh.readOProcess();
+ return compiledProcess;
}
+ /* TODO: We need to have a method of cleaning up old deployment data. */
+ private boolean deleteProcessDAO(final QName pid) {
+
+ try {
+ // Delete it from the database.
+ return _db.exec(new BpelDatabase.Callable<Boolean>() {
+ public Boolean run(BpelDAOConnection conn) throws Exception {
+ ProcessDAO proc = conn.getProcess(pid);
+ if (proc != null) {
+ proc.delete();
+ return true;
+ }
+ return false;
+ }
+ });
+
+ } catch (Exception ex) {
+ String errmsg = "DbError";
+ __log.error(errmsg, ex);
+ throw new BpelEngineException(errmsg, ex);
+ }
+
+ }
}
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java Tue Nov 28 18:37:25 2006
@@ -44,7 +44,6 @@
BpelDAOConnectionFactory dao;
BpelDAOConnectionFactory inMemDao;
- ProcessStore store;
/** Global Message-Exchange interceptors. Must be copy-on-write!!! */
final List<MessageExchangeInterceptor >globalIntereceptors = new CopyOnWriteArrayList<MessageExchangeInterceptor>();
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DocumentInfoGenerator.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DocumentInfoGenerator.java?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DocumentInfoGenerator.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DocumentInfoGenerator.java Tue Nov 28 18:37:25 2006
@@ -46,7 +46,7 @@
}
- DocumentInfoGenerator(File baseDir, File f) {
+ DocumentInfoGenerator(File f) {
_file = f;
recognize();
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InterceptorContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InterceptorContextImpl.java?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InterceptorContextImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InterceptorContextImpl.java Tue Nov 28 18:37:25 2006
@@ -20,6 +20,7 @@
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.ProcessStore;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
@@ -32,12 +33,12 @@
public class InterceptorContextImpl implements InterceptorContext{
private ProcessDAO _processDao;
private BpelDAOConnection _connection;
- private ProcessStore _store;
+ private ProcessConf _pconf;
- public InterceptorContextImpl(BpelDAOConnection connection, ProcessDAO processDAO, ProcessStore store) {
+ public InterceptorContextImpl(BpelDAOConnection connection, ProcessDAO processDAO, ProcessConf pconf) {
_connection = connection;
_processDao = processDAO;
- _store = store;
+ _pconf = pconf;
}
public BpelDAOConnection getConnection() {
@@ -48,8 +49,9 @@
return _processDao;
}
- public ProcessStore getProcessStore() {
- return _store;
+ public ProcessConf getProcessConf() {
+ // TODO Auto-generated method stub
+ return null;
}
}
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Messages.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Messages.java?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Messages.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Messages.java Tue Nov 28 18:37:25 2006
@@ -24,6 +24,7 @@
import java.io.File;
import java.net.URI;
import java.util.Date;
+import java.util.Map;
/**
* Message bundle used by the BPEL provider implementation.
@@ -32,152 +33,157 @@
*/
public class Messages extends MessageBundle {
- String msgBarProcessLoadErr() {
- return format("Unable to load compiled BPEL process.");
- }
-
- String msgProcessDeployed(QName processId) {
- return format("Process {0} deployed." , processId);
- }
-
- /** A database error prevented the operation from succeeding. */
- String msgDbError() {
- return format("A database error prevented the operation from succeeding.");
- }
-
- /** The instance "{0}" was not found in the database. */
- String msgInstanceNotFound(Long pid) {
- return format("The instance \"{0}\" was not found in the database.",pid);
- }
-
- String msgUnknownEPR(String epr) {
- return format("Unkown EPR: {0}" , epr);
- }
-
- String msgProcessUnregistered(QName process) {
- return format("Process {0} has been unregistered." , process);
- }
-
- String msgProcessUnregisterFailed(QName process) {
- return format("Failed to unregister process {0}! Check database for consistency!" , process);
- }
-
- String msgProcessNotFound(QName pid) {
- return format("Process {0} not found. ",pid);
- }
-
- String msgProcessNotActive(QName processId) {
- return format("Process {0} is not active. ", processId);
- }
-
- String msgProcessLoadError(QName processId) {
- return format("Process {0}, could not be loaded. ", processId);
- }
-
- String msgDeployFailDescriptorURIInvalid(URI dduri) {
- return format("Deployment failure: invalid deployment descriptor URI \"{0}\" ", dduri);
- }
-
- String msgDeployFailDescriptorInvalid(URI dduri) {
- return format("Deployment failure: invalid/malformed deployment descriptor at \"{0}\"", dduri);
- }
-
- String msgDeployFailDescriptorIOError(URI dduri) {
- return format("Deployment failure: IO error reading deployment descriptor at \"{0}\"", dduri);
- }
-
- /** Partner link declared in process deployment descriptor could not be found in process definition: {0} */
- String msgDDPartnerLinkNotFound(String partnerLinkName) {
- return format("Partner link declared in process deployment descriptor could not be found in process " +
- "definition: {0}", partnerLinkName);
- }
-
- String msgDDPartnerRoleNotFound(String partnerLinkName) {
- return format("Role 'partnerRole' declared in process deployment descriptor isn't defined in process definition " +
- "for partner link : {0}", partnerLinkName);
- }
-
- String msgDDMyRoleNotFound(String partnerLinkName) {
- return format("Role 'myRole' declared in process deployment descriptor isn't defined in process definition for " +
- "partner link : {0}", partnerLinkName);
- }
-
- String msgDDNoInitiliazePartnerRole(String partnerLinkName) {
- return format("Partner link {0} is defined in process as initializePartnerRole=no, its partner role endpoint " +
- "can't be initialized by deployment descriptor.", partnerLinkName);
- }
-
- String msgProcessDeployErrAlreadyDeployed(QName processId) {
- return format("The process could not be deployed; another process is already deployed as {0}!",processId);
- }
-
- String msgScheduledJobReferencesUnknownInstance(Long iid) {
- return format("Received a scheduled job event for unknown instance {0}", iid);
- }
-
- String msgReschedulingJobForInactiveProcess(QName processId, String jobId, Date rescheduled) {
- return format("Received a scheduled job event for inactive process {0}; " +
- "rescheduling job {1} for {2}", processId, jobId, rescheduled);
- }
-
- String msgProcessActivationError(QName pid) {
- return format("Error activating process {0}",pid);
- }
-
- String msgOperationInterrupted() {
- return format("Operation was interrupted.");
- }
-
- String msgProcessActivated(QName pid) {
- return format("Activated process {0}.",pid);
- }
-
- String msgServerStarted() {
- return format("BPEL Server Started.");
- }
-
- String msgServerStopped() {
- return format("BPEL Server Stopped.");
- }
-
- String msgUndefinedServicePort(QName service, String port) {
- return format("The service name and port defined in your deployment descriptor couldn't be found " +
- "in any WSDL document: {0} {1}.", service, port);
- }
-
- String msgInterceptorAborted(String mexId, String interceptor, String msg) {
- return format("Message exchange {0} aborted by interceptor {1}: {2}", mexId, interceptor, msg);
- }
-
- String msgMyRoleRoutingFailure(String messageExchangeId) {
- return format("Unable to route message exchange {0}, EPR was not specified " +
- "and the target my-role could not be inferred.",messageExchangeId);
- }
-
- String msgPropertyAliasReturnedNullSet(String alias, String variable) {
- return this.format("msgPropertyAliasReturnedNullSet: {0} {1}", alias, variable);
- }
-
- String msgUnknownOperation(String operationName,QName portType) {
- return format("Unknown operation \"{0}\" for port type \"{1}\".",operationName,portType);
- }
-
- String msgPropertyAliasDerefFailedOnMessage(String aliasDescription, String reason) {
- return this.format(
- "Unable to evaluate apply property alias \"{0}\" to incoming message: {1}",
- aliasDescription, reason);
- }
-
- public String msgDeployStarting(File deploymentUnitDirectory) {
- return format("Starting deployment of processes from directory \"{0}\". ", deploymentUnitDirectory);
- }
-
- public String msgDeployFailed(QName name, File deploymentUnitDirectory) {
- return format("Deployment of process \"{0}\" from \"{1}\" failed.", name,deploymentUnitDirectory);
- }
-
- public String msgDeployRollback(File deploymentUnitDirectory) {
- return format("Deployment of processes from \"{0}\" failed, rolling back. ", deploymentUnitDirectory);
- }
-
+ String msgBarProcessLoadErr() {
+ return format("Unable to load compiled BPEL process.");
+ }
+
+ String msgProcessDeployed(QName processId) {
+ return format("Process {0} deployed.", processId);
+ }
+
+ /** A database error prevented the operation from succeeding. */
+ String msgDbError() {
+ return format("A database error prevented the operation from succeeding.");
+ }
+
+ /** The instance "{0}" was not found in the database. */
+ String msgInstanceNotFound(Long pid) {
+ return format("The instance \"{0}\" was not found in the database.", pid);
+ }
+
+ String msgUnknownEPR(String epr) {
+ return format("Unkown EPR: {0}", epr);
+ }
+
+ String msgProcessUnregistered(QName process) {
+ return format("Process {0} has been unregistered.", process);
+ }
+
+ String msgProcessUnregisterFailed(QName process) {
+ return format("Failed to unregister process {0}! Check database for consistency!", process);
+ }
+
+ String msgProcessNotFound(QName pid) {
+ return format("Process {0} not found. ", pid);
+ }
+
+ String msgProcessNotActive(QName processId) {
+ return format("Process {0} is not active. ", processId);
+ }
+
+ String msgProcessLoadError(QName processId) {
+ return format("Process {0}, could not be loaded. ", processId);
+ }
+
+ String msgDeployFailDescriptorURIInvalid(URI dduri) {
+ return format("Deployment failure: invalid deployment descriptor URI \"{0}\" ", dduri);
+ }
+
+ String msgDeployFailDescriptorInvalid(URI dduri) {
+ return format("Deployment failure: invalid/malformed deployment descriptor at \"{0}\"", dduri);
+ }
+
+ String msgDeployFailDescriptorIOError(URI dduri) {
+ return format("Deployment failure: IO error reading deployment descriptor at \"{0}\"", dduri);
+ }
+
+ /**
+ * Partner link declared in process deployment descriptor could not be found
+ * in process definition: {0}
+ */
+ String msgDDPartnerLinkNotFound(String partnerLinkName) {
+ return format("Partner link declared in process deployment descriptor could not be found in process " + "definition: {0}",
+ partnerLinkName);
+ }
+
+ String msgDDPartnerRoleNotFound(String partnerLinkName) {
+ return format("Role 'partnerRole' declared in process deployment descriptor isn't defined in process definition "
+ + "for partner link : {0}", partnerLinkName);
+ }
+
+ String msgDDMyRoleNotFound(String partnerLinkName) {
+ return format("Role 'myRole' declared in process deployment descriptor isn't defined in process definition for "
+ + "partner link : {0}", partnerLinkName);
+ }
+
+ String msgDDNoInitiliazePartnerRole(String partnerLinkName) {
+ return format("Partner link {0} is defined in process as initializePartnerRole=no, its partner role endpoint "
+ + "can't be initialized by deployment descriptor.", partnerLinkName);
+ }
+
+ String msgProcessDeployErrAlreadyDeployed(QName processId) {
+ return format("The process could not be deployed; another process is already deployed as {0}!", processId);
+ }
+
+ String msgScheduledJobReferencesUnknownInstance(Long iid) {
+ return format("Received a scheduled job event for unknown instance {0}", iid);
+ }
+
+ String msgReschedulingJobForInactiveProcess(QName processId, String jobId, Date rescheduled) {
+ return format("Received a scheduled job event for inactive process {0}; " + "rescheduling job {1} for {2}", processId,
+ jobId, rescheduled);
+ }
+
+ String msgProcessActivationError(QName pid) {
+ return format("Error activating process {0}", pid);
+ }
+
+ String msgOperationInterrupted() {
+ return format("Operation was interrupted.");
+ }
+
+ String msgProcessRegistered(QName pid) {
+ return format("Activated process {0}.", pid);
+ }
+
+ String msgServerStarted() {
+ return format("BPEL Server Started.");
+ }
+
+ String msgServerStopped() {
+ return format("BPEL Server Stopped.");
+ }
+
+ String msgUndefinedServicePort(QName service, String port) {
+ return format("The service name and port defined in your deployment descriptor couldn't be found "
+ + "in any WSDL document: {0} {1}.", service, port);
+ }
+
+ String msgInterceptorAborted(String mexId, String interceptor, String msg) {
+ return format("Message exchange {0} aborted by interceptor {1}: {2}", mexId, interceptor, msg);
+ }
+
+ String msgMyRoleRoutingFailure(String messageExchangeId) {
+ return format("Unable to route message exchange {0}, EPR was not specified "
+ + "and the target my-role could not be inferred.", messageExchangeId);
+ }
+
+ String msgPropertyAliasReturnedNullSet(String alias, String variable) {
+ return this.format("msgPropertyAliasReturnedNullSet: {0} {1}", alias, variable);
+ }
+
+ String msgUnknownOperation(String operationName, QName portType) {
+ return format("Unknown operation \"{0}\" for port type \"{1}\".", operationName, portType);
+ }
+
+ String msgPropertyAliasDerefFailedOnMessage(String aliasDescription, String reason) {
+ return this.format("Unable to evaluate apply property alias \"{0}\" to incoming message: {1}", aliasDescription, reason);
+ }
+
+ public String msgDeployStarting(File deploymentUnitDirectory) {
+ return format("Starting deployment of processes from directory \"{0}\". ", deploymentUnitDirectory);
+ }
+
+ public String msgDeployFailed(QName name, File deploymentUnitDirectory) {
+ return format("Deployment of process \"{0}\" from \"{1}\" failed.", name, deploymentUnitDirectory);
+ }
+
+ public String msgDeployRollback(File deploymentUnitDirectory) {
+ return format("Deployment of processes from \"{0}\" failed, rolling back. ", deploymentUnitDirectory);
+ }
+
+ public String msgExpLangRegistrationError(String expressionLanguageUri, Map<String, String> properties) {
+ return format("Error registering expression language \"" + expressionLanguageUri + "\" with properties " + properties);
+ }
+
}
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Tue Nov 28 18:37:25 2006
@@ -58,8 +58,8 @@
* <code>false</code> otherwise
*/
private boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) {
- InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), null,
- _engine._contexts.store);
+ InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(),
+ mex._dao.getProcess(), null);
for (MessageExchangeInterceptor i : _engine.getGlobalInterceptors())
if (!processInterceptor(i, mex, ictx, invoker))
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java Tue Nov 28 18:37:25 2006
@@ -28,8 +28,10 @@
import org.apache.ode.bpel.evt.*;
import org.apache.ode.bpel.evtproc.ActivityStateDocumentBuilder;
import org.apache.ode.bpel.iapi.BpelEngineException;
+import org.apache.ode.bpel.iapi.BpelServer;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.iapi.ProcessState;
import org.apache.ode.bpel.iapi.ProcessStore;
import org.apache.ode.bpel.o.OBase;
import org.apache.ode.bpel.o.OPartnerLink;
@@ -52,23 +54,21 @@
*
* TODO Pull up IM/PM methods from BpelManagementFacadeImpl
*/
-class ProcessAndInstanceManagementImpl
+public class ProcessAndInstanceManagementImpl
implements InstanceManagement, ProcessManagement {
protected static final Messages __msgs = MessageBundle.getMessages(Messages.class);
protected static Log __log = LogFactory.getLog(BpelManagementFacadeImpl.class);
protected static final ProcessStatusConverter __psc = new ProcessStatusConverter();
- protected BpelEngineImpl _engine;
- protected BpelServerImpl _server;
protected BpelDatabase _db;
protected ProcessStore _store;
protected Calendar _calendar = Calendar.getInstance(); // Calendar can be expensive to initialize so we cache and clone it
+ protected BpelServerImpl _server;
- public ProcessAndInstanceManagementImpl(BpelDatabase db, BpelEngineImpl engine,
- BpelServerImpl server, ProcessStore store) {
- _db = db;
- _engine = engine;
- _server = server;
+ public ProcessAndInstanceManagementImpl(BpelServer server,
+ ProcessStore store) {
+ _server = (BpelServerImpl) server;
+ _db = _server._db;
_store = store;
}
@@ -120,7 +120,7 @@
public ProcessInfoDocument setRetired(final QName pid, final boolean retired)
throws ManagementException {
try {
- _store.markActive(pid, !retired);
+ _store.setState(pid, retired ? ProcessState.RETIRED : ProcessState.ACTIVE);
} catch (BpelEngineException e) {
throw new ProcessNotFoundException("ProcessNotFound:" + pid);
}
@@ -137,7 +137,7 @@
ProcessDAO proc = conn.getProcess(pid);
if (proc == null)
throw new ProcessNotFoundException("ProcessNotFound:" + pid);
- _store.setProperty(pid, propertyName.getLocalPart(), propertyName.getNamespaceURI(), value);
+ _store.setProperty(pid, propertyName, value);
fillProcessInfo(pi, proc, new ProcessInfoCustomizer(ProcessInfoCustomizer.Item.PROPERTIES));
return null;
}
@@ -161,7 +161,7 @@
ProcessDAO proc = conn.getProcess(pid);
if (proc == null)
throw new ProcessNotFoundException("ProcessNotFound:" + pid);
- _store.setProperty(pid, propertyName.getLocalPart(), propertyName.getNamespaceURI(), value);
+ _store.setProperty(pid, propertyName, value);
fillProcessInfo(pi, proc, new ProcessInfoCustomizer(ProcessInfoCustomizer.Item.PROPERTIES));
return null;
}
@@ -296,7 +296,7 @@
return null;
for (ActivityRecoveryDAO recovery: instance.getActivityRecoveries()) {
if (recovery.getActivityId() == aid) {
- BpelProcess process = _engine._activeProcesses.get(instance.getProcess().getProcessId());
+ BpelProcess process = _server._engine._activeProcesses.get(instance.getProcess().getProcessId());
if (process != null) {
process.recoverActivity(instance, recovery.getChannel(), aid, action, null);
break;
@@ -377,7 +377,7 @@
public ActivityExtInfoListDocument getExtensibilityElements(QName pid, Integer[] aids) {
ActivityExtInfoListDocument aeild = ActivityExtInfoListDocument.Factory.newInstance();
TActivitytExtInfoList taeil = aeild.addNewActivityExtInfoList();
- OProcess oprocess = _engine.getOProcess(pid);
+ OProcess oprocess = _server._engine.getOProcess(pid);
for (int aid : aids) {
OBase obase = oprocess.getChild(aid);
@@ -411,7 +411,7 @@
*/
protected final DebuggerSupport getDebugger(QName procid) throws ManagementException {
- BpelProcess process = _engine._activeProcesses.get(procid);
+ BpelProcess process = _server._engine._activeProcesses.get(procid);
if (process == null)
throw new InvalidRequestException("The process \"" + procid + "\" is available." );
@@ -575,7 +575,7 @@
info.setPid(proc.getProcessId().toString());
// TODO: ACTIVE and RETIRED should be used separately.
//Active process may be retired at the same time
- if(!pconf.isActive()) {
+ if(pconf.getState() == ProcessState.RETIRED) {
info.setStatus(TProcessStatus.RETIRED);
} else {
info.setStatus(TProcessStatus.ACTIVE);
@@ -599,11 +599,11 @@
}
TProcessInfo.Documents docinfo = info.addNewDocuments();
- File files[] = pconf.getFiles();
+ List<File> files = pconf.getFiles();
if (files != null)
- genDocumentInfo(docinfo, _store.getDeploymentDir(), files, true);
+ genDocumentInfo(docinfo, files.toArray(new File[files.size()]), true);
else if (__log.isDebugEnabled())
- __log.debug("fillProcessInfo: No files for " + _store.getDeploymentDir() + " !!!");
+ __log.debug("fillProcessInfo: No files for " + pconf.getProcessId() + " !!!");
if (custom.includeProcessProperties()) {
TProcessProperties properties = info.addNewProperties();
@@ -617,12 +617,12 @@
}
}
- OProcess oprocess = _engine.getOProcess(proc.getProcessId());
+ OProcess oprocess = _server._engine.getOProcess(proc.getProcessId());
if (custom.includeEndpoints() && oprocess != null) {
TEndpointReferences eprs = info.addNewEndpoints();
for (OPartnerLink oplink : oprocess.getAllPartnerLinks()) {
if (oplink.hasPartnerRole() && oplink.initializePartnerRole) {
- EndpointReference pepr = _engine._activeProcesses.get(proc.getProcessId())
+ EndpointReference pepr = _server._engine._activeProcesses.get(proc.getProcessId())
.getInitialPartnerRoleEPR(oplink);
if (pepr!= null) {
@@ -643,7 +643,7 @@
* @param files files
* @param recurse recurse down directories?
*/
- private void genDocumentInfo(TProcessInfo.Documents docinfo, File rootdir, File[] files,boolean recurse) {
+ private void genDocumentInfo(TProcessInfo.Documents docinfo, File[] files,boolean recurse) {
if (files == null)
return;
for (File f : files) {
@@ -652,15 +652,15 @@
if (f.isDirectory()) {
if (recurse)
- genDocumentInfo(docinfo, rootdir, f.listFiles(), true);
+ genDocumentInfo(docinfo, f.listFiles(), true);
} else if (f.isFile()) {
- genDocumentInfo(docinfo, rootdir, f);
+ genDocumentInfo(docinfo,f);
}
}
}
- private void genDocumentInfo(TProcessInfo.Documents docinfo, File rootDir, File f) {
- DocumentInfoGenerator dig = new DocumentInfoGenerator(rootDir,f);
+ private void genDocumentInfo(TProcessInfo.Documents docinfo, File f) {
+ DocumentInfoGenerator dig = new DocumentInfoGenerator(f);
if (dig.isRecognized() && dig.isVisible()) {
TDocumentInfo doc = docinfo.addNewDocument();
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java Tue Nov 28 18:37:25 2006
@@ -22,6 +22,7 @@
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.ProcessStore;
/**
@@ -81,7 +82,7 @@
ProcessDAO getProcessDAO();
- ProcessStore getProcessStore();
+ ProcessConf getProcessConf();
}
}
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java Tue Nov 28 18:37:25 2006
@@ -57,8 +57,7 @@
* @return value of the property, or <code>null</code> if not set
*/
private String getSimpleProperty(QName propertyName, InterceptorContext ic) {
- Map<QName, Node> props = ic.getProcessStore().getProcessConfiguration(
- ic.getProcessDAO().getProcessId()).getProperties();
+ Map<QName, Node> props = ic.getProcessConf().getProperties();
for (Map.Entry<QName, Node> prop : props.entrySet()) {
if (prop.getKey().equals(propertyName))
return ((Text)prop.getValue()).getWholeText();
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java Tue Nov 28 18:37:25 2006
@@ -42,8 +42,8 @@
return _store.get(processId);
}
- public ProcessDAO createProcess(QName pid, QName type) {
- ProcessDaoImpl process = new ProcessDaoImpl(this,_store,pid,type);
+ public ProcessDAO createProcess(QName pid, QName type, String guid) {
+ ProcessDaoImpl process = new ProcessDaoImpl(this,_store,pid,type, guid);
_store.put(pid,process);
return process;
}
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java Tue Nov 28 18:37:25 2006
@@ -32,13 +32,16 @@
private Map<QName, ProcessDaoImpl> _store;
private BpelDAOConnectionImpl _conn;
+ private String _guid;
+
public ProcessDaoImpl(BpelDAOConnectionImpl conn,
Map<QName, ProcessDaoImpl> store,
- QName processId, QName type) {
+ QName processId, QName type, String guid) {
if (__log.isDebugEnabled()) {
__log.debug("Creating ProcessDao object for process \"" + processId + "\".");
}
+ _guid = guid;
_conn = conn;
_store = store;
_processId = processId;
@@ -143,5 +146,13 @@
public Date getActivityFailureDateTime() {
return null;
+ }
+
+ public String getGuid() {
+ return _guid;
+ }
+
+ public void setGuid(String guid) {
+ _guid = guid;
}
}
Modified: incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java Tue Nov 28 18:37:25 2006
@@ -26,6 +26,7 @@
import junit.framework.TestCase;
+import org.apache.ode.bpel.engine.BpelManagementFacadeImpl;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
@@ -138,11 +139,11 @@
}
};
_server.deploy(new File(new URI(this.getClass().getResource("/recovery").toString())));
- _management = _server.getBpelManagementFacade();
+ _management = new BpelManagementFacadeImpl(_server._server,_server._store);
}
protected void tearDown() throws Exception {
- _server.getBpelManagementFacade().delete(null);
+ _management.delete(null);
_server.shutdown();
}
@@ -152,7 +153,7 @@
*/
protected void execute(String process, int failFor) throws Exception {
_failFor = failFor;
- _server.getBpelManagementFacade().delete(null);
+ _management.delete(null);
_processQName = new QName(NAMESPACE, process);
_server.invoke(_processQName, "instantiate",
DOMUtils.newDocument().createElementNS(NAMESPACE, "tns:RequestElement"));
@@ -247,7 +248,7 @@
assertNull(activity.getFailure());
}
for (TScopeRef ref : scope.getChildren().getChildRefList()) {
- TScopeInfo child = _server.getBpelManagementFacade().getScopeInfoWithActivity(ref.getSiid(), true).getScopeInfo();
+ TScopeInfo child = _management.getScopeInfoWithActivity(ref.getSiid(), true).getScopeInfo();
if (child != null)
getRecoveriesInScope(instance, child, recoveries);
}
Modified: incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?view=diff&rev=480345&r1=480344&r2=480345
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Tue Nov 28 18:37:25 2006
@@ -82,12 +82,11 @@
if (_scheduler == null)
throw new RuntimeException("No scheduler");
//_store = new ProcessStoreImpl(new File("."), _dataSource, _txManager);
- _store = new ProcessStoreImpl(new File("."), null, null);
+ _store = new ProcessStoreImpl();
_server.setScheduler(_scheduler);
_server.setEndpointReferenceContext(createEndpointReferenceContext());
_server.setMessageExchangeContext(createMessageExchangeContext());
_server.setBindingContext(createBindingContext());
- _server.setProcessStore(_store);
_server.init();
_server.start();
} catch (Exception except) {
@@ -100,7 +99,7 @@
public Collection<QName> deploy(File deploymentUnitDirectory) {
Collection<QName> pids = _store.deploy(deploymentUnitDirectory);
for (QName pid: pids)
- _server.load(pid, true);
+ _server.register(_store.getProcessConfiguration(pid));
return pids;
}
@@ -124,9 +123,6 @@
}
}
- public BpelManagementFacade getBpelManagementFacade() {
- return _server.getBpelManagementFacade();
- }
public TransactionManager getTransactionManager() {
return _txManager;