You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/07/24 22:58:14 UTC
svn commit: r559204 [1/3] - in /incubator/ode/branches/bart: ./
axis2/src/main/java/org/apache/ode/axis2/
bpel-api/src/main/java/org/apache/ode/bpel/iapi/
bpel-epr/src/main/java/org/apache/ode/il/
bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ ...
Author: mszefler
Date: Tue Jul 24 13:58:12 2007
New Revision: 559204
URL: http://svn.apache.org/viewvc?view=rev&rev=559204
Log:
BART, scheduling / transaction management refactor.
Modified:
incubator/ode/branches/bart/Rakefile
incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcessDatabase.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java
incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
incubator/ode/branches/bart/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
incubator/ode/branches/bart/bpel-test/src/main/java/org/apache/ode/test/MessageExchangeContextImpl.java
incubator/ode/branches/bart/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
incubator/ode/branches/bart/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java
incubator/ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java
incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
Modified: incubator/ode/branches/bart/Rakefile
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/Rakefile?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/Rakefile (original)
+++ incubator/ode/branches/bart/Rakefile Tue Jul 24 13:58:12 2007
@@ -15,7 +15,7 @@
# limitations under the License.
#
-gem "buildr", "~>1.1"
+gem "buildr", "=1.1.3"
require "buildr"
# require "buildr/lib/buildr"
require "buildr/xmlbeans.rb"
@@ -33,7 +33,7 @@
AXIOM = group("axiom-api", "axiom-impl", "axiom-dom", :under=>"org.apache.ws.commons.axiom", :version=>"1.2.4")
AXIS2 = "org.apache.axis2:axis2:jar:1.1.1"
AXIS2_WAR = "org.apache.axis2:axis2:war:1.1.1"
-AXIS2_ALL = group("axis2", "axis2-adb", "axis2-codegen", "axis2-tools",
+AXIS2_ALL = group("axis2-adb", "axis2-codegen", "axis2-tools",
"axis2-java2wsdl", "axis2-jibx", "axis2-saaj", "axis2-xmlbeans",
:under=>"org.apache.axis2", :version=>"1.1.1")
AXIS2_PATCHED = "org.apache.axis2:axis2-kernel-intalio:jar:1.1.1b"
@@ -71,7 +71,8 @@
:persistence =>"javax.persistence:persistence-api:jar:1.0",
:servlet =>"org.apache.geronimo.specs:geronimo-servlet_2.4_spec:jar:1.0",
:stream =>"stax:stax-api:jar:1.0.1",
- :transaction =>"org.apache.geronimo.specs:geronimo-jta_1.0.1B_spec:jar:1.0"
+ :transaction =>"org.apache.geronimo.specs:geronimo-jta_1.0.1B_spec:jar:1.0",
+ :resource =>"javax.resource:connector:jar:1.0"
)
JAXEN = "jaxen:jaxen:jar:1.1-beta-8"
JBI = "org.apache.servicemix:servicemix-jbi:jar:3.1-incubating"
@@ -80,7 +81,6 @@
LOG4J = "log4j:log4j:jar:1.2.13"
OPENJPA = ["org.apache.openjpa:openjpa-all:jar:#{Buildr::OpenJPA::VERSION}",
"net.sourceforge.serp:serp:jar:1.12.0"]
-QUARTZ = "quartz:quartz:jar:1.5.2"
SAXON = group("saxon", "saxon-xpath", "saxon-dom", :under=>"net.sf.saxon", :version=>"8.7")
SERVICEMIX = group("servicemix-core", "servicemix-shared", "servicemix-services",
:under=>"org.apache.servicemix", :version=>"3.1-incubating")
@@ -100,12 +100,19 @@
XBEAN = group("xbean-classloader", "xbean-kernel", "xbean-server", "xbean-spring",
:under=>"org.apache.xbean", :version=>"2.8")
XMLBEANS = "xmlbeans:xbean:jar:2.2.0"
+JOTM = struct(
+ :jotm =>"jotm:jotm:jar:2.0.10",
+ :carol =>"org.objectweb.carol:carol:jar:2.0.5",
+ :jrmp =>"jotm:jotm_jrmp_stubs:jar:2.0.10",
+ :howl =>"howl:howl-logger:jar:0.1.11"
+)
repositories.remote << "http://pxe.intalio.org/public/maven2"
repositories.remote << "http://people.apache.org/repo/m2-incubating-repository"
repositories.remote << "http://repo1.maven.org/maven2"
repositories.remote << "http://people.apache.org/repo/m2-snapshot-repository"
+repositories.remote << "http://download.java.net/maven/2"
repositories.deploy_to[:url] ||= "sftp://guest@localhost/home/guest"
# Changing releases tag names
@@ -132,7 +139,7 @@
desc "ODE Axis Integration Layer"
define "axis2" do
compile.with projects("bpel-api", "bpel-connector", "bpel-dao", "bpel-epr", "bpel-runtime",
- "bpel-scheduler-quartz", "bpel-schemas", "bpel-store", "utils"),
+ "scheduler-simple", "bpel-schemas", "bpel-store", "utils"),
AXIOM, AXIS2, COMMONS.logging, COMMONS.collections, DERBY, GERONIMO.kernel, GERONIMO.transaction,
JAVAX.activation, JAVAX.servlet, JAVAX.stream, JAVAX.transaction, JENCKS, WSDL4J, WS_COMMONS.xml_schema,
XMLBEANS
@@ -146,13 +153,13 @@
desc "ODE Axis2 Based Web Application"
define "axis2-war" do
libs = projects("axis2", "bpel-api", "bpel-compiler", "bpel-connector", "bpel-dao",
- "bpel-epr", "bpel-obj", "bpel-ql", "bpel-runtime", "bpel-scheduler-quartz",
+ "bpel-epr", "bpel-obj", "bpel-ql", "bpel-runtime", "scheduler-simple",
"bpel-schemas", "bpel-store", "dao-hibernate", "jacob", "jca-ra", "jca-server",
"utils", "dao-jpa"),
AXIS2_ALL, AXIS2_PATCHED, ANNONGEN, BACKPORT, COMMONS.codec, COMMONS.collections, COMMONS.fileupload, COMMONS.httpclient,
COMMONS.lang, COMMONS.logging, COMMONS.pool, DERBY, DERBY_TOOLS, JAXEN, JAVAX.activation, JAVAX.ejb, JAVAX.javamail,
JAVAX.connector, JAVAX.jms, JAVAX.persistence, JAVAX.transaction, JAVAX.stream, JIBX,
- GERONIMO.connector, GERONIMO.kernel, GERONIMO.transaction, LOG4J, OPENJPA, QUARTZ, SAXON, TRANQL,
+ GERONIMO.connector, GERONIMO.kernel, GERONIMO.transaction, LOG4J, OPENJPA, SAXON, TRANQL,
WOODSTOX, WSDL4J, WS_COMMONS.axiom, WS_COMMONS.neethi, WS_COMMONS.xml_schema, XALAN, XERCES, XMLBEANS
package(:war).with(:libs=>libs).path("WEB-INF").tap do |web_inf|
@@ -214,7 +221,7 @@
desc "ODE Interface Layers Common"
define "bpel-epr" do
compile.with projects("utils", "bpel-dao", "bpel-api"),
- COMMONS.logging, DERBY, JAVAX.transaction, GERONIMO.transaction, GERONIMO.connector, TRANQL, JAVAX.connector
+ COMMONS.logging, DERBY, JAVAX.transaction, GERONIMO.transaction, GERONIMO.connector, TRANQL, JAVAX.connector, COMMONS.lang
package :jar
end
@@ -241,18 +248,20 @@
"bpel-store", "jacob", "jacob-ap", "utils"),
COMMONS.logging, COMMONS.collections, JAXEN, JAVAX.persistence, JAVAX.stream, SAXON, WSDL4J, XMLBEANS
- test.compile.with projects("bpel-scheduler-quartz", "dao-jpa", "dao-hibernate", "bpel-epr"),
+ test.compile.with projects("scheduler-simple", "dao-jpa", "dao-hibernate", "bpel-epr"),
BACKPORT, COMMONS.pool, COMMONS.lang, DERBY, JAVAX.connector, JAVAX.transaction,
GERONIMO.transaction, GERONIMO.kernel, GERONIMO.connector, TRANQL, HSQLDB, JAVAX.ejb,
- LOG4J, XERCES, Buildr::OpenJPA::REQUIRES, QUARTZ, XALAN
+ LOG4J, XERCES, Buildr::OpenJPA::REQUIRES, XALAN
test.junit.with HIBERNATE, DOM4J
package :jar
end
- desc "ODE Quartz Interface"
- define "bpel-scheduler-quartz" do
- compile.with projects("bpel-api", "utils"), COMMONS.collections, COMMONS.logging, JAVAX.transaction, QUARTZ
+ desc "ODE Simple Scheduler"
+ define "scheduler-simple" do
+ compile.with projects("bpel-api", "utils"), COMMONS.collections, COMMONS.logging, JAVAX.transaction
+ test.compile.with HSQLDB, JOTM.jotm
+ test.junit.with HSQLDB, JOTM.jotm, JOTM.carol, JOTM.jrmp, JAVAX.transaction, JOTM.howl, JAVAX.resource, JAVAX.connector, LOG4J
package :jar
end
@@ -287,7 +296,7 @@
DERBY, Java::JUNIT_REQUIRES, JAVAX.persistence, OPENJPA, WSDL4J
test.with projects("bpel-obj", "jacob", "bpel-schemas",
- "bpel-scripts", "bpel-scheduler-quartz"),
+ "bpel-scripts", "scheduler-simple"),
COMMONS.collections, COMMONS.lang, COMMONS.logging, DERBY, JAVAX.connector,
JAVAX.stream, JAVAX.transaction, JAXEN, HSQLDB, LOG4J, SAXON, XERCES, XMLBEANS, XALAN
@@ -308,7 +317,7 @@
desc "ODE Hibernate Compatible Databases"
define "dao-hibernate-db" do
- predefined_for = lambda { |name| _("src/main/sql/tables_#{name}.sql") }
+ predefined_for = lambda { |name| _("src/main/sql/simplesched-#{name}.sql") }
properties_for = lambda { |name| _("src/main/sql/ode.#{name}.properties") }
dao_hibernate = project("dao-hibernate").compile.target
@@ -352,13 +361,13 @@
define "dao-jpa-ojpa-derby" do
%w{ derby mysql }.each do |db|
db_xml = _("src/main/descriptors/persistence.#{db}.xml")
- quartz_sql = _("src/main/scripts/quartz-#{db}.sql")
+ scheduler_sql = _("src/main/scripts/simplesched-#{db}.sql")
partial_sql = file("target/partial.#{db}.sql"=>db_xml) do |task|
mkpath _("target"), :verbose=>false
Buildr::OpenJPA.mapping_tool :properties=>db_xml, :action=>"build", :sql=>task.name,
:classpath=>projects("bpel-store", "dao-jpa", "bpel-api", "bpel-dao", "utils" )
end
- sql = concat(_("target/#{db}.sql")=>[partial_sql, quartz_sql])
+ sql = concat(_("target/#{db}.sql")=>[partial_sql, scheduler_sql])
build sql
end
derby_db = Derby.create(_("target/derby/jpadb")=>_("target/derby.sql"))
@@ -389,7 +398,7 @@
desc "ODE JBI Integration Layer"
define "jbi" do
compile.with projects("bpel-api", "bpel-connector", "bpel-dao", "bpel-epr", "bpel-obj",
- "bpel-runtime", "bpel-scheduler-quartz", "bpel-schemas", "bpel-store", "utils"),
+ "bpel-runtime", "scheduler-simple", "bpel-schemas", "bpel-store", "utils"),
COMMONS.logging, COMMONS.pool, JAVAX.transaction, JBI, LOG4J, WSDL4J, XERCES
package(:jar)
@@ -397,11 +406,11 @@
libs = artifacts(package(:jar),
projects("bpel-api", "bpel-api-jca", "bpel-compiler", "bpel-connector", "bpel-dao",
"bpel-epr", "jca-ra", "jca-server", "bpel-obj", "bpel-ql", "bpel-runtime",
- "bpel-scheduler-quartz", "bpel-schemas", "bpel-store", "dao-hibernate", "dao-jpa",
+ "scheduler-simple", "bpel-schemas", "bpel-store", "dao-hibernate", "dao-jpa",
"jacob", "jacob-ap", "utils"),
ANT, BACKPORT, COMMONS.codec, COMMONS.collections, COMMONS.dbcp, COMMONS.lang, COMMONS.pool,
COMMONS.primitives, JAXEN, JAVAX.connector, JAVAX.ejb, JAVAX.jms,
- JAVAX.persistence, JAVAX.stream, JAVAX.transaction, LOG4J, OPENJPA, QUARTZ, SAXON, TRANQL,
+ JAVAX.persistence, JAVAX.stream, JAVAX.transaction, LOG4J, OPENJPA, SAXON, TRANQL,
XALAN, XMLBEANS, XSTREAM, WSDL4J)
jbi.component :type=>:service_engine, :name=>"OdeBpelEngine", :description=>self.comment
@@ -416,7 +425,7 @@
"jca-server", "jacob"),
BACKPORT, COMMONS.lang, COMMONS.collections, DERBY, GERONIMO.connector, GERONIMO.kernel,
GERONIMO.transaction, JAVAX.connector, JAVAX.ejb, JAVAX.persistence, JAVAX.stream,
- JAVAX.transaction, JAXEN, JBI, OPENJPA, QUARTZ, SAXON, SERVICEMIX, SPRING, TRANQL,
+ JAVAX.transaction, JAXEN, JBI, OPENJPA, SAXON, SERVICEMIX, SPRING, TRANQL,
XALAN, XBEAN, XMLBEANS, XSTREAM
test.junit.using :properties=>{ "jbi.install"=>_("target/smixInstallDir"), "jbi.examples"=>_("../distro-jbi/src/examples") }
test.setup unzip(_("target/smixInstallDir/install/ODE")=>project("dao-jpa-ojpa-derby").package(:zip))
@@ -462,3 +471,54 @@
end
+define "apache-ode" do
+ [:version, :group, :manifest, :meta_inf].each { |prop| send "#{prop}=", project("ode").send(prop) }
+
+ def distro(project, id)
+ project.package(:zip, :id=>id).path("#{id}-#{version}").tap do |zip|
+ zip.include meta_inf + ["RELEASE_NOTES", "README"].map { |f| path_to(f) }
+ zip.path("examples").include project.path_to("src/examples"), :as=>"."
+ zip.merge project("ode:tools-bin").package(:zip)
+ zip.path("lib").include artifacts(COMMONS.logging, COMMONS.codec, COMMONS.httpclient,
+ COMMONS.pool, COMMONS.collections, JAXEN,
+ SAXON, LOG4J, WSDL4J, XALAN, XERCES)
+ project("ode").projects("utils", "tools", "bpel-compiler", "bpel-api", "bpel-obj", "bpel-schemas").
+ map(&:packages).flatten.each do |pkg|
+ zip.include(pkg.to_s, :as=>"#{pkg.id}.#{pkg.type}", :path=>"lib")
+ end
+ yield zip
+ end
+ end
+
+ desc "ODE Axis2 Based Distribution"
+ define "distro-axis2" do
+ parent.distro(self, "#{parent.id}-war") { |zip| zip.include project("ode:axis2-war").package(:war), :as=>"ode.war" }
+
+ project("ode:axis2-war").task("start").enhance do |task|
+ target = "#{task.path}/webapp/WEB-INF/processes"
+ puts "Deploying processes to #{target}" if verbose
+ verbose(false) do
+ mkpath target
+ cp_r FileList[_("src/examples/*")].to_a, target
+ rm Dir.glob("#{target}/*.deployed")
+ end
+ end
+ end
+
+ desc "ODE JBI Based Distribution"
+ define "distro-jbi" do
+ parent.distro(self, "#{parent.id}-jbi") { |zip| zip.include project("ode:jbi").package(:zip) }
+ end
+
+ package(:zip, :id=>"#{id}-sources").path("#{id}-sources-#{version}").tap do |zip|
+ if File.exist?(".svn")
+ `svn status -v`.reject { |l| l[0] == ?? || l[0] == ?D }.
+ map { |l| l.split.last }.reject { |f| File.directory?(f) }.
+ each { |f| zip.include f, :as=>f }
+ else
+ zip.include Dir.pwd, :as=>"."
+ end
+ end
+
+ package :zip, :id=>"#{id}-docs", :include=>javadoc(project("ode").projects).target
+end
Modified: incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java (original)
+++ incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java Tue Jul 24 13:58:12 2007
@@ -19,13 +19,19 @@
package org.apache.ode.axis2;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.MessageExchangeContext;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
/**
@@ -36,7 +42,17 @@
public class MessageExchangeContextImpl implements MessageExchangeContext {
private static final Log __log = LogFactory.getLog(MessageExchangeContextImpl.class);
-
+
+ /** The currently supported invocation styles. */
+ private static final Set<InvocationStyle> __supportedInvocationStyles;
+
+ static {
+ HashSet<InvocationStyle> styles = new HashSet<InvocationStyle>();
+ styles.add(InvocationStyle.ASYNC);
+ styles.add(InvocationStyle.BLOCKING);
+ __supportedInvocationStyles = Collections.unmodifiableSet(styles);
+ }
+
public MessageExchangeContextImpl(ODEServer server) {
}
@@ -75,7 +91,20 @@
}
public void onReliableReply(MyRoleMessageExchange myRoleMex) throws BpelEngineException {
+ __log.error("RELIABLE reply from service " + myRoleMex.getServiceName() +"; RELIABLE IS NOT SUPPORTED!");
+
// We don't support this yet, so not much to do here.
+ }
+
+
+ public void cancel(PartnerRoleMessageExchange mex) throws ContextException {
+ // TODO Auto-generated method stub
+
+ }
+
+
+ public Set<InvocationStyle> getSupportedInvocationStyle(PartnerRoleChannel prc, EndpointReference partnerEpr) {
+ return __supportedInvocationStyles;
}
}
Modified: incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java (original)
+++ incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java Tue Jul 24 13:58:12 2007
@@ -52,10 +52,11 @@
import org.apache.ode.bpel.iapi.ProcessStoreEvent;
import org.apache.ode.bpel.iapi.ProcessStoreListener;
import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
-import org.apache.ode.bpel.scheduler.quartz.QuartzSchedulerImpl;
import org.apache.ode.il.dbutil.Database;
+import org.apache.ode.scheduler.simple.JdbcDelegate;
+import org.apache.ode.scheduler.simple.SimpleScheduler;
import org.apache.ode.store.ProcessStoreImpl;
+import org.apache.ode.utils.GUID;
import org.apache.ode.utils.fs.TempFileManager;
/**
@@ -291,7 +292,7 @@
public ODEService createService(ProcessConf pconf, QName serviceName, String portName) throws AxisFault {
destroyService(serviceName, portName);
AxisService axisService = ODEAxisService.createService(_axisConfig, pconf, serviceName, portName);
- ODEService odeService = new ODEService(axisService, pconf.getDefinitionForService(serviceName), serviceName, portName, _server, _txMgr);
+ ODEService odeService = new ODEService(axisService, pconf.getDefinitionForService(serviceName), serviceName, portName, _server);
if (_odeConfig.isReplicateEmptyNS()) {
__log.debug("Setting service with empty namespace replication");
odeService.setReplicateEmptyNS(true);
@@ -425,10 +426,9 @@
}
protected Scheduler createScheduler() {
- QuartzSchedulerImpl scheduler = new QuartzSchedulerImpl();
+ SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_db.getDataSource()));
scheduler.setTransactionManager(_txMgr);
- scheduler.setDataSource(_db.getDataSource());
- scheduler.init();
+
return scheduler;
}
@@ -442,7 +442,6 @@
_scheduler.setJobProcessor(_server);
_server.setDaoConnectionFactory(_daoCF);
- _server.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler));
_server.setEndpointReferenceContext(new EndpointReferenceContextImpl(this));
_server.setMessageExchangeContext(new MessageExchangeContextImpl(this));
_server.setBindingContext(new BindingContextImpl(this, _store));
Modified: incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEService.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEService.java (original)
+++ incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEService.java Tue Jul 24 13:58:12 2007
@@ -19,10 +19,6 @@
package org.apache.ode.axis2;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import javax.transaction.TransactionManager;
import javax.wsdl.Definition;
import javax.wsdl.Port;
import javax.wsdl.Service;
@@ -44,6 +40,7 @@
import org.apache.ode.bpel.epr.WSAEndpoint;
import org.apache.ode.bpel.iapi.BpelServer;
import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
@@ -54,8 +51,8 @@
import org.w3c.dom.Element;
/**
- * A running service, encapsulates the Axis service, its receivers and our
- * receivers as well.
+ * A running service, encapsulates the Axis service, its receivers and our receivers as well.
+ *
* @author Matthieu Riou <mriou at apache dot org>
*/
public class ODEService {
@@ -65,7 +62,6 @@
private AxisService _axisService;
private BpelServer _server;
- private TransactionManager _txManager;
private Definition _wsdlDef;
private QName _serviceName;
private String _portName;
@@ -73,11 +69,9 @@
private boolean _isReplicateEmptyNS = false;
private SoapMessageConverter _converter;
- public ODEService(AxisService axisService, Definition def, QName serviceName, String portName, BpelServer server,
- TransactionManager txManager) throws AxisFault {
+ public ODEService(AxisService axisService, Definition def, QName serviceName, String portName, BpelServer server) throws AxisFault {
_axisService = axisService;
_server = server;
- _txManager = txManager;
_wsdlDef = def;
_serviceName = serviceName;
_portName = portName;
@@ -88,119 +82,64 @@
public void onAxisMessageExchange(MessageContext msgContext, MessageContext outMsgContext, SOAPFactory soapFactory)
throws AxisFault {
- boolean success = true;
MyRoleMessageExchange odeMex = null;
- Future responseFuture = null;
try {
- _txManager.begin();
- if (__log.isDebugEnabled()) __log.debug("Starting transaction.");
-
// Creating mesage exchange
String messageId = new GUID().toString();
- odeMex = _server.getEngine().createMessageExchange("" + messageId, _serviceName,
- msgContext.getAxisOperation().getName().getLocalPart());
+ odeMex = _server.createMessageExchange(InvocationStyle.BLOCKING, _serviceName,
+ msgContext.getAxisOperation().getName().getLocalPart(), "" + messageId);
+
__log.debug("ODE routed to operation " + odeMex.getOperation() + " from service " + _serviceName);
- if (odeMex.getOperation() != null) {
- // Preparing message to send to ODE
- Element msgEl = DOMUtils.newDocument().createElementNS(null, "message");
- msgEl.getOwnerDocument().appendChild(msgEl);
- _converter.parseSoapRequest(msgEl, msgContext.getEnvelope(), odeMex.getOperation());
- Message odeRequest = odeMex.createMessage(odeMex.getOperation().getInput().getMessage().getQName());
- readHeader(msgContext, odeMex);
- odeRequest.setMessage(msgEl);
-
- if (__log.isDebugEnabled()) {
- __log.debug("Invoking ODE using MEX " + odeMex);
- __log.debug("Message content: " + DOMUtils.domToString(odeRequest.getMessage()));
- }
-
- // Invoke ODE
- responseFuture = odeMex.invoke(odeRequest);
-
- __log.debug("Commiting ODE MEX " + odeMex);
- try {
- if (__log.isDebugEnabled()) __log.debug("Commiting transaction.");
- _txManager.commit();
- } catch (Exception e) {
- __log.error("Commit failed", e);
- success = false;
- }
- } else {
- success = false;
+ if (odeMex.getOperation() == null) {
+ String errmsg = "Call to " + _serviceName + "." + odeMex.getOperationName() + " was not routable.";
+ __log.error(errmsg);
+ throw new OdeFault(errmsg);
+ }
+
+ // Preparing message to send to ODE
+ Element msgEl = DOMUtils.newDocument().createElementNS(null, "message");
+ msgEl.getOwnerDocument().appendChild(msgEl);
+ _converter.parseSoapRequest(msgEl, msgContext.getEnvelope(), odeMex.getOperation());
+ Message odeRequest = odeMex.createMessage(odeMex.getOperation().getInput().getMessage().getQName());
+ readHeader(msgContext, odeMex);
+ odeRequest.setMessage(msgEl);
+
+ if (__log.isDebugEnabled()) {
+ __log.debug("Invoking ODE using MEX " + odeMex);
+ __log.debug("Message content: " + DOMUtils.domToString(odeRequest.getMessage()));
}
- } catch (Exception e) {
- __log.error("Exception occured while invoking ODE", e);
- success = false;
- throw new OdeFault("An exception occured while invoking ODE.", e);
- } finally {
- if (!success) {
- if (odeMex != null) odeMex.release();
- try {
- _txManager.rollback();
- } catch (Exception e) {
- throw new OdeFault("Rollback failed", e);
- }
- }
- }
- if (odeMex.getOperation().getOutput() != null) {
- // Waits for the response to arrive
+ odeMex.setRequest(odeRequest);
+ // odeMex.setTimeout(TIMEOUT);
try {
- responseFuture.get(TIMEOUT, TimeUnit.MILLISECONDS);
- } catch (Exception e) {
- String errorMsg = "Timeout or execution error when waiting for response to MEX "
- + odeMex + " " + e.toString();
- __log.error(errorMsg);
- throw new OdeFault(errorMsg);
+ odeMex.invokeBlocking();
+ } catch (java.util.concurrent.TimeoutException te) {
+ String errmsg = "Call to " + _serviceName + "." + odeMex.getOperationName() + " timed out.";
+ __log.error(errmsg);
+ throw new OdeFault(errmsg);
}
-
- if (outMsgContext != null) {
+
+ if (odeMex.getOperation().getOutput() != null && outMsgContext != null) {
SOAPEnvelope envelope = soapFactory.getDefaultEnvelope();
outMsgContext.setEnvelope(envelope);
// Hopefully we have a response
__log.debug("Handling response for MEX " + odeMex);
- boolean commit = false;
- try {
- if (__log.isDebugEnabled()) __log.debug("Starting transaction.");
- _txManager.begin();
- } catch (Exception ex) {
- throw new OdeFault("Error starting transaction!", ex);
- }
+ onResponse(odeMex, outMsgContext);
+ }
+
+ } catch (Exception e) {
+ String errmsg = "Call to " + _serviceName + "." + odeMex.getOperationName() + " caused an exception.";
+ __log.error(errmsg, e);
+ throw new OdeFault(errmsg, e);
+ } finally {
+ if (odeMex != null)
try {
- // Refreshing the message exchange
- odeMex = (MyRoleMessageExchange) _server.getEngine().getMessageExchange(odeMex.getMessageExchangeId());
- onResponse(odeMex, outMsgContext);
- commit = true;
- } catch (AxisFault af) {
- __log.error("Error processing response for MEX " + odeMex, af);
- commit = true;
- throw af;
- } catch (Exception e) {
- __log.error("Error processing response for MEX " + odeMex, e);
- throw new OdeFault("An exception occured when invoking ODE.", e);
- } finally {
odeMex.release();
- if (commit) {
- try {
- if (__log.isDebugEnabled()) __log.debug("Comitting transaction.");
- _txManager.commit();
- } catch (Exception e) {
- throw new OdeFault("Commit failed!", e);
- }
- } else {
- try {
- _txManager.rollback();
- } catch (Exception ex) {
- throw new OdeFault("Rollback failed!", ex);
- }
- }
+ } catch (Exception ex) {
+ __log.error("Error releasing message exchange: " + odeMex.getMessageExchangeId());
}
- }
- if (!success) {
- throw new OdeFault("Message was either unroutable or timed out!");
- }
}
}
@@ -235,8 +174,7 @@
}
/**
- * Extracts endpoint information from Axis MessageContext (taken from WSA
- * headers) to stuff them into ODE mesage exchange.
+ * Extracts endpoint information from Axis MessageContext (taken from WSA headers) to stuff them into ODE mesage exchange.
*/
private void readHeader(MessageContext msgContext, MyRoleMessageExchange odeMex) {
Object otse = msgContext.getProperty("targetSessionEndpoint");
@@ -262,9 +200,8 @@
}
/**
- * Handle callback endpoints for the case where partner contact process
- * my-role which results in an "updated" my-role EPR due to session id
- * injection.
+ * Handle callback endpoints for the case where partner contact process my-role which results in an "updated" my-role EPR due to
+ * session id injection.
*/
private void writeHeader(MessageContext msgContext, MyRoleMessageExchange odeMex) {
EndpointReference targetEPR = odeMex.getEndpointReference();
@@ -286,8 +223,8 @@
}
/**
- * Return the service-ref element that will be used to represent this
- * endpoint.
+ * Return the service-ref element that will be used to represent this endpoint.
+ *
* @return my service endpoint
*/
public EndpointReference getMyServiceRef() {
@@ -296,7 +233,7 @@
/**
* Get the EPR of this service from the WSDL.
- *
+ *
* @param name
* service name
* @param portName
@@ -335,7 +272,7 @@
/**
* Create-and-copy a service-ref element.
- *
+ *
* @param elmt
* @return wrapped element
*/
Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java (original)
+++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java Tue Jul 24 13:58:12 2007
@@ -45,14 +45,7 @@
throws ContextException ;
- /**
- * Schedule a volatile (non-persisted) job. Volatile jobs should not be
- * saved in the database and should not survive system crash.
- *
- * @param transacted
- * @return unique (as far as the scheduler is concerned) job identifier
- */
- <T> Future<T> scheduleVolatileJob(boolean transacted, final Callable<T> call) throws ContextException;
+ void jobCompleted(String jobId);
/**
* Make a good effort to cancel the job. If its already running no big
@@ -61,62 +54,12 @@
*/
void cancelJob(String jobId) throws ContextException;
- /**
- * Execute a {@link Callable} in a transactional context. If the callable
- * throws an exception, then the transaction will be rolled back, otherwise
- * the transaction will commit.
- *
- * @param <T> return type
- * @param transaction transaction to execute
- * @return result
- * @throws Exception
- */
- <T> T execTransaction(Callable<T> transaction)
- throws Exception, ContextException;
-
- /**
- * Same as execTransaction but executes in a different thread to guarantee
- * isolation from the main execution thread.
- * @param transaction
- * @return
- * @throws Exception
- * @throws ContextException
- */
- <T> Future<T> execIsolatedTransaction(final Callable<T> transaction)
- throws Exception, ContextException;
-
- /**
- * @return true if the current thread is associated with a transaction.
- */
- boolean isTransacted();
-
- /**
- * Register a transaction synchronizer.
- * @param synch synchronizer
- * @throws ContextException
- */
- void registerSynchronizer(Synchronizer synch) throws ContextException;
-
void start();
void stop();
void shutdown();
- public interface Synchronizer {
- /**
- * Called after the transaction is completed.
- * @param success indicates whether the transaction was comitted
- */
- void afterCompletion(boolean success);
-
- /**
- * Called before the transaction is completed.
- */
- void beforeCompletion();
-
- }
-
/**
* Interface implemented by the object responsible for job execution.
* @author mszefler
@@ -166,6 +109,7 @@
}
}
+
}
Modified: incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java (original)
+++ incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java Tue Jul 24 13:58:12 2007
@@ -19,123 +19,72 @@
package org.apache.ode.il;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.Scheduler;
-
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.TransactionManager;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.Synchronization;
+import javax.transaction.TransactionManager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.Scheduler;
/**
+ *
* @author Matthieu Riou <mriou at apache dot org>
+ *
+ * - BART refactor: Removed transaction management logic.
+ * @author Maciej Szefler <mszefler at gmail dot com>
*/
public class MockScheduler implements Scheduler {
private static final Log __log = LogFactory.getLog(MockScheduler.class);
private JobProcessor _processor;
- private ExecutorService _executorSvc = Executors.newCachedThreadPool();
- private ThreadLocal<Boolean> _transacted = new ThreadLocal<Boolean>();
- private TransactionManager _txm;
- public MockScheduler() {
- _transacted.set(false);
- }
+ private ScheduledExecutorService _exec;
+
+ private TransactionManager _txm;
public MockScheduler(TransactionManager txm) {
_txm = txm;
- _transacted.set(false);
}
- ThreadLocal<List<Synchronizer>> _synchros = new ThreadLocal<List<Scheduler.Synchronizer>>() {
+ ThreadLocal<List<Synchronization>> _synchros = new ThreadLocal<List<Synchronization>>() {
@Override
- protected List<Synchronizer> initialValue() {
- return new ArrayList<Synchronizer>();
+ protected List<Synchronization> initialValue() {
+ return new ArrayList<Synchronization>();
}
};
- public String schedulePersistedJob(Map<String, Object> detail, Date date) throws ContextException {
- if (date != null) {
- try {
- while(new Date().before(date)) { Thread.sleep(100); }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- return scheduleVolatileJob(true, detail);
- }
-
- public String scheduleVolatileJob(final boolean transacted, final Map<String, Object> detail) throws ContextException {
- registerSynchronizer(new Synchronizer() {
- public void afterCompletion(boolean success) {
- try {
- if (transacted) {
- execIsolatedTransaction(new Callable() {
- public Object call() throws Exception {
- JobInfo ji = new JobInfo("volatileJob", detail, 0);
- doExecute(ji);
- return null;
- }
- });
- } else {
- JobInfo ji = new JobInfo("volatileJob", detail, 0);
+ public String schedulePersistedJob(final Map<String, Object> detail, final Date date) throws ContextException {
+ registerSynchronizer(new Synchronization() {
+ public void afterCompletion(int status) {
+ long delay = Math.max(0, date.getTime() - System.currentTimeMillis());
+ _exec.schedule(new Callable<Void>() {
+ public Void call() throws Exception {
+ JobInfo ji = new JobInfo("job" + System.currentTimeMillis(), detail, 0);
doExecute(ji);
+ return null;
}
- } catch (Exception e) {
- throw new ContextException("Failure when starting a new volatile job.", e);
- }
+ }, delay, TimeUnit.MILLISECONDS);
+ }
+
+ public void beforeCompletion() {
}
- public void beforeCompletion() { }
});
return null;
- }
-
- public void cancelJob(String arg0) throws ContextException {
}
- public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {
- begin();
- try {
- T retval = transaction.call();
- commit();
- return retval;
- } catch (Throwable t) {
- __log.error("Caught an exception during transaction", t);
- rollback();
- throw new ContextException("Error in tx", t);
- }
- }
-
- public <T> Future<T> execIsolatedTransaction(final Callable<T> transaction) throws Exception, ContextException {
- return _executorSvc.submit(new Callable<T>() {
- public T call() throws Exception {
- return execTransaction(transaction);
- }
- });
- }
+ public void cancelJob(String arg0) throws ContextException {
- public boolean isTransacted() {
- if (_txm != null) {
- try {
- return _txm.getTransaction() != null;
- } catch (SystemException e) {
- __log.error("Exception in mock scheduler isTransacted.", e);
- throw new RuntimeException(e);
- }
- }
- else return _transacted.get();
}
public void start() {
@@ -147,90 +96,15 @@
public void shutdown() {
}
- public void registerSynchronizer(final Synchronizer synch) throws ContextException {
- if (_txm != null) {
- try {
- _txm.getTransaction().registerSynchronization(new Synchronization() {
- public void beforeCompletion() {
- synch.beforeCompletion();
- }
- public void afterCompletion(int status) {
- synch.afterCompletion(status == Status.STATUS_COMMITTED);
- }
- });
- } catch (Exception e) {
- __log.error("Exception in mock scheduler sync registration.", e);
- throw new RuntimeException(e);
- }
- } else {
- _synchros.get().add(synch);
- }
- }
-
- public void begin() {
- if (_txm != null) {
- try {
- _txm.begin();
- } catch (Exception e) {
- __log.error("Exception in mock scheduler begin.", e);
- throw new RuntimeException(e);
- }
- } else {
- if (_transacted.get() == Boolean.TRUE)
- throw new RuntimeException("Transaction active.");
- _synchros.get().clear();
- }
- _transacted.set(Boolean.TRUE);
- }
-
- public void commit() {
- if (_txm != null) {
- try {
- _txm.commit();
- } catch (Exception e) {
- __log.error("Exception in mock scheduler commit.", e);
- throw new RuntimeException(e);
- }
- } else {
- for (Synchronizer s : _synchros.get())
- try {
- s.beforeCompletion();
- } catch (Throwable t) {
- }
- for (Synchronizer s : _synchros.get())
- try {
- s.afterCompletion(true);
- } catch (Throwable t) {
- }
-
- _synchros.get().clear();
+ private void registerSynchronizer(final Synchronization synch) throws ContextException {
+ try {
+ _txm.getTransaction().registerSynchronization(synch);
+ } catch (Exception e) {
+ __log.error("Exception in mock scheduler sync registration.", e);
+ throw new RuntimeException(e);
}
- _transacted.set(Boolean.FALSE);
}
- public void rollback() {
- if (_txm != null) {
- try {
- _txm.rollback();
- } catch (Exception e) {
- __log.error("Exception in mock scheduler rollback.", e);
- throw new RuntimeException(e);
- }
- } else {
- for (Synchronizer s : _synchros.get())
- try {
- s.beforeCompletion();
- } catch (Throwable t) {
- }
- for (Synchronizer s : _synchros.get())
- try {
- s.afterCompletion(false);
- } catch (Throwable t) {
- }
- _synchros.get().clear();
- }
- _transacted.set(Boolean.FALSE);
- }
private void doExecute(JobInfo ji) {
JobProcessor processor = _processor;
@@ -247,7 +121,7 @@
_processor = processor;
}
- public void setExecutorSvc(ExecutorService executorSvc) {
- _executorSvc = executorSvc;
+ public void jobCompleted(String jobId) {
+
}
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java Tue Jul 24 13:58:12 2007
@@ -32,7 +32,7 @@
@Override
protected void resumeInstance() {
- assert !_contexts.scheduler.isTransacted() : "checkReplyContext() should have prevented us from getting here.";
+ assert !_contexts.isTransacted() : "checkReplyContext() should have prevented us from getting here.";
assert !_process.isInMemory() : "resumeInstance() for in-mem processes makes no sense.";
final WorkEvent we = generateInvokeResponseWorkEvent();
@@ -56,7 +56,7 @@
super.checkReplyContextOk();
// Prevent user from attempting the replyXXXX calls while a transaction is active.
- if (!_ownerThread.get() && _contexts.scheduler.isTransacted())
+ if (!_ownerThread.get() && _contexts.isTransacted())
throw new BpelEngineException("Cannot reply to ASYNC style invocation from a transactional context!");
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java Tue Jul 24 13:58:12 2007
@@ -18,8 +18,6 @@
*/
package org.apache.ode.bpel.engine;
-
-
import javax.xml.namespace.QName;
import org.apache.ode.bpel.dao.BpelDAOConnection;
@@ -33,52 +31,54 @@
* Encapsulates transactional access to the BPEL database.
*/
class BpelDatabase {
- static Log __log = LogFactory.getLog(BpelDatabase.class);
+ static Log __log = LogFactory.getLog(BpelDatabase.class);
+
+ protected BpelDAOConnectionFactory _sscf;
- protected BpelDAOConnectionFactory _sscf;
- protected Scheduler _scheduler;
+ protected Contexts _contexts;
- BpelDatabase(BpelDAOConnectionFactory sscf, Scheduler scheduler) {
- if (sscf == null)
- throw new NullPointerException("sscf is null!");
- if (scheduler == null)
- throw new NullPointerException("scheduler is null!");
-
- _sscf = sscf;
- _scheduler = scheduler;
-
- }
-
- /**
- * Get a connection to the database with the correct store identifier.
- * @return a state store connection
- * @throws org.apache.ode.utils.dao.DConnectionException
- */
- BpelDAOConnection getConnection() {
- // Note: this will give us a connection associated with the current
- // transaction, so no need to worry about closing it.
- return _sscf.getConnection();
- }
-
- BpelProcessDatabase getProcessDb(QName pid) {
- return new BpelProcessDatabase(_sscf, _scheduler, pid);
- }
-
- /**
- * Execute a self-contained database transaction.
- * @param callable database transaction
- * @return
- * @throws DConnectionException
- */
- <T> T exec(final Callable<T> callable) throws Exception {
- return _scheduler.execTransaction(new java.util.concurrent.Callable<T>() {
- public T call() throws Exception {
- return callable.run(_sscf.getConnection());
- }
- });
- }
-
- interface Callable<T> {
- public T run(BpelDAOConnection conn) throws Exception;
- }
+ BpelDatabase(Contexts contexts) {
+ if (contexts == null)
+ throw new NullPointerException("scheduler is null!");
+
+ _sscf = contexts.dao;
+ _contexts = contexts;
+
+ }
+
+ /**
+ * Get a connection to the database with the correct store identifier.
+ *
+ * @return a state store connection
+ * @throws org.apache.ode.utils.dao.DConnectionException
+ */
+ BpelDAOConnection getConnection() {
+ // Note: this will give us a connection associated with the current
+ // transaction, so no need to worry about closing it.
+ return _sscf.getConnection();
+ }
+
+ BpelProcessDatabase getProcessDb(QName pid) {
+ return new BpelProcessDatabase(_contexts, pid);
+ }
+
+ /**
+ * Execute a self-contained database transaction.
+ *
+ * @param callable
+ * database transaction
+ * @return
+ * @throws DConnectionException
+ */
+ <T> T exec(final Callable<T> callable) throws Exception {
+ return _contexts.execTransaction(new java.util.concurrent.Callable<T>() {
+ public T call() throws Exception {
+ return callable.run(_sscf.getConnection());
+ }
+ });
+ }
+
+ interface Callable<T> {
+ public T run(BpelDAOConnection conn) throws Exception;
+ }
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Tue Jul 24 13:58:12 2007
@@ -19,7 +19,6 @@
package org.apache.ode.bpel.engine;
import java.io.InputStream;
-import java.sql.Date;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -29,7 +28,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
import javax.wsdl.Operation;
import javax.wsdl.PortType;
@@ -53,9 +52,9 @@
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.ProcessConf;
-import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
@@ -126,12 +125,11 @@
private final List<MessageExchangeInterceptor> _mexInterceptors = new ArrayList<MessageExchangeInterceptor>();
/** Latch-like thing to control hydration/dehydration. */
- private HydrationLatch _hydrationLatch;
+ HydrationLatch _hydrationLatch;
protected Contexts _contexts;
- /** Manage instance-level locks. */
- private final InstanceLockManager _instanceLockManager = new InstanceLockManager();
+ final BpelInstanceWorkerCache _instanceWorkerCache = new BpelInstanceWorkerCache(this);
private final Set<InvocationStyle> _invocationStyles;
@@ -141,12 +139,20 @@
final BpelServerImpl _server;
+ /** Indicates whether we are operating in a server-managed transaction. */
+ private ThreadLocal<Boolean> _serverTx = new ThreadLocal<Boolean>() {
+ @Override
+ protected Boolean initialValue() {
+ return Boolean.FALSE;
+ }
+ };
+
public BpelProcess(BpelServerImpl server, ProcessConf conf, BpelEventListener debugger) {
_server = server;
_pid = conf.getProcessId();
_pconf = conf;
_hydrationLatch = new HydrationLatch();
- _inMemDao = new BpelDAOConnectionFactoryImpl(_contexts.scheduler);
+ _inMemDao = new BpelDAOConnectionFactoryImpl(_contexts.txManager);
// TODO : do this on a per-partnerlink basis, support transacted styles.
HashSet<InvocationStyle> istyles = new HashSet<InvocationStyle>();
@@ -167,8 +173,9 @@
if (__log.isDebugEnabled())
__log.debug("Recovering activity in process " + instanceDAO.getInstanceId() + " with action " + action);
markused();
- BpelRuntimeContextImpl processInstance = createRuntimeContext(instanceDAO, null, null);
- processInstance.recoverActivity(channel, activityId, action, fault);
+ throw new AssertionError("TODO: fixme");// TODO
+ // BpelRuntimeContextImpl processInstance = createRuntimeContext(instanceDAO, null, null);
+ // processInstance.recoverActivity(channel, activityId, action, fault);
}
static String generateMessageExchangeIdentifier(String partnerlinkName, String operationName) {
@@ -183,7 +190,9 @@
*
* @param mex
*/
- void invokeProcess(MessageExchangeDAO mexdao) {
+ void invokeProcess(final MessageExchangeDAO mexdao) {
+ InvocationStyle istyle = InvocationStyle.valueOf(mexdao.getInvocationStyle());
+
_hydrationLatch.latch(1);
try {
PartnerLinkMyRoleImpl target = getMyRoleForService(mexdao.getCallee());
@@ -205,7 +214,29 @@
// }
markused();
- target.invokeMyRole(mexdao);
+ CorrelationStatus cstatus = target.invokeMyRole(mexdao);
+ if (cstatus == null) {
+ ; // do nothing
+ } else if (cstatus == CorrelationStatus.CREATE_INSTANCE) {
+ doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
+ public Void call() {
+ executeCreateInstance(mexdao);
+ return null;
+ }
+ });
+
+ } else if (cstatus == CorrelationStatus.MATCHED) {
+ doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
+ public Void call() {
+ executeContinueInstance(mexdao);
+ return null;
+ }
+ });
+
+ } else if (cstatus == CorrelationStatus.QUEUED) {
+ ; // do nothing
+ }
+ // TODO: handle correlation status (i.e. execute instance).
} finally {
_hydrationLatch.release(1);
}
@@ -216,6 +247,66 @@
// }
}
+ void executeCreateInstance(MessageExchangeDAO mexdao) {
+ BpelInstanceWorker worker = _instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
+ assert worker.isWorkerThread();
+ BpelRuntimeContextImpl instanceCtx = new BpelRuntimeContextImpl(worker, mexdao.getInstance(), new PROCESS(_oprocess),
+ mexdao);
+ instanceCtx.execute();
+ }
+
+ void executeContinueInstance(MessageExchangeDAO mexdao) {
+ BpelInstanceWorker worker = _instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
+ assert worker.isWorkerThread();
+ BpelRuntimeContextImpl instance = new BpelRuntimeContextImpl(worker, mexdao.getInstance(), null, null);
+ int amp = mexdao.getChannel().indexOf('&');
+ String groupId = mexdao.getChannel().substring(0, amp);
+ int idx = Integer.valueOf(mexdao.getChannel().substring(amp + 1));
+ instance.inputMsgMatch(groupId, idx, mexdao);
+ instance.execute();
+ }
+
+
+
+ private void enqueueInstanceWork(Long instanceId, Runnable runnable) {
+ BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
+ iworker.enqueue(new ProcessRunnable(runnable));
+ }
+
+ private void enqueueInstanceTransaction(Long instanceId, final Runnable runnable) {
+ enqueueInstanceWork(instanceId, new ProcessRunnable(new TransactedRunnable(runnable)));
+ }
+
+ /**
+ * Schedule work for a given instance; work will occur if transaction commits.
+ *
+ * @param instanceId
+ * @param name
+ */
+ private void scheduleInstanceWork(final Long instanceId, final Runnable runnable) {
+ _contexts.registerCommitSynchronizer(new Runnable() {
+ public void run() {
+ BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
+ iworker.enqueue(new ProcessRunnable(runnable));
+ }
+ });
+
+ }
+
+ private void scheduleInstanceTX(Long instanceId, final Runnable transaction) {
+ scheduleInstanceWork(instanceId, new TransactedRunnable(transaction));
+ }
+
+ private <T> T doInstanceWork(Long instanceId, final Callable<T> callable) {
+ try {
+ BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
+ return iworker.execInCurrentThread(new ProcessCallable<T>(callable));
+
+ } catch (Exception ex) {
+ throw new BpelEngineException(ex);
+ }
+ }
+
private PartnerLinkMyRoleImpl getMyRoleForService(QName serviceName) {
for (Map.Entry<Endpoint, PartnerLinkMyRoleImpl> e : getEndpointToMyRoleMap().entrySet()) {
if (e.getKey().serviceName.equals(serviceName))
@@ -323,116 +414,111 @@
return true;
}
- /*
- * // DONT PUT CODE HERE-need this method real tight in a try/catch block, we need to handle // all types of failure here, the
- * scheduler is not going to know how to handle our errors, // ALSO we have to release the lock obtained above (IMPORTANT), lest
- * the whole system come // to a grinding halt. try {
- *
- * ProcessInstanceDAO instance; if (process.isInMemory()) instance =
- * _contexts.inMemDao.getConnection().getInstance(we.getIID()); else instance =
- * _contexts.dao.getConnection().getInstance(we.getIID());
- *
- * if (instance == null) { __log.error(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID())); // nothing we can do, this
- * instance is not in the database, it will // always // fail. return; } ProcessDAO processDao = instance.getProcess(); process =
- * _activeProcesses.get(processDao.getProcessId());
- *
- * process.handleWorkEvent(we.getDetail()); debuggingDelay(); } catch (BpelEngineException bee) {
- * __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), bee); throw new Scheduler.JobProcessorException(bee,
- * checkRetry(jobInfo, bee)); } catch (ContextException ce) { __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), ce);
- * throw new Scheduler.JobProcessorException(ce, checkRetry(jobInfo, ce)); } catch (RuntimeException rte) {
- * __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), rte); throw new Scheduler.JobProcessorException(rte,
- * checkRetry(jobInfo, rte)); } catch (Throwable t) { __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), t); throw new
- * Scheduler.JobProcessorException(false);
- *
- *
- */
/**
- * @throws JobProcessorException
+ * Handle a work event; this method is called from the scheduler thread and should be very quick, i.e. any serious work needs to
+ * be handed off to a separate thread.
+ *
+ * @throws JobProcessorException
* @see org.apache.ode.bpel.engine.BpelProcess#handleWorkEvent(java.util.Map<java.lang.String,java.lang.Object>)
*/
- public void handleWorkEvent(JobInfo jobInfo) throws JobProcessorException {
- _hydrationLatch.latch(1);
- try {
- markused();
+ public void handleWorkEvent(final JobInfo jobInfo) throws JobProcessorException {
+ assert !_contexts.isTransacted() : "work events must be received outside of a transaction";
+
+ markused();
+
+ final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
+ if (__log.isDebugEnabled()) {
+ __log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent", new Object[] { "jobInfo", jobInfo }));
+ }
+ // Process-level events
+ if (we.getType().equals(WorkEvent.Type.MYROLE_INVOKE)) {
+ // second stage of my-role invoke for BLOCKING/ASYNC/RELIABLE invocation style.
if (__log.isDebugEnabled()) {
- __log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent", new Object[] { "jobInfo", jobInfo }));
+ __log.debug("InvokeInternal event for mexid " + we.getMexId());
}
- final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
-
- // Process level events
- if (we.getType().equals(WorkEvent.Type.MYROLE_INVOKE)) {
- if (__log.isDebugEnabled()) {
- __log.debug("InvokeInternal event for mexid " + we.getMexId());
+ enqueueTransaction(new Callable<Void>() {
+ public Void call() {
+ _contexts.scheduler.jobCompleted(jobInfo.jobName);
+ MessageExchangeDAO mexdao = loadMexDao(we.getMexId());
+ invokeProcess(mexdao);
+ return null;
}
- MessageExchangeDAO mexdao = loadMexDao(we.getMexId());
- invokeProcess(mexdao);
- } else {
- // Instance level events
- // We lock the instance to prevent concurrent transactions and prevent unnecessary rollbacks,
- // Note that we don't want to wait too long here to get our lock, since we are likely holding
- // on to scheduler's locks of various sorts.
- try {
- _instanceLockManager.lock(we.getIID(), 1, TimeUnit.MICROSECONDS);
- _contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
- public void afterCompletion(boolean success) {
- _instanceLockManager.unlock(we.getIID());
- }
+ });
- public void beforeCompletion() {
- }
- });
- } catch (InterruptedException e) {
- // Retry later.
- __log.debug("Thread interrupted, job will be rescheduled: " + jobInfo);
- throw new Scheduler.JobProcessorException(true);
- } catch (org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException e) {
- __log.debug("Instance " + we.getIID() + " is busy, rescheduling job.");
- // TODO: This should really be more of something like the exponential backoff algorithm in ethernet.
- _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, new Date(System.currentTimeMillis()
- + _random.nextInt(1000)));
- return;
- }
-
- ProcessInstanceDAO procInstance = getProcessDAO().getInstance(we.getIID());
- if (procInstance == null) {
- if (__log.isDebugEnabled()) {
- __log.debug("handleWorkEvent: no ProcessInstance found with iid " + we.getIID() + "; ignoring.");
- }
- return;
+ } else /* instance-level events */{
+ enqueueInstanceTransaction(we.getIID(), new Runnable() {
+ public void run() {
+ _contexts.scheduler.jobCompleted(jobInfo.jobName);
+ execInstanceEvent(we);
}
- BpelRuntimeContextImpl processInstance = createRuntimeContext(procInstance, null, null);
- switch (we.getType()) {
- case TIMER:
- if (__log.isDebugEnabled()) {
- __log.debug("handleWorkEvent: TimerWork event for process instance " + processInstance);
- }
- processInstance.timerEvent(we.getChannel());
- break;
- case RESUME:
- if (__log.isDebugEnabled()) {
- __log.debug("handleWorkEvent: ResumeWork event for iid " + we.getIID());
- }
- processInstance.execute();
- break;
- case PARTNER_RESPONSE:
- if (__log.isDebugEnabled()) {
- __log.debug("InvokeResponse event for iid " + we.getIID());
- }
- processInstance.injectPartnerResponse(we.getMexId(), we.getChannel());
- processInstance.execute();
- break;
- case MATCHER:
- if (__log.isDebugEnabled()) {
- __log.debug("Matcher event for iid " + we.getIID());
- }
- processInstance.matcherEvent(we.getCorrelatorId(), we.getCorrelationKey());
+ });
+ }
+
+ }
+
+ /**
+ * Enqueue a transaction for execution by the engine.
+ *
+ * @param tx
+ * the transaction
+ */
+ private <T> Future<T> enqueueTransaction(final Callable<T> tx) {
+ // We have to wrap our transaction to make sure that we are hydrated when the transaction runs.
+ return _server.execIsolatedTransaction(new Callable<T>() {
+ public T call() throws Exception {
+ _hydrationLatch.latch(1);
+ try {
+ return tx.call();
+ } finally {
+ _hydrationLatch.release(1);
}
}
- } finally {
- _hydrationLatch.release(1);
+
+ });
+
+ }
+
+ private void execInstanceEvent(WorkEvent we) {
+ BpelInstanceWorker worker = _instanceWorkerCache.get(we.getIID());
+ assert worker.isWorkerThread();
+
+ ProcessInstanceDAO procInstance = getProcessDAO().getInstance(we.getIID());
+ if (procInstance == null) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("handleWorkEvent: no ProcessInstance found with iid " + we.getIID() + "; ignoring.");
+ }
+ return;
+ }
+
+ BpelRuntimeContextImpl processInstance = new BpelRuntimeContextImpl(worker, procInstance, null, null);
+ switch (we.getType()) {
+ case TIMER:
+ if (__log.isDebugEnabled()) {
+ __log.debug("handleWorkEvent: TimerWork event for process instance " + processInstance);
+ }
+ processInstance.timerEvent(we.getChannel());
+ break;
+ case RESUME:
+ if (__log.isDebugEnabled()) {
+ __log.debug("handleWorkEvent: ResumeWork event for iid " + we.getIID());
+ }
+ processInstance.execute();
+ break;
+ case PARTNER_RESPONSE:
+ if (__log.isDebugEnabled()) {
+ __log.debug("InvokeResponse event for iid " + we.getIID());
+ }
+ processInstance.injectPartnerResponse(we.getMexId(), we.getChannel());
+ processInstance.execute();
+ break;
+ case MATCHER:
+ if (__log.isDebugEnabled()) {
+ __log.debug("Matcher event for iid " + we.getIID());
+ }
+ processInstance.matcherEvent(we.getCorrelatorId(), we.getCorrelationKey());
}
}
@@ -661,7 +747,24 @@
_hydrationLatch.latch(1);
try {
- MyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
+ MyRoleMessageExchangeImpl mex;
+ switch (istyle) {
+ case RELIABLE:
+ mex = new ReliableMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
+ break;
+ case ASYNC:
+ mex = new AsyncMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
+ break;
+ case TRANSACTED:
+ mex = new TransactedMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
+ break;
+ case BLOCKING:
+ mex = new BlockingMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
+ break;
+ default:
+ throw new AssertionError("Unexpected invocation style: " + istyle);
+
+ }
OPartnerLink plink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId());
PortType ptype = plink.myRolePortType;
Operation op = plink.getMyRoleOperation(mexdao.getOperation());
@@ -682,25 +785,24 @@
Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
switch (istyle) {
case BLOCKING:
- mex = new BlockingPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op,
- null, /* EPR todo */
- plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
+ mex = new BlockingPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /* EPR todo */
+ plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
break;
case ASYNC:
- mex = new AsyncPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op,
- null, /* EPR todo */
- plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
+ mex = new AsyncPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /* EPR todo */
+ plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
break;
case TRANSACTED:
- mex = new TransactedPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op,
- null, /* EPR todo */
- plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
+ mex = new TransactedPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /*
+ * EPR
+ * todo
+ */
+ plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
break;
case RELIABLE:
- mex = new ReliablePartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op,
- null, /* EPR todo */
- plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
+ mex = new ReliablePartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /* EPR todo */
+ plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
break;
default:
@@ -756,13 +858,6 @@
_lastUsed = System.currentTimeMillis();
}
- /** Create a version-appropriate runtime context. */
- BpelRuntimeContextImpl createRuntimeContext(ProcessInstanceDAO dao, PROCESS template,
- MessageExchangeDAO instantiatingMessageExchange) {
- return new BpelRuntimeContextImpl(this, dao, template, instantiatingMessageExchange);
-
- }
-
/**
* If necessary, create an object in the data store to represent the process. We'll re-use an existing object if it already
* exists and matches the GUID.
@@ -804,7 +899,7 @@
}
}
- private class HydrationLatch extends NStateLatch {
+ class HydrationLatch extends NStateLatch {
HydrationLatch() {
super(new Runnable[2]);
@@ -889,13 +984,13 @@
if (isInMemory()) {
bounceProcessDAO(_inMemDao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
- } else if (_contexts.scheduler.isTransacted()) {
+ } else if (_contexts.isTransacted()) {
// If we have a transaction, we do this in the current transaction.
bounceProcessDAO(_contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
} else {
// If we do not have a transaction we need to create one.
try {
- _contexts.scheduler.execIsolatedTransaction(new Callable<Object>() {
+ _contexts.execTransaction(new Callable<Object>() {
public Object call() throws Exception {
bounceProcessDAO(_contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
return null;
@@ -921,6 +1016,83 @@
MessageExchangeDAO getInMemMexDAO(String mexId) {
return _inMemDao.getConnection().getMessageExchange(mexId);
+ }
+
+ /**
+ * Schedule process-level work. This method defers to the server to do the scheduling and wraps the {@link Runnable} in a
+ * try-finally block that ensures that the process is hydrated.
+ *
+ * @param runnable
+ */
+ public void scheduleRunnable(final Runnable runnable) {
+ if (__log.isDebugEnabled())
+ __log.debug("schedulingRunnable for process " + _pid + ": " + runnable);
+
+ _server.scheduleRunnable(new ProcessRunnable(runnable));
+ }
+
+
+ class TransactedRunnable implements Runnable {
+ Runnable _work;
+
+ TransactedRunnable(Runnable work) {
+ _work = work;
+ }
+
+ public void run() {
+ _contexts.execTransaction(_work);
+ }
+ }
+
+ class TransactedCallable<T> implements Callable<T> {
+ Callable<T> _work;
+
+ TransactedCallable(Callable<T> work) {
+ _work = work;
+ }
+
+ public T call() throws Exception {
+ return _contexts.execTransaction(_work);
+ }
+ }
+
+
+ class ProcessRunnable implements Runnable {
+ Runnable _work;
+
+ ProcessRunnable(Runnable work) {
+ _work = work;
+ }
+
+ public void run() {
+ _hydrationLatch.latch(1);
+ try {
+ _work.run();
+ } finally {
+ _hydrationLatch.release(1);
+ }
+
+ }
+
+ }
+
+ class ProcessCallable<T> implements Callable<T> {
+ Callable<T> _work;
+
+ ProcessCallable(Callable<T> work) {
+ _work = work;
+ }
+
+ public T call () throws Exception {
+ _hydrationLatch.latch(1);
+ try {
+ return _work.call();
+ } finally {
+ _hydrationLatch.release(1);
+ }
+
+ }
+
}
}
Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcessDatabase.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcessDatabase.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcessDatabase.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcessDatabase.java Tue Jul 24 13:58:12 2007
@@ -18,53 +18,53 @@
*/
package org.apache.ode.bpel.engine;
-import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
-import org.apache.ode.bpel.dao.ProcessDAO;
-import org.apache.ode.bpel.iapi.Scheduler;
-
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
+import org.apache.ode.bpel.dao.ProcessDAO;
/**
* Encapusulates access to a BPEL process database.
*/
class BpelProcessDatabase extends BpelDatabase {
- static Log __log = LogFactory.getLog(BpelProcessDatabase.class);
+ static Log __log = LogFactory.getLog(BpelProcessDatabase.class);
+
+ private QName _processId;
+
+ /**
+ * Constructor.
+ *
+ * @param sscf
+ * BPEL state store connection factory
+ * @param txm
+ * JTA transaction manager
+ * @param processId
+ * name of the process
+ */
+ BpelProcessDatabase(Contexts contexts, QName processId) {
+ super(contexts);
+ _processId = processId;
+ }
+
+ QName getProcessId() {
+ return _processId;
+ }
- private QName _processId;
+ ProcessDAO getProcessDAO() {
+ return getConnection().getProcess(_processId);
- /**
- * Constructor.
- * @param sscf BPEL state store connection factory
- * @param txm JTA transaction manager
- * @param processId name of the process
- */
- BpelProcessDatabase(BpelDAOConnectionFactory sscf,
- Scheduler scheduler,
- QName processId) {
- super(sscf,scheduler);
- _processId = processId;
- }
-
- QName getProcessId() {
- return _processId;
- }
-
- ProcessDAO getProcessDAO() {
- return getConnection().getProcess(_processId);
-
- }
-
- abstract class Callable<T> implements BpelDatabase.Callable<T> {
- public T exec() throws Exception {
- return BpelProcessDatabase.this.exec(this);
}
- protected ProcessDAO getProcessDAO() {
- return BpelProcessDatabase.this.getProcessDAO();
+ abstract class Callable<T> implements BpelDatabase.Callable<T> {
+ public T exec() throws Exception {
+ return BpelProcessDatabase.this.exec(this);
+ }
+
+ protected ProcessDAO getProcessDAO() {
+ return BpelProcessDatabase.this.getProcessDAO();
+ }
}
- }
}