You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2007/02/06 23:17:57 UTC
svn commit: r504332 [1/2] - in /incubator/ode/trunk:
axis2/src/main/java/org/apache/ode/axis2/
bpel-api/src/main/java/org/apache/ode/bpel/iapi/
bpel-compiler/src/main/java/org/apache/ode/bpel/compiler/
bpel-obj/src/main/java/org/apache/ode/bpel/o/ bpel...
Author: mriou
Date: Tue Feb 6 14:17:56 2007
New Revision: 504332
URL: http://svn.apache.org/viewvc?view=rev&rev=504332
Log:
Being smarter about memory management. First, the process definitions (OProcess) aren't loaded at startup but are lazily loaded at the first request instead. Second, a process reaper can be enabled to disassociate (dehydrate) processes according to a specific policy. The default policy controls both the time to live and the total number of processes held in memory. Processes are then rehydrated on demand.
Added:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/CountLRUDehydrationPolicy.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DehydrationPolicy.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessLifecycleCallback.java
Modified:
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/BindingContextImpl.java
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEConfigProperties.java
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BindingContext.java
incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java
incubator/ode/trunk/bpel-compiler/src/main/java/org/apache/ode/bpel/compiler/BpelCompiler.java
incubator/ode/trunk/bpel-obj/src/main/java/org/apache/ode/bpel/o/OProcess.java
incubator/ode/trunk/bpel-obj/src/main/java/org/apache/ode/bpel/o/Serializer.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReplacementMapImpl.java
incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
incubator/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BindingContextImpl.java
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/BindingContextImpl.java
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/util/WSDLFlattener.java
Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/BindingContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/BindingContextImpl.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/BindingContextImpl.java (original)
+++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/BindingContextImpl.java Tue Feb 6 14:17:56 2007
@@ -42,8 +42,7 @@
_store = store;
}
- public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint,
- PortType portType) {
+ public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint) {
try {
ODEService svc = _server.createService(_store.getProcessConfiguration(processId).getDefinitionForService(myRoleEndpoint.serviceName)
, myRoleEndpoint.serviceName, myRoleEndpoint.portName);
Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEConfigProperties.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEConfigProperties.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEConfigProperties.java (original)
+++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEConfigProperties.java Tue Feb 6 14:17:56 2007
@@ -45,6 +45,7 @@
private static final String PROP_WORKING_DIR = "ode-axis2.working.dir";
private static final String PROP_REPLICATE_EMPTYNS = "ode-axis2.message.replicate.emptyns";
private static final String PROP_EVENT_LISTENERS = "ode-axis2.event.listeners";
+ private static final String PROP_PROCESS_DEHYDRATION = "ode-axis2.process.dehydration";
private File _installDir;
@@ -143,4 +144,7 @@
return getProperty(PROP_EVENT_LISTENERS);
}
+ public boolean isDehydrationEnabled() {
+ return Boolean.valueOf(getProperty(ODEConfigProperties.PROP_PROCESS_DEHYDRATION, "false"));
+ }
}
Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java (original)
+++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java Tue Feb 6 14:17:56 2007
@@ -35,12 +35,14 @@
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactoryJDBC;
import org.apache.ode.bpel.engine.BpelServerImpl;
+import org.apache.ode.bpel.engine.CountLRUDehydrationPolicy;
import org.apache.ode.bpel.iapi.BpelEventListener;
import org.apache.ode.bpel.iapi.ProcessStore;
import org.apache.ode.bpel.iapi.ProcessStoreEvent;
import org.apache.ode.bpel.iapi.ProcessStoreListener;
import org.apache.ode.bpel.scheduler.quartz.QuartzSchedulerImpl;
import org.apache.ode.store.ProcessStoreImpl;
+import org.apache.ode.utils.LoggingDataSourceWrapper;
import org.apache.ode.utils.fs.TempFileManager;
import org.opentools.minerva.MinervaPool;
@@ -68,6 +70,7 @@
public class ODEServer {
private static final Log __log = LogFactory.getLog(ODEServer.class);
+ private static final Log __logSql = LogFactory.getLog("org.apache.ode.sql");
private static final Messages __msgs = Messages.getMessages(Messages.class);
@@ -386,7 +389,10 @@
private void initExternalDb() throws ServletException {
try {
- _datasource = lookupInJndi(_odeConfig.getDbDataSource());
+ if (__logSql.isDebugEnabled())
+ _datasource = new LoggingDataSourceWrapper((DataSource) lookupInJndi(_odeConfig.getDbDataSource()), __logSql);
+ else
+ _datasource = (DataSource) lookupInJndi(_odeConfig.getDbDataSource());
__log.info(__msgs.msgOdeUsingExternalDb(_odeConfig.getDbDataSource()));
} catch (Exception ex) {
String msg = __msgs.msgOdeInitExternalDbFailed(_odeConfig.getDbDataSource());
@@ -435,7 +441,9 @@
throw new ServletException(errmsg, ex);
}
- _datasource = _minervaPool.createDataSource();
+ if (__logSql.isDebugEnabled())
+ _datasource = new LoggingDataSourceWrapper(_minervaPool.createDataSource(), __logSql);
+ else _datasource = _minervaPool.createDataSource();
}
/**
@@ -497,6 +505,11 @@
_scheduler));
_server.setBindingContext(new BindingContextImpl(this, _store));
_server.setScheduler(_scheduler);
+ if (_odeConfig.isDehydrationEnabled()){
+ CountLRUDehydrationPolicy dehy = new CountLRUDehydrationPolicy();
+ // dehy.setProcessMaxAge(10000);
+ _server.setDehydrationPolicy(dehy);
+ }
_server.init();
}
Modified: incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java (original)
+++ incubator/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEService.java Tue Feb 6 14:17:56 2007
@@ -89,6 +89,7 @@
ResponseCallback callback = null;
try {
_txManager.begin();
+ if (__log.isDebugEnabled()) __log.debug("Starting transaction.");
// Creating mesage exchange
String messageId = new GUID().toString();
@@ -127,6 +128,7 @@
if (success) {
__log.debug("Commiting ODE MEX " + odeMex);
try {
+ if (__log.isDebugEnabled()) __log.debug("Commiting transaction.");
_txManager.commit();
} catch (Exception e) {
__log.error("COMMIT FAILED!", e);
@@ -168,6 +170,7 @@
} else {
boolean commit = false;
try {
+ if (__log.isDebugEnabled()) __log.debug("Starting transaction.");
_txManager.begin();
} catch (Exception ex) {
throw new AxisFault("Error starting transaction!", ex);
@@ -183,6 +186,7 @@
} finally {
if (commit)
try {
+ if (__log.isDebugEnabled()) __log.debug("Comitting transaction.");
_txManager.commit();
} catch (Exception e) {
throw new AxisFault("Commit failed!", e);
Modified: incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BindingContext.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BindingContext.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BindingContext.java (original)
+++ incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BindingContext.java Tue Feb 6 14:17:56 2007
@@ -57,16 +57,15 @@
* that would make this happen.
* @param processId
* @param myRoleEndpoint endpoint identifer (service qname + port)
- * @param portType port type of the endpoint
* @returns an endpoint reference in XML format.
*/
- EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint, PortType portType);
+ EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint);
/**
* Deactivate a "myRole" endpoint. This is a notification to the integration layer
* that the BPEL engine is no longer interested in receiving requests for the
* given endpoint and that the IL should tear down any communication mechanisms
- * created in {@link #activateMyRoleEndpoint(QName, Endpoint, PortType)}.
+ * created in {@link #activateMyRoleEndpoint(QName, Endpoint)}.
* @param myRoleEndpoint
*/
void deactivateMyRoleEndpoint(Endpoint myRoleEndpoint);
Modified: incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java (original)
+++ incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/BpelEngine.java Tue Feb 6 14:17:56 2007
@@ -19,8 +19,6 @@
package org.apache.ode.bpel.iapi;
-import java.util.Map;
-
import javax.xml.namespace.QName;
/**
@@ -57,7 +55,5 @@
* @return associated message exchange
*/
MessageExchange getMessageExchange(String mexId);
-
- MessageExchange getMessageExchangeByClientKey(String clientKey);
}
Modified: incubator/ode/trunk/bpel-compiler/src/main/java/org/apache/ode/bpel/compiler/BpelCompiler.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-compiler/src/main/java/org/apache/ode/bpel/compiler/BpelCompiler.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-compiler/src/main/java/org/apache/ode/bpel/compiler/BpelCompiler.java (original)
+++ incubator/ode/trunk/bpel-compiler/src/main/java/org/apache/ode/bpel/compiler/BpelCompiler.java Tue Feb 6 14:17:56 2007
@@ -18,90 +18,15 @@
*/
package org.apache.ode.bpel.compiler;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
-
-import javax.wsdl.Definition;
-import javax.wsdl.Message;
-import javax.wsdl.Operation;
-import javax.wsdl.Part;
-import javax.wsdl.PortType;
-import javax.wsdl.WSDLException;
-import javax.wsdl.xml.WSDLReader;
-import javax.xml.namespace.QName;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.compiler.api.CompilationException;
-import org.apache.ode.bpel.compiler.api.CompilationMessage;
-import org.apache.ode.bpel.compiler.api.CompileListener;
-import org.apache.ode.bpel.compiler.api.CompilerContext;
-import org.apache.ode.bpel.compiler.api.ExpressionCompiler;
-import org.apache.ode.bpel.compiler.bom.Activity;
-import org.apache.ode.bpel.compiler.bom.Bpel11QNames;
-import org.apache.ode.bpel.compiler.bom.Bpel20QNames;
-import org.apache.ode.bpel.compiler.bom.BpelObject;
-import org.apache.ode.bpel.compiler.bom.Catch;
-import org.apache.ode.bpel.compiler.bom.CompensationHandler;
-import org.apache.ode.bpel.compiler.bom.Correlation;
-import org.apache.ode.bpel.compiler.bom.CorrelationSet;
-import org.apache.ode.bpel.compiler.bom.Expression;
-import org.apache.ode.bpel.compiler.bom.FaultHandler;
+import org.apache.ode.bpel.compiler.api.*;
+import org.apache.ode.bpel.compiler.bom.*;
import org.apache.ode.bpel.compiler.bom.Import;
-import org.apache.ode.bpel.compiler.bom.LinkSource;
-import org.apache.ode.bpel.compiler.bom.LinkTarget;
-import org.apache.ode.bpel.compiler.bom.OnAlarm;
-import org.apache.ode.bpel.compiler.bom.OnEvent;
-import org.apache.ode.bpel.compiler.bom.PartnerLink;
-import org.apache.ode.bpel.compiler.bom.PartnerLinkType;
import org.apache.ode.bpel.compiler.bom.Process;
-import org.apache.ode.bpel.compiler.bom.Property;
-import org.apache.ode.bpel.compiler.bom.PropertyAlias;
-import org.apache.ode.bpel.compiler.bom.Scope;
-import org.apache.ode.bpel.compiler.bom.ScopeActivity;
-import org.apache.ode.bpel.compiler.bom.ScopeLikeActivity;
-import org.apache.ode.bpel.compiler.bom.TerminationHandler;
-import org.apache.ode.bpel.compiler.bom.Variable;
import org.apache.ode.bpel.compiler.wsdl.Definition4BPEL;
import org.apache.ode.bpel.compiler.wsdl.WSDLFactory4BPEL;
-import org.apache.ode.bpel.o.DebugInfo;
-import org.apache.ode.bpel.o.OActivity;
-import org.apache.ode.bpel.o.OAssign;
-import org.apache.ode.bpel.o.OCatch;
-import org.apache.ode.bpel.o.OCompensate;
-import org.apache.ode.bpel.o.OCompensationHandler;
-import org.apache.ode.bpel.o.OConstantExpression;
-import org.apache.ode.bpel.o.OConstantVarType;
-import org.apache.ode.bpel.o.OConstants;
-import org.apache.ode.bpel.o.OElementVarType;
-import org.apache.ode.bpel.o.OEventHandler;
-import org.apache.ode.bpel.o.OExpression;
-import org.apache.ode.bpel.o.OExpressionLanguage;
-import org.apache.ode.bpel.o.OFaultHandler;
-import org.apache.ode.bpel.o.OFlow;
-import org.apache.ode.bpel.o.OLValueExpression;
-import org.apache.ode.bpel.o.OLink;
-import org.apache.ode.bpel.o.OMessageVarType;
-import org.apache.ode.bpel.o.OPartnerLink;
-import org.apache.ode.bpel.o.OProcess;
-import org.apache.ode.bpel.o.ORethrow;
-import org.apache.ode.bpel.o.OScope;
-import org.apache.ode.bpel.o.OSequence;
-import org.apache.ode.bpel.o.OTerminationHandler;
-import org.apache.ode.bpel.o.OVarType;
-import org.apache.ode.bpel.o.OXsdTypeVarType;
-import org.apache.ode.bpel.o.OXslSheet;
+import org.apache.ode.bpel.o.*;
import org.apache.ode.utils.GUID;
import org.apache.ode.utils.NSContext;
import org.apache.ode.utils.StreamUtils;
@@ -112,6 +37,16 @@
import org.apache.ode.utils.stl.UnaryFunction;
import org.w3c.dom.Node;
+import javax.wsdl.*;
+import javax.wsdl.xml.WSDLReader;
+import javax.xml.namespace.QName;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+
/**
* Compiler for converting BPEL process descriptions (and their associated WSDL
* and XSD documents) into compiled representations suitable for execution by
@@ -516,6 +451,10 @@
_oprocess.expressionLanguages.add(expLanguage);
}
oexpr.expressionLanguage = expLanguage;
+
+ // Cleaning up expression compiler for further compilation
+ ec.setCompilerContext(null);
+
return oexpr;
} catch (CompilationException ce) {
if (ce.getCompilationMessage().source == null)
Modified: incubator/ode/trunk/bpel-obj/src/main/java/org/apache/ode/bpel/o/OProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-obj/src/main/java/org/apache/ode/bpel/o/OProcess.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-obj/src/main/java/org/apache/ode/bpel/o/OProcess.java (original)
+++ incubator/ode/trunk/bpel-obj/src/main/java/org/apache/ode/bpel/o/OProcess.java Tue Feb 6 14:17:56 2007
@@ -21,21 +21,23 @@
import org.apache.ode.utils.stl.CollectionsX;
import org.apache.ode.utils.stl.MemberOfFunction;
+import javax.wsdl.Operation;
import javax.xml.namespace.QName;
+import java.io.IOException;
+import java.io.ObjectInputStream;
import java.net.URI;
import java.util.*;
-import javax.wsdl.Operation;
-
/**
* Compiled BPEL process representation.
*/
public class OProcess extends OBase {
-
+
+ public static int instanceCount = 0;
static final long serialVersionUID = -1L ;
public String guid;
-
+
/** BPEL version. */
public final String version;
@@ -66,7 +68,7 @@
int _childIdCounter = 0;
List<OBase> _children = new ArrayList<OBase>();
-
+
public final HashSet<OExpressionLanguage> expressionLanguages = new HashSet<OExpressionLanguage>();
public final HashMap<QName, OMessageVarType> messageTypes = new HashMap<QName, OMessageVarType>();
@@ -80,6 +82,8 @@
public OProcess(String bpelVersion) {
super(null);
this.version = bpelVersion;
+ System.out.println("### Creating OProcess " + super.hashCode());
+ instanceCount++;
}
public OBase getChild(final int id) {
@@ -130,12 +134,12 @@
}
}
}
-
+
return correlators;
}
-
+
public static class OProperty extends OBase {
-
+
static final long serialVersionUID = -1L ;
public final List<OPropertyAlias> aliases = new ArrayList<OPropertyAlias>();
public QName name;
@@ -155,7 +159,7 @@
}
public static class OPropertyAlias extends OBase {
-
+
static final long serialVersionUID = -1L ;
public OVarType varType;
@@ -182,11 +186,21 @@
buf.append(']');
return buf.toString();
}
-
+
}
public QName getQName() {
return new QName(targetNamespace, processName);
}
+
+ protected void finalize() throws Throwable {
+ instanceCount--;
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ // our "pseudo-constructor"
+ in.defaultReadObject();
+ instanceCount++;
+ }
}
Modified: incubator/ode/trunk/bpel-obj/src/main/java/org/apache/ode/bpel/o/Serializer.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-obj/src/main/java/org/apache/ode/bpel/o/Serializer.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-obj/src/main/java/org/apache/ode/bpel/o/Serializer.java (original)
+++ incubator/ode/trunk/bpel-obj/src/main/java/org/apache/ode/bpel/o/Serializer.java Tue Feb 6 14:17:56 2007
@@ -18,11 +18,10 @@
*/
package org.apache.ode.bpel.o;
+import javax.xml.namespace.QName;
import java.io.*;
import java.util.Arrays;
-import javax.xml.namespace.QName;
-
/**
* Header written at the beginning of every compiled BPEL object file.
*/
@@ -54,7 +53,7 @@
public String guid;
- public OProcess _oprocess;
+// public OProcess _oprocess;
public QName type;
@@ -74,6 +73,7 @@
read(inputStream);
}
+
public void read(InputStream is) throws IOException {
DataInputStream oin = new DataInputStream(is);
byte[] magic = new byte[MAGIC_NUMBER.length];
@@ -85,12 +85,13 @@
this.compileTime = oin.readLong();
oin.readInt();
ObjectInputStream ois = new CustomObjectInputStream(_inputStream);
+ OProcess oprocess;
try {
- _oprocess = (OProcess) ois.readObject();
+ oprocess = (OProcess) ois.readObject();
} catch (ClassNotFoundException e) {
throw new IOException("DataStream Error");
}
- this.type = new QName(_oprocess.targetNamespace, _oprocess.processName);
+ this.type = new QName(oprocess.targetNamespace, oprocess.processName);
this.guid = "OLD-FORMAT-NO-GUID";
return;
@@ -125,17 +126,18 @@
}
public OProcess readOProcess() throws IOException, ClassNotFoundException {
- if (_oprocess != null)
- return _oprocess;
+// if (_oprocess != null)
+// return _oprocess;
ObjectInputStream ois = new CustomObjectInputStream(_inputStream);
+ OProcess oprocess;
try {
- _oprocess = (OProcess) ois.readObject();
+ oprocess = (OProcess) ois.readObject();
} catch (ClassNotFoundException e) {
throw new IOException("DataStream Error");
}
- return _oprocess;
+ return oprocess;
}
static class CustomObjectOutputStream extends ObjectOutputStream {
@@ -204,9 +206,6 @@
final String uri;
final String prefix;
- /**
- * @param localPart
- */
OQName(String uri, String local, String prefix){
this.uri = uri;
this.local = local;
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java Tue Feb 6 14:17:56 2007
@@ -53,7 +53,7 @@
* transaction.
*
* @author mszefler
- *
+ * @author Matthieu Riou <mriou at apache dot org>
*/
public class BpelEngineImpl implements BpelEngine {
private static final Log __log = LogFactory.getLog(BpelEngineImpl.class);
@@ -144,7 +144,7 @@
throw new BpelEngineException(errmsg);
}
{
- OPartnerLink plink = (OPartnerLink) process._oprocess.getChild(mexdao.getPartnerLinkModelId());
+ OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
PortType ptype = plink.partnerRolePortType;
Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
// TODO: recover Partner's EPR
@@ -155,7 +155,7 @@
case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
mex = new MyRoleMessageExchangeImpl(this, mexdao);
if (process != null) {
- OPartnerLink plink = (OPartnerLink) process._oprocess.getChild(mexdao.getPartnerLinkModelId());
+ OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
PortType ptype = plink.myRolePortType;
Operation op = plink.getMyRoleOperation(mexdao.getOperation());
mex.setPortOp(ptype, op);
@@ -170,7 +170,7 @@
return mex;
}
- boolean unregisterProcess(QName process) {
+ BpelProcess unregisterProcess(QName process) {
BpelProcess p = _activeProcesses.remove(process);
if (p != null) {
if (__log.isDebugEnabled())
@@ -180,7 +180,7 @@
while (_serviceMap.values().remove(p))
;
}
- return p != null;
+ return p;
}
boolean isProcessRegistered(QName pid) {
@@ -189,9 +189,7 @@
/**
* Register a process with the engine.
- *
- * @param process
- * the process to register
+ * @param process the process to register
*/
void registerProcess(BpelProcess process) {
_activeProcesses.put(process.getPID(), process);
@@ -233,7 +231,7 @@
if (process == null)
return null;
- return process._oprocess;
+ return process.getOProcess();
}
public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
@@ -391,11 +389,6 @@
for (org.apache.ode.bpel.iapi.BpelEventListener l : _contexts.eventListeners) {
l.onEvent(event);
}
- }
-
- public MessageExchange getMessageExchangeByClientKey(String clientKey) {
- // TODO: implement me.
- throw new UnsupportedOperationException("Todo: implementme");
}
/**
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelManagementFacadeImpl.java Tue Feb 6 14:17:56 2007
@@ -190,7 +190,7 @@
if (process == null)
throw new InvalidRequestException("The process \"" + procid + "\" is not available. Please make sure it is deployed and encompassing System is activated." );
- return process._oprocess;
+ return process.getOProcess();
}
public void step(final Long iid) throws ManagementException {
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Tue Feb 6 14:17:56 2007
@@ -20,30 +20,22 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.common.CorrelationKey;
import org.apache.ode.bpel.common.FaultException;
-import org.apache.ode.bpel.common.InvalidMessageException;
-import org.apache.ode.bpel.dao.CorrelatorDAO;
-import org.apache.ode.bpel.dao.MessageRouteDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
-import org.apache.ode.bpel.evt.*;
+import org.apache.ode.bpel.evt.ProcessInstanceEvent;
+import org.apache.ode.bpel.evt.ScopeEvent;
+import org.apache.ode.bpel.explang.ConfigurationException;
import org.apache.ode.bpel.explang.EvaluationException;
import org.apache.ode.bpel.iapi.*;
-import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
-import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
-import org.apache.ode.bpel.iapi.MessageExchange.Status;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.o.*;
import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
-import org.apache.ode.bpel.runtime.InvalidProcessException;
import org.apache.ode.bpel.runtime.PROCESS;
import org.apache.ode.bpel.runtime.PropertyAliasEvaluationContext;
import org.apache.ode.bpel.runtime.channels.FaultData;
import org.apache.ode.jacob.soup.ReplacementMap;
-import org.apache.ode.utils.ArrayUtils;
import org.apache.ode.utils.ObjectPrinter;
import org.apache.ode.utils.msg.MessageBundle;
import org.w3c.dom.Element;
@@ -51,118 +43,65 @@
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
-import javax.wsdl.Message;
-import javax.wsdl.Operation;
import javax.xml.namespace.QName;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
+import java.io.InputStream;
import java.util.*;
/**
* Entry point into the runtime of a BPEL process.
*
* @author mszefler
- * @author mriou <mriou at apache dot org>
+ * @author Matthieu Riou <mriou at apache dot org>
*/
public class BpelProcess {
static final Log __log = LogFactory.getLog(BpelProcess.class);
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
- private final Map<OPartnerLink, PartnerLinkPartnerRoleImpl> _partnerRoles = new HashMap<OPartnerLink, PartnerLinkPartnerRoleImpl>();
-
- private Map<OPartnerLink, PartnerLinkMyRoleImpl> _myRoles = new HashMap<OPartnerLink, PartnerLinkMyRoleImpl>();
-
- BpelEngineImpl _engine;
-
- DebuggerSupport _debugger;
-
- volatile OProcess _oprocess;
-
- final ExpressionLanguageRuntimeRegistry _expLangRuntimeRegistry;
+ private volatile Map<OPartnerLink, PartnerLinkPartnerRoleImpl> _partnerRoles;
+ private volatile Map<OPartnerLink, PartnerLinkMyRoleImpl> _myRoles;
+ /** Mapping from {"Service Name" (QNAME) / port} to a myrole. */
+ private volatile Map<Endpoint, PartnerLinkMyRoleImpl> _endpointToMyRoleMap;
- final ReplacementMap _replacementMap;
+ // Backup hashmaps to keep initial endpoints handy after dehydration
+ private Map<Endpoint,EndpointReference> _myEprs =
+ new HashMap<Endpoint, EndpointReference>();
+ private Map<Endpoint,EndpointReference> _partnerEprs =
+ new HashMap<Endpoint, EndpointReference>();
+ private Map<Endpoint,PartnerRoleChannel> _partnerChannels =
+ new HashMap<Endpoint, PartnerRoleChannel>();
final QName _pid;
+ private volatile OProcess _oprocess;
+ // Has the process already been hydrated before?
+ private boolean _hydratedOnce = false;
+ /** Last time the process was used. */
+ private volatile long _lastUsed;
- /** Mapping from {"Service Name" (QNAME) / port} to a myrole. */
- private Map<Endpoint, PartnerLinkMyRoleImpl> _endpointToMyRoleMap = new HashMap<Endpoint, PartnerLinkMyRoleImpl>();
-
+ BpelEngineImpl _engine;
+ DebuggerSupport _debugger;
+ ExpressionLanguageRuntimeRegistry _expLangRuntimeRegistry;
+ ReplacementMap _replacementMap;
+ final ProcessConf _pconf;
+ // Notifying the server when a process hydrates
+ private ProcessLifecycleCallback _lifeCallback;
/** {@link MessageExchangeInterceptor}s registered for this process. */
private final List<MessageExchangeInterceptor> _mexInterceptors = new ArrayList<MessageExchangeInterceptor>();
- private final ProcessConf _pconf;
-
- /** Last time the process was used. */
- volatile long _lastUsed;
-
- public BpelProcess(ProcessConf conf, OProcess oprocess, BpelEventListener debugger,
- ExpressionLanguageRuntimeRegistry expLangRuntimeRegistry) {
+ public BpelProcess(ProcessConf conf, BpelEventListener debugger, ProcessLifecycleCallback lifeCallback) {
_pid = conf.getProcessId();
_pconf = conf;
-
- _replacementMap = new ReplacementMapImpl(oprocess);
- _oprocess = oprocess;
- _expLangRuntimeRegistry = expLangRuntimeRegistry;
-
- // Create myRole endpoint name mapping (from deployment descriptor)
- HashMap<OPartnerLink, Endpoint> myRoleEndpoints = new HashMap<OPartnerLink, Endpoint>();
- for (Map.Entry<String, Endpoint> provide : conf.getProvideEndpoints().entrySet()) {
- OPartnerLink plink = oprocess.getPartnerLink(provide.getKey());
- if (plink == null) {
- String errmsg = "Error in deployment descriptor for process " + _pid
- + "; reference to unknown partner link " + provide.getKey();
- __log.error(errmsg);
- throw new BpelEngineException(errmsg);
- }
- myRoleEndpoints.put(plink, provide.getValue());
- }
-
- // Create partnerRole initial value mapping
- HashMap<OPartnerLink, Endpoint> partnerRoleIntialValues = new HashMap<OPartnerLink, Endpoint>();
- for (Map.Entry<String, Endpoint> invoke : conf.getInvokeEndpoints().entrySet()) {
- OPartnerLink plink = oprocess.getPartnerLink(invoke.getKey());
- if (plink == null) {
- String errmsg = "Error in deployment descriptor for process " + _pid
- + "; reference to unknown partner link " + invoke.getKey();
- __log.error(errmsg);
- throw new BpelEngineException(errmsg);
- }
- __log.debug("Processing <invoke> element for process " + _pid + ": partnerlink " + invoke.getKey()
- + " --> " + invoke.getValue());
-
- partnerRoleIntialValues.put(plink, invoke.getValue());
- }
-
- for (OPartnerLink pl : _oprocess.getAllPartnerLinks()) {
- if (pl.hasMyRole()) {
- Endpoint endpoint = myRoleEndpoints.get(pl);
- if (endpoint == null)
- throw new IllegalArgumentException("No service name for myRole plink " + pl.getName());
- PartnerLinkMyRoleImpl myRole = new PartnerLinkMyRoleImpl(pl, endpoint);
- _myRoles.put(pl, myRole);
- _endpointToMyRoleMap.put(endpoint, myRole);
- }
-
- if (pl.hasPartnerRole()) {
- PartnerLinkPartnerRoleImpl partnerRole = new PartnerLinkPartnerRoleImpl(pl, conf.getInvokeEndpoints()
- .get(pl.getName()));
- _partnerRoles.put(pl, partnerRole);
- }
- }
+ _lifeCallback = lifeCallback;
}
public String toString() {
return "BpelProcess[" + _pid + "]";
}
- public void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, String action,
- FaultData fault) {
+ public void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId,
+ String action, FaultData fault) {
if (__log.isDebugEnabled())
__log.debug("Recovering activity in process " + instanceDAO.getInstanceId() + " with action " + action);
-
markused();
BpelRuntimeContextImpl processInstance = createRuntimeContext(instanceDAO, null, null);
processInstance.recoverActivity(channel, activityId, action, fault);
@@ -177,13 +116,11 @@
/**
* Entry point for message exchanges aimed at the my role.
- *
* @param mex
*/
void invokeProcess(MyRoleMessageExchangeImpl mex) {
- PartnerLinkMyRoleImpl target = getMyRoleForService(mex.getServiceName());
-
markused();
+ PartnerLinkMyRoleImpl target = getMyRoleForService(mex.getServiceName());
if (target == null) {
String errmsg = __msgs.msgMyRoleRoutingFailure(mex.getMessageExchangeId());
__log.error(errmsg);
@@ -198,11 +135,13 @@
return;
}
+ markused();
target.invokeMyRole(mex);
+ markused();
}
private PartnerLinkMyRoleImpl getMyRoleForService(QName serviceName) {
- for (Map.Entry<Endpoint, PartnerLinkMyRoleImpl> e : _endpointToMyRoleMap.entrySet()) {
+ for (Map.Entry<Endpoint, PartnerLinkMyRoleImpl> e : getEndpointToMyRoleMap().entrySet()) {
if (e.getKey().serviceName.equals(serviceName))
return e.getValue();
}
@@ -212,9 +151,9 @@
void initMyRoleMex(MyRoleMessageExchangeImpl mex) {
markused();
PartnerLinkMyRoleImpl target = null;
- for (Endpoint endpoint : _endpointToMyRoleMap.keySet()) {
+ for (Endpoint endpoint : getEndpointToMyRoleMap().keySet()) {
if (endpoint.serviceName.equals(mex.getServiceName()))
- target = _endpointToMyRoleMap.get(endpoint);
+ target = getEndpointToMyRoleMap().get(endpoint);
}
if (target != null) {
mex.setPortOp(target._plinkDef.myRolePortType, target._plinkDef.getMyRoleOperation(mex.getOperationName()));
@@ -226,13 +165,9 @@
/**
* Extract the value of a BPEL property from a BPEL messsage variable.
- *
- * @param msgData
- * message variable data
- * @param alias
- * alias to apply
- * @param target
- * description of the data (for error logging only)
+ * @param msgData message variable data
+ * @param alias alias to apply
+ * @param target description of the data (for error logging only)
* @return value of the property
* @throws FaultException
*/
@@ -245,7 +180,7 @@
try {
lValue = _expLangRuntimeRegistry.evaluateNode(alias.location, ectx);
} catch (EvaluationException ec) {
- throw new FaultException(_oprocess.constants.qnSelectionFailure, alias.getDescription());
+ throw new FaultException(getOProcess().constants.qnSelectionFailure, alias.getDescription());
}
if (lValue == null) {
@@ -253,7 +188,7 @@
if (__log.isErrorEnabled()) {
__log.error(errmsg);
}
- throw new FaultException(_oprocess.constants.qnSelectionFailure, errmsg);
+ throw new FaultException(getOProcess().constants.qnSelectionFailure, errmsg);
}
if (lValue.getNodeType() == Node.ELEMENT_NODE) {
@@ -279,20 +214,12 @@
* <em>element</em> part, the name of that element is returned. If the
* part is an XML schema typed part, then the name of the part is returned
* in the null namespace.
- *
- * @param part
- * WSDL {@link javax.wsdl.Part}
+ * @param part WSDL {@link javax.wsdl.Part}
* @return name of element containing said part
*/
static QName getElementNameForPart(OMessageVarType.Part part) {
- return (part.type instanceof OElementVarType) ? ((OElementVarType) part.type).elementType : new QName(null,
- part.name);
- }
-
- /** Create a version-appropriate runtime context. */
- private BpelRuntimeContextImpl createRuntimeContext(ProcessInstanceDAO dao, PROCESS template,
- MyRoleMessageExchangeImpl instantiatingMessageExchange) {
- return new BpelRuntimeContextImpl(this, dao, template, instantiatingMessageExchange);
+ return (part.type instanceof OElementVarType) ? ((OElementVarType) part.type).elementType
+ : new QName(null, part.name);
}
/**
@@ -303,359 +230,20 @@
* @return <code>true</code> if execution should continue,
* <code>false</code> otherwise
*/
- private boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) {
- InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(),
+ boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) {
+ InterceptorContextImpl ictx = new InterceptorContextImpl(getEngine()._contexts.dao.getConnection(),
getProcessDAO(), _pconf);
for (MessageExchangeInterceptor i : _mexInterceptors)
if (!mex.processInterceptor(i, mex, ictx, invoker))
return false;
- for (MessageExchangeInterceptor i : _engine.getGlobalInterceptors())
+ for (MessageExchangeInterceptor i : getEngine().getGlobalInterceptors())
if (!mex.processInterceptor(i, mex, ictx, invoker))
return false;
return true;
-
- }
-
- /**
- * Replacement object for serializtation of the {@link OBase} (compiled
- * BPEL) objects in the JACOB VPU.
- */
- public static final class OBaseReplacementImpl implements Externalizable {
- private static final long serialVersionUID = 1L;
-
- int _id;
-
- public OBaseReplacementImpl() {
- }
-
- public OBaseReplacementImpl(int id) {
- _id = id;
- }
-
- public void readExternal(ObjectInput in) throws IOException {
- _id = in.readInt();
- }
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(_id);
- }
-
- }
-
- private abstract class PartnerLinkRoleImpl {
- protected OPartnerLink _plinkDef;
-
- protected EndpointReference _initialEPR;
-
- PartnerLinkRoleImpl(OPartnerLink plink) {
- _plinkDef = plink;
- }
-
- String getPartnerLinkName() {
- return _plinkDef.name;
- }
-
- /**
- * Get the initial value of this role's EPR. This value is obtained from
- * the integration layer when the process is enabled on the server.
- *
- * @return initial epr
- */
- EndpointReference getInitialEPR() {
- return _initialEPR;
- }
-
- }
-
- class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
-
- /** The local endpoint for this "myrole". */
- public Endpoint _endpoint;
-
- PartnerLinkMyRoleImpl(OPartnerLink plink, Endpoint endpoint) {
- super(plink);
- _endpoint = endpoint;
- }
-
- public String toString() {
- StringBuffer buf = new StringBuffer("{PartnerLinkRole-");
- buf.append(_plinkDef.name);
- buf.append('.');
- buf.append(_plinkDef.myRoleName);
- buf.append(" on ");
- buf.append(_endpoint);
- buf.append('}');
-
- return buf.toString();
- }
-
- /**
- * Called when an input message has been received.
- *
- * @param mex
- * exchange to which the message is related
- */
- public void invokeMyRole(MyRoleMessageExchangeImpl mex) {
- if (__log.isTraceEnabled()) {
- __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] {
- "messageExchange", mex }));
- }
-
- Operation operation = getMyRoleOperation(mex.getOperationName());
- if (operation == null) {
- __log.error(__msgs.msgUnknownOperation(mex.getOperationName(), _plinkDef.myRolePortType.getQName()));
- mex.setFailure(FailureType.UNKNOWN_OPERATION, mex.getOperationName(), null);
- return;
- }
-
- mex.getDAO().setPartnerLinkModelId(_plinkDef.getId());
- mex.setPortOp(_plinkDef.myRolePortType, operation);
- mex.setPattern(operation.getOutput() == null ? MessageExchangePattern.REQUEST_ONLY
- : MessageExchangePattern.REQUEST_RESPONSE);
-
- // Is this a /possible/ createInstance Operation?
- boolean isCreateInstnace = _plinkDef.isCreateInstanceOperation(operation);
-
- // now, the tricks begin: when a message arrives we have to see if
- // there
- // is anyone waiting for it.
- // Get the correlator, a persisted communnication-reduction data
- // structure
- // supporting correlation correlationKey matching!
- String correlatorId = genCorrelatorId(_plinkDef, operation.getName());
-
- CorrelatorDAO correlator = getProcessDAO().getCorrelator(correlatorId);
-
- CorrelationKey[] keys;
- MessageRouteDAO messageRoute = null;
-
- // We need to compute the correlation keys (based on the operation
- // we can infer which correlation keys to compute - this is merely a set
- // consisting of each correlationKey used in each correlation sets
- // that is ever referenced in an <receive>/<onMessage> on this
- // partnerlink/operation.
- try {
- keys = computeCorrelationKeys(mex);
- } catch (InvalidMessageException ime) {
- // We'd like to do a graceful exit here, no sense in rolling back due to a
- // a message format problem.
- __log.debug("Unable to evaluate correlation keys, invalid message format. ",ime);
- mex.setFailure(FailureType.FORMAT_ERROR, ime.getMessage(), null);
- return;
- }
-
- String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
- String partnerSessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
- if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys="
- + ArrayUtils.makeCollection(HashSet.class, keys) + " mySessionId=" + mySessionId
- + " partnerSessionId=" + partnerSessionId);
- }
-
- CorrelationKey matchedKey = null;
-
- // Try to find a route for one of our keys.
- for (CorrelationKey key : keys) {
- messageRoute = correlator.findRoute(key);
- if (messageRoute != null) {
- if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId + ": ckey " + key + " route is to " + messageRoute);
- }
- matchedKey = key;
- break;
- }
- }
-
- // TODO - ODE-58
-
- // If no luck, and this operation qualifies for create-instance
- // treatment, then create a new process
- // instance.
- if (messageRoute == null && isCreateInstnace) {
- if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId + ": routing failed, CREATING NEW INSTANCE");
- }
- ProcessDAO processDAO = getProcessDAO();
-
- if (_pconf.getState() == ProcessState.RETIRED) {
- throw new InvalidProcessException("Process is retired.", InvalidProcessException.RETIRED_CAUSE_CODE);
- }
-
- if (!processInterceptors(mex, InterceptorInvoker.__onNewInstanceInvoked)) {
- __log.debug("Not creating a new instance for mex " + mex + "; interceptor prevented!");
- return;
- }
-
- ProcessInstanceDAO newInstance = processDAO.createInstance(correlator);
-
- BpelRuntimeContextImpl instance = createRuntimeContext(newInstance, new PROCESS(_oprocess), mex);
-
- // send process instance event
- NewProcessInstanceEvent evt = new NewProcessInstanceEvent(new QName(_oprocess.targetNamespace,
- _oprocess.getName()), getProcessDAO().getProcessId(), newInstance.getInstanceId());
- evt.setPortType(mex.getPortType().getQName());
- evt.setOperation(operation.getName());
- evt.setMexId(mex.getMessageExchangeId());
- _debugger.onEvent(evt);
- saveEvent(evt, newInstance);
- mex.setCorrelationStatus(CorrelationStatus.CREATE_INSTANCE);
- mex.getDAO().setInstance(newInstance);
-
- instance.execute();
- } else if (messageRoute != null) {
- if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId + ": ROUTING to instance "
- + messageRoute.getTargetInstance().getInstanceId());
- }
-
- ProcessInstanceDAO instanceDao = messageRoute.getTargetInstance();
-
- // Reload process instance for DAO.
- BpelRuntimeContextImpl instance = createRuntimeContext(instanceDao, null, null);
- instance.inputMsgMatch(messageRoute.getGroupId(), messageRoute.getIndex(), mex);
-
- // Kill the route so some new message does not get routed to
- // same process instance.
- correlator.removeRoutes(messageRoute.getGroupId(), instanceDao);
-
- // send process instance event
- CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_oprocess.targetNamespace, _oprocess
- .getName()), getProcessDAO().getProcessId(), instanceDao.getInstanceId(), matchedKey);
- evt.setPortType(mex.getPortType().getQName());
- evt.setOperation(operation.getName());
- evt.setMexId(mex.getMessageExchangeId());
-
- _debugger.onEvent(evt);
- // store event
- saveEvent(evt, instanceDao);
-
- // EXPERIMENTAL -- LOCK
- // instanceDao.lock();
-
- mex.setCorrelationStatus(CorrelationStatus.MATCHED);
- mex.getDAO().setInstance(messageRoute.getTargetInstance());
- instance.execute();
- } else {
- if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId + ": SAVING to DB (no match) ");
- }
-
- if (!mex.isAsynchronous()) {
- mex.setFailure(FailureType.NOMATCH, "No process instance matching correlation keys.", null);
-
- } else {
- // send event
- CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex
- .getOperation().getName(), mex.getMessageExchangeId(), keys);
-
- evt.setProcessId(getProcessDAO().getProcessId());
- evt.setProcessName(new QName(_oprocess.targetNamespace, _oprocess.getName()));
- _debugger.onEvent(evt);
-
- mex.setCorrelationStatus(CorrelationStatus.QUEUED);
-
- // No match, means we add message exchange to the queue.
- correlator.enqueueMessage(mex.getDAO(), keys);
-
- }
- }
-
- // Now we have to update our message exchange status. If the <reply>
- // was not hit during the
- // invocation, then we will be in the "REQUEST" phase which means
- // that either this was a one-way
- // or a two-way that needs to delivery the reply asynchronously.
- if (mex.getStatus() == Status.REQUEST) {
- mex.setStatus(Status.ASYNC);
- }
-
- }
-
- @SuppressWarnings("unchecked")
- private Operation getMyRoleOperation(String operationName) {
- Operation op = _plinkDef.getMyRoleOperation(operationName);
- return op;
- }
-
- private CorrelationKey[] computeCorrelationKeys(MyRoleMessageExchangeImpl mex) {
- Operation operation = mex.getOperation();
- Element msg = mex.getRequest().getMessage();
- Message msgDescription = operation.getInput().getMessage();
- List<CorrelationKey> keys = new ArrayList<CorrelationKey>();
-
- Set<OScope.CorrelationSet> csets = _plinkDef.getCorrelationSetsForOperation(operation);
-
- for (OScope.CorrelationSet cset : csets) {
- CorrelationKey key = computeCorrelationKey(cset, _oprocess.messageTypes.get(msgDescription.getQName()),
- msg);
- keys.add(key);
- }
-
- // Let's creata a key based on the sessionId
- String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
- if (mySessionId != null)
- keys.add(new CorrelationKey(-1, new String[] { mySessionId }));
-
- return keys.toArray(new CorrelationKey[keys.size()]);
- }
-
- private CorrelationKey computeCorrelationKey(OScope.CorrelationSet cset, OMessageVarType messagetype,
- Element msg) {
- String[] values = new String[cset.properties.size()];
-
- int jIdx = 0;
- for (Iterator j = cset.properties.iterator(); j.hasNext(); ++jIdx) {
- OProcess.OProperty property = (OProcess.OProperty) j.next();
- OProcess.OPropertyAlias alias = property.getAlias(messagetype);
-
- if (alias == null) {
- // TODO: Throw a real exception! And catch this at compile
- // time.
- throw new IllegalArgumentException("No alias matching property '" + property.name
- + "' with message type '" + messagetype + "'");
- }
-
- String value;
- try {
- value = extractProperty(msg, alias, msg.toString());
- } catch (FaultException fe) {
- String emsg = __msgs.msgPropertyAliasDerefFailedOnMessage(alias.getDescription(), fe.getMessage());
- __log.error(emsg, fe);
- throw new InvalidMessageException(emsg, fe);
- }
- values[jIdx] = value;
- }
-
- CorrelationKey key = new CorrelationKey(cset.getId(), values);
- return key;
- }
-
}
- private class PartnerLinkPartnerRoleImpl extends PartnerLinkRoleImpl {
- Endpoint _initialPartner;
-
- public PartnerRoleChannel _channel;
-
- private PartnerLinkPartnerRoleImpl(OPartnerLink plink, Endpoint initialPartner) {
- super(plink);
- _initialPartner = initialPartner;
- }
-
- public void processPartnerResponse(PartnerRoleMessageExchangeImpl messageExchange) {
- if (__log.isDebugEnabled()) {
- __log.debug("Processing partner's response for partnerLink: " + messageExchange);
- }
-
- BpelRuntimeContextImpl processInstance = createRuntimeContext(messageExchange.getDAO().getInstance(), null,
- null);
- processInstance.invocationResponse(messageExchange);
- processInstance.execute();
- }
-
- }
/**
* @see org.apache.ode.bpel.engine.BpelProcess#handleWorkEvent(java.util.Map<java.lang.String,java.lang.Object>)
@@ -711,9 +299,58 @@
}
}
+ private void setRoles(OProcess oprocess) { // Rebuilds the three partner-link role maps from oprocess and _pconf.
+ _partnerRoles = new HashMap<OPartnerLink, PartnerLinkPartnerRoleImpl>();
+ _myRoles = new HashMap<OPartnerLink, PartnerLinkMyRoleImpl>();
+ _endpointToMyRoleMap = new HashMap<Endpoint, PartnerLinkMyRoleImpl>();
+
+ // Create myRole endpoint name mapping (from deployment descriptor)
+ HashMap<OPartnerLink, Endpoint> myRoleEndpoints = new HashMap<OPartnerLink, Endpoint>();
+ for (Map.Entry<String, Endpoint> provide : _pconf.getProvideEndpoints().entrySet()) {
+ OPartnerLink plink = oprocess.getPartnerLink(provide.getKey());
+ if (plink == null) { // the descriptor names a partner link that oprocess does not have
+ String errmsg = "Error in deployment descriptor for process " + _pid
+ + "; reference to unknown partner link " + provide.getKey();
+ __log.error(errmsg);
+ throw new BpelEngineException(errmsg);
+ }
+ myRoleEndpoints.put(plink, provide.getValue());
+ }
+
+ // Validate the partnerRole (invoke) endpoints; this loop only checks and logs, it builds no mapping
+ for (Map.Entry<String, Endpoint> invoke : _pconf.getInvokeEndpoints().entrySet()) {
+ OPartnerLink plink = oprocess.getPartnerLink(invoke.getKey());
+ if (plink == null) { // same failure mode as for the provide endpoints above
+ String errmsg = "Error in deployment descriptor for process " + _pid
+ + "; reference to unknown partner link " + invoke.getKey();
+ __log.error(errmsg);
+ throw new BpelEngineException(errmsg);
+ }
+ __log.debug("Processing <invoke> element for process " + _pid + ": partnerlink " + invoke.getKey()
+ + " --> " + invoke.getValue());
+ }
+
+ for (OPartnerLink pl : oprocess.getAllPartnerLinks()) { // one role object per role the link declares
+ if (pl.hasMyRole()) {
+ Endpoint endpoint = myRoleEndpoints.get(pl);
+ if (endpoint == null) // a myRole link must have a matching provide endpoint
+ throw new IllegalArgumentException("No service name for myRole plink " + pl.getName());
+ PartnerLinkMyRoleImpl myRole = new PartnerLinkMyRoleImpl(this, pl, endpoint);
+ _myRoles.put(pl, myRole);
+ _endpointToMyRoleMap.put(endpoint, myRole); // second index of the same object, keyed by endpoint
+ }
+
+ if (pl.hasPartnerRole()) {
+ PartnerLinkPartnerRoleImpl partnerRole = new PartnerLinkPartnerRoleImpl(
+ this, pl, _pconf.getInvokeEndpoints().get(pl.getName())); // get() may return null: no missing-endpoint check here
+ _partnerRoles.put(pl, partnerRole);
+ }
+ }
+ }
+
ProcessDAO getProcessDAO() {
- return _pconf.isTransient() ? _engine._contexts.inMemDao.getConnection().getProcess(_pid)
- : _engine._contexts.dao.getConnection().getProcess(_pid);
+ return _pconf.isTransient() ? getEngine()._contexts.inMemDao.getConnection().getProcess(_pid)
+ : getEngine()._contexts.dao.getConnection().getProcess(_pid);
}
static String genCorrelatorId(OPartnerLink plink, String opName) {
@@ -721,12 +358,30 @@
}
/**
+ * De-serialize the compiled process representation from a stream.
+ *
+ * @param is
+ * input stream
+ * @return process information from configuration database
+ */
+ private OProcess deserializeCompiledProcess(InputStream is) throws Exception {
+ OProcess compiledProcess;
+ Serializer ofh = new Serializer(is); // Serializer is handed the raw stream
+ compiledProcess = ofh.readOProcess();
+ return compiledProcess; // NOTE(review): no close() on 'is' is visible in this method - confirm who owns the stream
+ }
+
+ /**
* Get all the services that are implemented by this process.
*
* @return list of qualified names corresponding to the myroles.
*/
public Set<Endpoint> getServiceNames() {
- return _endpointToMyRoleMap.keySet();
+ Set<Endpoint> endpoints = new HashSet<Endpoint>();
+ for (Endpoint provide : _pconf.getProvideEndpoints().values()) {
+ endpoints.add(provide);
+ }
+ return endpoints;
}
void activate(BpelEngineImpl engine) {
@@ -735,28 +390,12 @@
__log.debug("Activating " + _pid);
// Activate all the my-role endpoints.
- for (PartnerLinkMyRoleImpl myrole : _myRoles.values()) {
- myrole._initialEPR = _engine._contexts.bindingContext.activateMyRoleEndpoint(_pid, myrole._endpoint,
- myrole._plinkDef.myRolePortType);
-
- __log.debug("Activated " + _pid + " myrole " + myrole.getPartnerLinkName() + ": EPR is "
- + myrole._initialEPR);
- }
-
- for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) {
- PartnerRoleChannel channel = _engine._contexts.bindingContext.createPartnerRoleChannel(_pid,
- prole._plinkDef.partnerRolePortType, prole._initialPartner);
- prole._channel = channel;
- EndpointReference epr = channel.getInitialEndpointReference();
- if (epr != null) {
- prole._initialEPR = epr;
- }
-
- __log.debug("Activated " + _pid + " partnerrole " + prole.getPartnerLinkName() + ": EPR is "
- + prole._initialEPR);
-
+ for (Map.Entry<String, Endpoint> entry : _pconf.getProvideEndpoints().entrySet()) {
+ EndpointReference initialEPR = getEngine()._contexts
+ .bindingContext.activateMyRoleEndpoint(_pid, entry.getValue());
+ __log.debug("Activated " + _pid + " myrole " + entry.getKey() + ": EPR is " + initialEPR);
+ _myEprs.put(entry.getValue(), initialEPR);
}
-
__log.debug("Activated " + _pid);
markused();
@@ -764,22 +403,21 @@
void deactivate() {
// Deactivate all the my-role endpoints.
- for (Endpoint endpoint : _endpointToMyRoleMap.keySet())
- _engine._contexts.bindingContext.deactivateMyRoleEndpoint(endpoint);
-
- // TODO Deactivate all the partner-role channels
+ for (Endpoint endpoint : getEndpointToMyRoleMap().keySet())
+ getEngine()._contexts.bindingContext.deactivateMyRoleEndpoint(endpoint);
+ // TODO Deactivate all the partner-role channels
}
EndpointReference getInitialPartnerRoleEPR(OPartnerLink link) {
- PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(link);
+ PartnerLinkPartnerRoleImpl prole = getPartnerRoles().get(link);
if (prole == null)
throw new IllegalStateException("Unknown partner link " + link);
return prole.getInitialEPR();
}
EndpointReference getInitialMyRoleEPR(OPartnerLink link) {
- PartnerLinkMyRoleImpl myRole = _myRoles.get(link);
+ PartnerLinkMyRoleImpl myRole = getMyRoles().get(link);
if (myRole == null)
throw new IllegalStateException("Unknown partner link " + link);
return myRole.getInitialEPR();
@@ -790,7 +428,7 @@
}
PartnerRoleChannel getPartnerRoleChannel(OPartnerLink partnerLink) {
- PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(partnerLink);
+ PartnerLinkPartnerRoleImpl prole = getPartnerRoles().get(partnerLink);
if (prole == null)
throw new IllegalStateException("Unknown partner link " + partnerLink);
return prole._channel;
@@ -815,6 +453,102 @@
instanceDao.insertBpelEvent(event);
}
+ void dehydrate() { // Drops the in-memory state that hydrate() rebuilds; other fields are left untouched.
+ _oprocess = null; // getOProcess() calls hydrate() when it finds this null
+ _partnerRoles = null; // getPartnerRoles() does the same
+ _myRoles = null; // getMyRoles() does the same
+ _endpointToMyRoleMap = null; // getEndpointToMyRoleMap() does the same
+ _replacementMap = null; // getReplacementMap() does the same
+ _expLangRuntimeRegistry = null; // NOTE(review): the property-extraction code dereferences this field directly, with no hydrating getter - confirm it cannot run while dehydrated
+ }
+
+ private void hydrate() {
+ __log.debug("Rehydrating process " + _pconf.getProcessId());
+ try {
+ _oprocess = deserializeCompiledProcess(_pconf.getCBPInputStream());
+ } catch (Exception e) {
+ String errmsg = "Error reloading compiled process " + _pid + "; the file appears to be corrupted.";
+ __log.error(errmsg);
+ throw new BpelEngineException(errmsg, e);
+ }
+
+ _replacementMap = new ReplacementMapImpl(_oprocess);
+
+ // Create an expression language registry for this process
+ ExpressionLanguageRuntimeRegistry elangRegistry = new ExpressionLanguageRuntimeRegistry();
+ for (OExpressionLanguage elang : _oprocess.expressionLanguages) {
+ try {
+ elangRegistry.registerRuntime(elang);
+ } catch (ConfigurationException e) {
+ String msg = __msgs.msgExpLangRegistrationError(elang.expressionLanguageUri, elang.properties);
+ __log.error(msg, e);
+ throw new BpelEngineException(msg, e);
+ }
+ }
+ _expLangRuntimeRegistry = elangRegistry;
+
+ setRoles(getOProcess());
+
+ if (!_hydratedOnce) {
+ for (PartnerLinkPartnerRoleImpl prole : getPartnerRoles().values()) {
+ PartnerRoleChannel channel = getEngine()._contexts.bindingContext.createPartnerRoleChannel(_pid,
+ prole._plinkDef.partnerRolePortType, prole._initialPartner);
+ prole._channel = channel;
+ _partnerChannels.put(prole._initialPartner, prole._channel);
+ EndpointReference epr = channel.getInitialEndpointReference();
+ if (epr != null) {
+ prole._initialEPR = epr;
+ _partnerEprs.put(prole._initialPartner, epr);
+ }
+ __log.debug("Activated " + _pid + " partnerrole " + prole.getPartnerLinkName() + ": EPR is "
+ + prole._initialEPR);
+ }
+ _hydratedOnce = true;
+ }
+
+ for (PartnerLinkMyRoleImpl myrole : getMyRoles().values()) {
+ myrole._initialEPR = _myEprs.get(myrole._endpoint);
+ }
+
+ for (PartnerLinkPartnerRoleImpl prole : getPartnerRoles().values()) {
+ prole._channel = _partnerChannels.get(prole._initialPartner);
+ if (_partnerEprs.get(prole._initialPartner) != null) {
+ prole._initialEPR = _partnerEprs.get(prole._initialPartner);
+ }
+ }
+
+ _lifeCallback.hydrated(this);
+ }
+
+ OProcess getOProcess() { // Returns the compiled process, reloading it through hydrate() if it is not in memory.
+ if (_oprocess == null) hydrate(); // NOTE(review): no locking is visible around this check-then-hydrate - confirm callers are serialized
+ return _oprocess;
+ }
+
+ public Map<OPartnerLink, PartnerLinkMyRoleImpl> getMyRoles() { // my-role objects keyed by partner link
+ if (_myRoles == null) hydrate(); // hydrate() calls setRoles(), which recreates this map
+ return _myRoles; // the internal map itself, not a copy
+ }
+
+ public Map<OPartnerLink, PartnerLinkPartnerRoleImpl> getPartnerRoles() { // partner-role objects keyed by partner link
+ if (_partnerRoles == null) hydrate(); // hydrate() calls setRoles(), which recreates this map
+ return _partnerRoles; // the internal map itself, not a copy
+ }
+
+ private Map<Endpoint, PartnerLinkMyRoleImpl> getEndpointToMyRoleMap() { // my-role objects keyed by their endpoint
+ if (_endpointToMyRoleMap == null) hydrate(); // hydrate() calls setRoles(), which recreates this map
+ return _endpointToMyRoleMap;
+ }
+
+ public ReplacementMap getReplacementMap() { // Returns the replacement map, rebuilding it through hydrate() if needed.
+ if (_replacementMap == null) hydrate(); // hydrate() assigns a new ReplacementMapImpl built from the reloaded OProcess
+ return _replacementMap;
+ }
+
+ BpelEngineImpl getEngine() { // Plain accessor for _engine.
+ return _engine; // dehydrate() does not clear _engine, so this never calls hydrate()
+ }
+
public boolean isInMemory() {
return _pconf.isTransient();
}
@@ -826,5 +560,11 @@
/** Keep track of the time the process was last used. */
private final void markused() {
_lastUsed = System.currentTimeMillis();
+ }
+
+ /** Create a version-appropriate runtime context. */
+ BpelRuntimeContextImpl createRuntimeContext(ProcessInstanceDAO dao, PROCESS template,
+ MyRoleMessageExchangeImpl instantiatingMessageExchange) {
+ return new BpelRuntimeContextImpl(this, dao, template, instantiatingMessageExchange); // always a BpelRuntimeContextImpl; recoverActivity passes null for the last two arguments
}
}
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=504332&r1=504331&r2=504332
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Tue Feb 6 14:17:56 2007
@@ -259,7 +259,7 @@
int conflict = _outstandingRequests.findConflict(selectors);
if (conflict != -1)
- throw new FaultException(_bpelProcess._oprocess.constants.qnConflictingReceive, selectors[conflict]
+ throw new FaultException(_bpelProcess.getOProcess().constants.qnConflictingReceive, selectors[conflict]
.toString());
_outstandingRequests.register(pickResponseChannelStr, selectors);
@@ -353,7 +353,7 @@
XmlDataDAO dataDAO = scopeDAO.getVariable(variable.declaration.name);
if (dataDAO.isNull()) {
- throw new FaultException(_bpelProcess._oprocess.constants.qnUninitializedVariable,
+ throw new FaultException(_bpelProcess.getOProcess().constants.qnUninitializedVariable,
"The variable " + variable.declaration.name + " isn't properly initialized.");
}
@@ -383,7 +383,7 @@
}
if (epr == null) {
- throw new FaultException(_bpelProcess._oprocess.constants.qnUninitializedPartnerRole);
+ throw new FaultException(_bpelProcess.getOProcess().constants.qnUninitializedPartnerRole);
}
return epr;
@@ -474,7 +474,7 @@
String mexRef = _outstandingRequests.release(plinkInstnace, opName, mexId);
if (mexRef == null) {
- throw new FaultException(_bpelProcess._oprocess.constants.qnMissingRequest);
+ throw new FaultException(_bpelProcess.getOProcess().constants.qnMissingRequest);
}
// prepare event
@@ -517,7 +517,7 @@
public void writeCorrelation(CorrelationSetInstance cset, CorrelationKey correlation) {
ScopeDAO scopeDAO = _dao.getScope(cset.scopeInstance);
CorrelationSetDAO cs = scopeDAO.getCorrelationSet(cset.declaration.name);
- OScope.CorrelationSet csetdef = (OScope.CorrelationSet) _bpelProcess._oprocess
+ OScope.CorrelationSet csetdef = (OScope.CorrelationSet) _bpelProcess.getOProcess()
.getChild(correlation.getCSetId());
QName[] propNames = new QName[csetdef.properties.size()];
for (int m = 0; m < csetdef.properties.size(); m++) {