You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/07/24 22:58:14 UTC

svn commit: r559204 [1/3] - in /incubator/ode/branches/bart: ./ axis2/src/main/java/org/apache/ode/axis2/ bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-epr/src/main/java/org/apache/ode/il/ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ ...

Author: mszefler
Date: Tue Jul 24 13:58:12 2007
New Revision: 559204

URL: http://svn.apache.org/viewvc?view=rev&rev=559204
Log:
BART, scheduling / transaction management refactor.


Modified:
    incubator/ode/branches/bart/Rakefile
    incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
    incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
    incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
    incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
    incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcessDatabase.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
    incubator/ode/branches/bart/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
    incubator/ode/branches/bart/bpel-test/src/main/java/org/apache/ode/test/MessageExchangeContextImpl.java
    incubator/ode/branches/bart/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
    incubator/ode/branches/bart/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java
    incubator/ode/branches/bart/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
    incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java
    incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java

Modified: incubator/ode/branches/bart/Rakefile
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/Rakefile?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/Rakefile (original)
+++ incubator/ode/branches/bart/Rakefile Tue Jul 24 13:58:12 2007
@@ -15,7 +15,7 @@
 #    limitations under the License.
 #
 
-gem "buildr", "~>1.1"
+gem "buildr", "=1.1.3"
 require "buildr"
 # require "buildr/lib/buildr"
 require "buildr/xmlbeans.rb"
@@ -33,7 +33,7 @@
 AXIOM               = group("axiom-api", "axiom-impl", "axiom-dom", :under=>"org.apache.ws.commons.axiom", :version=>"1.2.4")
 AXIS2               = "org.apache.axis2:axis2:jar:1.1.1"
 AXIS2_WAR           = "org.apache.axis2:axis2:war:1.1.1"
-AXIS2_ALL           = group("axis2", "axis2-adb", "axis2-codegen", "axis2-tools",
+AXIS2_ALL           = group("axis2-adb", "axis2-codegen", "axis2-tools",
                         "axis2-java2wsdl", "axis2-jibx", "axis2-saaj", "axis2-xmlbeans",
                         :under=>"org.apache.axis2", :version=>"1.1.1")
 AXIS2_PATCHED       = "org.apache.axis2:axis2-kernel-intalio:jar:1.1.1b"
@@ -71,7 +71,8 @@
   :persistence      =>"javax.persistence:persistence-api:jar:1.0",
   :servlet          =>"org.apache.geronimo.specs:geronimo-servlet_2.4_spec:jar:1.0",
   :stream           =>"stax:stax-api:jar:1.0.1",
-  :transaction      =>"org.apache.geronimo.specs:geronimo-jta_1.0.1B_spec:jar:1.0"
+  :transaction      =>"org.apache.geronimo.specs:geronimo-jta_1.0.1B_spec:jar:1.0",
+  :resource			=>"javax.resource:connector:jar:1.0"
 )
 JAXEN               = "jaxen:jaxen:jar:1.1-beta-8"
 JBI                 = "org.apache.servicemix:servicemix-jbi:jar:3.1-incubating"
@@ -80,7 +81,6 @@
 LOG4J               = "log4j:log4j:jar:1.2.13"
 OPENJPA             = ["org.apache.openjpa:openjpa-all:jar:#{Buildr::OpenJPA::VERSION}",
                        "net.sourceforge.serp:serp:jar:1.12.0"]
-QUARTZ              = "quartz:quartz:jar:1.5.2"
 SAXON               = group("saxon", "saxon-xpath", "saxon-dom", :under=>"net.sf.saxon", :version=>"8.7")
 SERVICEMIX          = group("servicemix-core", "servicemix-shared", "servicemix-services",
                         :under=>"org.apache.servicemix", :version=>"3.1-incubating")
@@ -100,12 +100,19 @@
 XBEAN               = group("xbean-classloader", "xbean-kernel", "xbean-server", "xbean-spring",
                         :under=>"org.apache.xbean", :version=>"2.8")
 XMLBEANS            = "xmlbeans:xbean:jar:2.2.0"
+JOTM				= struct(
+  :jotm				=>"jotm:jotm:jar:2.0.10",
+  :carol			=>"org.objectweb.carol:carol:jar:2.0.5",
+  :jrmp				=>"jotm:jotm_jrmp_stubs:jar:2.0.10",
+  :howl				=>"howl:howl-logger:jar:0.1.11"
+)
 
 
 repositories.remote << "http://pxe.intalio.org/public/maven2"
 repositories.remote << "http://people.apache.org/repo/m2-incubating-repository"
 repositories.remote << "http://repo1.maven.org/maven2"
 repositories.remote << "http://people.apache.org/repo/m2-snapshot-repository"
+repositories.remote << "http://download.java.net/maven/2"
 repositories.deploy_to[:url] ||= "sftp://guest@localhost/home/guest"
 
 # Changing releases tag names
@@ -132,7 +139,7 @@
   desc "ODE Axis Integration Layer"
   define "axis2" do
     compile.with projects("bpel-api", "bpel-connector", "bpel-dao", "bpel-epr", "bpel-runtime",
-      "bpel-scheduler-quartz", "bpel-schemas", "bpel-store", "utils"),
+      "scheduler-simple", "bpel-schemas", "bpel-store", "utils"),
       AXIOM, AXIS2, COMMONS.logging, COMMONS.collections, DERBY, GERONIMO.kernel, GERONIMO.transaction,
       JAVAX.activation, JAVAX.servlet, JAVAX.stream, JAVAX.transaction, JENCKS, WSDL4J, WS_COMMONS.xml_schema,
       XMLBEANS
@@ -146,13 +153,13 @@
   desc "ODE Axis2 Based Web Application"
   define "axis2-war" do
     libs = projects("axis2", "bpel-api", "bpel-compiler", "bpel-connector", "bpel-dao",
-      "bpel-epr", "bpel-obj", "bpel-ql", "bpel-runtime", "bpel-scheduler-quartz",
+      "bpel-epr", "bpel-obj", "bpel-ql", "bpel-runtime", "scheduler-simple",
       "bpel-schemas", "bpel-store", "dao-hibernate", "jacob", "jca-ra", "jca-server",
       "utils", "dao-jpa"),
       AXIS2_ALL, AXIS2_PATCHED, ANNONGEN, BACKPORT, COMMONS.codec, COMMONS.collections, COMMONS.fileupload, COMMONS.httpclient,
       COMMONS.lang, COMMONS.logging, COMMONS.pool, DERBY, DERBY_TOOLS, JAXEN, JAVAX.activation, JAVAX.ejb, JAVAX.javamail,
       JAVAX.connector, JAVAX.jms, JAVAX.persistence, JAVAX.transaction, JAVAX.stream,  JIBX,
-      GERONIMO.connector, GERONIMO.kernel, GERONIMO.transaction, LOG4J, OPENJPA, QUARTZ, SAXON, TRANQL,
+      GERONIMO.connector, GERONIMO.kernel, GERONIMO.transaction, LOG4J, OPENJPA, SAXON, TRANQL,
       WOODSTOX, WSDL4J, WS_COMMONS.axiom, WS_COMMONS.neethi, WS_COMMONS.xml_schema, XALAN, XERCES, XMLBEANS
 
     package(:war).with(:libs=>libs).path("WEB-INF").tap do |web_inf|
@@ -214,7 +221,7 @@
   desc "ODE Interface Layers Common"
   define "bpel-epr" do
     compile.with projects("utils", "bpel-dao", "bpel-api"),
-      COMMONS.logging, DERBY, JAVAX.transaction, GERONIMO.transaction, GERONIMO.connector, TRANQL, JAVAX.connector
+      COMMONS.logging, DERBY, JAVAX.transaction, GERONIMO.transaction, GERONIMO.connector, TRANQL, JAVAX.connector, COMMONS.lang
     package :jar
   end
 
@@ -241,18 +248,20 @@
       "bpel-store", "jacob", "jacob-ap", "utils"),
       COMMONS.logging, COMMONS.collections, JAXEN, JAVAX.persistence, JAVAX.stream, SAXON, WSDL4J, XMLBEANS
 
-    test.compile.with projects("bpel-scheduler-quartz", "dao-jpa", "dao-hibernate", "bpel-epr"),
+    test.compile.with projects("scheduler-simple", "dao-jpa", "dao-hibernate", "bpel-epr"),
         BACKPORT, COMMONS.pool, COMMONS.lang, DERBY, JAVAX.connector, JAVAX.transaction,
         GERONIMO.transaction, GERONIMO.kernel, GERONIMO.connector, TRANQL, HSQLDB, JAVAX.ejb,
-        LOG4J, XERCES, Buildr::OpenJPA::REQUIRES, QUARTZ, XALAN
+        LOG4J, XERCES, Buildr::OpenJPA::REQUIRES, XALAN
     test.junit.with HIBERNATE, DOM4J
 
     package :jar
   end
 
-  desc "ODE Quartz Interface"
-  define "bpel-scheduler-quartz" do
-    compile.with projects("bpel-api", "utils"), COMMONS.collections, COMMONS.logging, JAVAX.transaction, QUARTZ
+  desc "ODE Simple Scheduler"
+  define "scheduler-simple" do
+    compile.with projects("bpel-api", "utils"), COMMONS.collections, COMMONS.logging, JAVAX.transaction
+	test.compile.with HSQLDB, JOTM.jotm
+	test.junit.with HSQLDB, JOTM.jotm, JOTM.carol, JOTM.jrmp, JAVAX.transaction, JOTM.howl, JAVAX.resource, JAVAX.connector, LOG4J
     package :jar
   end
 
@@ -287,7 +296,7 @@
       DERBY, Java::JUNIT_REQUIRES, JAVAX.persistence, OPENJPA, WSDL4J
 
     test.with projects("bpel-obj", "jacob", "bpel-schemas",
-      "bpel-scripts", "bpel-scheduler-quartz"),
+      "bpel-scripts", "scheduler-simple"),
       COMMONS.collections, COMMONS.lang, COMMONS.logging, DERBY, JAVAX.connector,
       JAVAX.stream, JAVAX.transaction, JAXEN, HSQLDB, LOG4J, SAXON, XERCES, XMLBEANS, XALAN
 
@@ -308,7 +317,7 @@
 
   desc "ODE Hibernate Compatible Databases"
   define "dao-hibernate-db" do
-    predefined_for = lambda { |name| _("src/main/sql/tables_#{name}.sql") }
+    predefined_for = lambda { |name| _("src/main/sql/simplesched-#{name}.sql") }
     properties_for = lambda { |name| _("src/main/sql/ode.#{name}.properties") }
 
     dao_hibernate = project("dao-hibernate").compile.target
@@ -352,13 +361,13 @@
   define "dao-jpa-ojpa-derby" do
     %w{ derby mysql }.each do |db|
       db_xml = _("src/main/descriptors/persistence.#{db}.xml")
-      quartz_sql = _("src/main/scripts/quartz-#{db}.sql")
+      scheduler_sql = _("src/main/scripts/simplesched-#{db}.sql")
       partial_sql = file("target/partial.#{db}.sql"=>db_xml) do |task|
         mkpath _("target"), :verbose=>false
         Buildr::OpenJPA.mapping_tool :properties=>db_xml, :action=>"build", :sql=>task.name,
           :classpath=>projects("bpel-store", "dao-jpa", "bpel-api", "bpel-dao", "utils" )
       end
-      sql = concat(_("target/#{db}.sql")=>[partial_sql, quartz_sql])
+      sql = concat(_("target/#{db}.sql")=>[partial_sql, scheduler_sql])
       build sql
     end
     derby_db = Derby.create(_("target/derby/jpadb")=>_("target/derby.sql"))
@@ -389,7 +398,7 @@
   desc "ODE JBI Integration Layer"
   define "jbi" do
     compile.with projects("bpel-api", "bpel-connector", "bpel-dao", "bpel-epr", "bpel-obj",
-      "bpel-runtime", "bpel-scheduler-quartz", "bpel-schemas", "bpel-store", "utils"),
+      "bpel-runtime", "scheduler-simple", "bpel-schemas", "bpel-store", "utils"),
       COMMONS.logging, COMMONS.pool, JAVAX.transaction, JBI, LOG4J, WSDL4J, XERCES
 
     package(:jar)
@@ -397,11 +406,11 @@
       libs = artifacts(package(:jar),
         projects("bpel-api", "bpel-api-jca", "bpel-compiler", "bpel-connector", "bpel-dao",
         "bpel-epr", "jca-ra", "jca-server", "bpel-obj", "bpel-ql", "bpel-runtime",
-        "bpel-scheduler-quartz", "bpel-schemas", "bpel-store", "dao-hibernate", "dao-jpa",
+        "scheduler-simple", "bpel-schemas", "bpel-store", "dao-hibernate", "dao-jpa",
         "jacob", "jacob-ap", "utils"),
         ANT, BACKPORT, COMMONS.codec, COMMONS.collections, COMMONS.dbcp, COMMONS.lang, COMMONS.pool,
         COMMONS.primitives, JAXEN, JAVAX.connector, JAVAX.ejb, JAVAX.jms,
-        JAVAX.persistence, JAVAX.stream, JAVAX.transaction, LOG4J, OPENJPA, QUARTZ, SAXON, TRANQL,
+        JAVAX.persistence, JAVAX.stream, JAVAX.transaction, LOG4J, OPENJPA, SAXON, TRANQL,
         XALAN, XMLBEANS, XSTREAM, WSDL4J)
 
       jbi.component :type=>:service_engine, :name=>"OdeBpelEngine", :description=>self.comment
@@ -416,7 +425,7 @@
       "jca-server", "jacob"),
       BACKPORT, COMMONS.lang, COMMONS.collections, DERBY, GERONIMO.connector, GERONIMO.kernel,
       GERONIMO.transaction, JAVAX.connector, JAVAX.ejb, JAVAX.persistence, JAVAX.stream,
-      JAVAX.transaction, JAXEN, JBI, OPENJPA, QUARTZ, SAXON, SERVICEMIX, SPRING, TRANQL,
+      JAVAX.transaction, JAXEN, JBI, OPENJPA, SAXON, SERVICEMIX, SPRING, TRANQL,
       XALAN, XBEAN, XMLBEANS, XSTREAM
     test.junit.using :properties=>{ "jbi.install"=>_("target/smixInstallDir"),  "jbi.examples"=>_("../distro-jbi/src/examples") }
     test.setup unzip(_("target/smixInstallDir/install/ODE")=>project("dao-jpa-ojpa-derby").package(:zip))
@@ -462,3 +471,54 @@
 
 end
 
+define "apache-ode" do
+  [:version, :group, :manifest, :meta_inf].each { |prop| send "#{prop}=", project("ode").send(prop) }
+
+  def distro(project, id)
+    project.package(:zip, :id=>id).path("#{id}-#{version}").tap do |zip|
+      zip.include meta_inf + ["RELEASE_NOTES", "README"].map { |f| path_to(f) }
+      zip.path("examples").include project.path_to("src/examples"), :as=>"."
+      zip.merge project("ode:tools-bin").package(:zip)
+      zip.path("lib").include artifacts(COMMONS.logging, COMMONS.codec, COMMONS.httpclient,
+        COMMONS.pool, COMMONS.collections, JAXEN,
+        SAXON, LOG4J, WSDL4J, XALAN, XERCES)
+      project("ode").projects("utils", "tools", "bpel-compiler", "bpel-api", "bpel-obj", "bpel-schemas").
+        map(&:packages).flatten.each do |pkg|
+        zip.include(pkg.to_s, :as=>"#{pkg.id}.#{pkg.type}", :path=>"lib")
+      end
+      yield zip
+    end
+  end
+
+  desc "ODE Axis2 Based Distribution"
+  define "distro-axis2" do
+    parent.distro(self, "#{parent.id}-war") { |zip| zip.include project("ode:axis2-war").package(:war), :as=>"ode.war" }
+
+    project("ode:axis2-war").task("start").enhance do |task|
+      target = "#{task.path}/webapp/WEB-INF/processes"
+      puts "Deploying processes to #{target}" if verbose
+      verbose(false) do
+        mkpath target
+        cp_r FileList[_("src/examples/*")].to_a, target
+        rm Dir.glob("#{target}/*.deployed")
+      end
+    end
+  end
+
+  desc "ODE JBI Based Distribution"
+  define "distro-jbi" do
+    parent.distro(self, "#{parent.id}-jbi") { |zip| zip.include project("ode:jbi").package(:zip) }
+  end
+
+  package(:zip, :id=>"#{id}-sources").path("#{id}-sources-#{version}").tap do |zip|
+    if File.exist?(".svn")
+      `svn status -v`.reject { |l| l[0] == ?? || l[0] == ?D }.
+        map { |l| l.split.last }.reject { |f| File.directory?(f) }.
+        each { |f| zip.include f, :as=>f }
+    else
+      zip.include Dir.pwd, :as=>"."
+    end
+  end
+
+  package :zip, :id=>"#{id}-docs", :include=>javadoc(project("ode").projects).target
+end

Modified: incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java (original)
+++ incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java Tue Jul 24 13:58:12 2007
@@ -19,13 +19,19 @@
 
 package org.apache.ode.axis2;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.EndpointReference;
 import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.MessageExchangeContext;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 
 /**
@@ -36,7 +42,17 @@
 public class MessageExchangeContextImpl implements MessageExchangeContext {
 
     private static final Log __log = LogFactory.getLog(MessageExchangeContextImpl.class);
-
+    
+    /** The currently supported invocation styles. */
+    private static final Set<InvocationStyle> __supportedInvocationStyles;
+    
+    static {
+        HashSet<InvocationStyle> styles = new HashSet<InvocationStyle>();
+        styles.add(InvocationStyle.ASYNC);
+        styles.add(InvocationStyle.BLOCKING);
+        __supportedInvocationStyles = Collections.unmodifiableSet(styles);
+    }
+    
     public MessageExchangeContextImpl(ODEServer server) {
     }
 
@@ -75,7 +91,20 @@
     }
 
     public void onReliableReply(MyRoleMessageExchange myRoleMex) throws BpelEngineException {
+        __log.error("RELIABLE reply from service " + myRoleMex.getServiceName() +"; RELIABLE IS NOT SUPPORTED!");
+
         // We don't support this yet, so not much to do here. 
+    }
+
+
+    public void cancel(PartnerRoleMessageExchange mex) throws ContextException {
+        // TODO Auto-generated method stub
+        
+    }
+
+
+    public Set<InvocationStyle> getSupportedInvocationStyle(PartnerRoleChannel prc, EndpointReference partnerEpr) {
+        return __supportedInvocationStyles;
     }
 
 }

Modified: incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java (original)
+++ incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java Tue Jul 24 13:58:12 2007
@@ -52,10 +52,11 @@
 import org.apache.ode.bpel.iapi.ProcessStoreEvent;
 import org.apache.ode.bpel.iapi.ProcessStoreListener;
 import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
-import org.apache.ode.bpel.scheduler.quartz.QuartzSchedulerImpl;
 import org.apache.ode.il.dbutil.Database;
+import org.apache.ode.scheduler.simple.JdbcDelegate;
+import org.apache.ode.scheduler.simple.SimpleScheduler;
 import org.apache.ode.store.ProcessStoreImpl;
+import org.apache.ode.utils.GUID;
 import org.apache.ode.utils.fs.TempFileManager;
 
 /**
@@ -291,7 +292,7 @@
     public ODEService createService(ProcessConf pconf, QName serviceName, String portName) throws AxisFault {
         destroyService(serviceName, portName);
         AxisService axisService = ODEAxisService.createService(_axisConfig, pconf, serviceName, portName);
-        ODEService odeService = new ODEService(axisService, pconf.getDefinitionForService(serviceName), serviceName, portName, _server, _txMgr);
+        ODEService odeService = new ODEService(axisService, pconf.getDefinitionForService(serviceName), serviceName, portName, _server);
         if (_odeConfig.isReplicateEmptyNS()) {
             __log.debug("Setting service with empty namespace replication");
             odeService.setReplicateEmptyNS(true);
@@ -425,10 +426,9 @@
     }
 
     protected Scheduler createScheduler() {
-        QuartzSchedulerImpl scheduler = new QuartzSchedulerImpl();
+        SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_db.getDataSource()));
         scheduler.setTransactionManager(_txMgr);
-        scheduler.setDataSource(_db.getDataSource());
-        scheduler.init();
+
         return scheduler;
     }
 
@@ -442,7 +442,6 @@
         _scheduler.setJobProcessor(_server);
 
         _server.setDaoConnectionFactory(_daoCF);
-        _server.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler));
         _server.setEndpointReferenceContext(new EndpointReferenceContextImpl(this));
         _server.setMessageExchangeContext(new MessageExchangeContextImpl(this));
         _server.setBindingContext(new BindingContextImpl(this, _store));

Modified: incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEService.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEService.java (original)
+++ incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/ODEService.java Tue Jul 24 13:58:12 2007
@@ -19,10 +19,6 @@
 
 package org.apache.ode.axis2;
 
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import javax.transaction.TransactionManager;
 import javax.wsdl.Definition;
 import javax.wsdl.Port;
 import javax.wsdl.Service;
@@ -44,6 +40,7 @@
 import org.apache.ode.bpel.epr.WSAEndpoint;
 import org.apache.ode.bpel.iapi.BpelServer;
 import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
@@ -54,8 +51,8 @@
 import org.w3c.dom.Element;
 
 /**
- * A running service, encapsulates the Axis service, its receivers and our
- * receivers as well.
+ * A running service, encapsulates the Axis service, its receivers and our receivers as well.
+ * 
  * @author Matthieu Riou <mriou at apache dot org>
  */
 public class ODEService {
@@ -65,7 +62,6 @@
 
     private AxisService _axisService;
     private BpelServer _server;
-    private TransactionManager _txManager;
     private Definition _wsdlDef;
     private QName _serviceName;
     private String _portName;
@@ -73,11 +69,9 @@
     private boolean _isReplicateEmptyNS = false;
     private SoapMessageConverter _converter;
 
-    public ODEService(AxisService axisService, Definition def, QName serviceName, String portName, BpelServer server,
-                      TransactionManager txManager) throws AxisFault {
+    public ODEService(AxisService axisService, Definition def, QName serviceName, String portName, BpelServer server) throws AxisFault {
         _axisService = axisService;
         _server = server;
-        _txManager = txManager;
         _wsdlDef = def;
         _serviceName = serviceName;
         _portName = portName;
@@ -88,119 +82,64 @@
 
     public void onAxisMessageExchange(MessageContext msgContext, MessageContext outMsgContext, SOAPFactory soapFactory)
             throws AxisFault {
-        boolean success = true;
         MyRoleMessageExchange odeMex = null;
-        Future responseFuture = null;
         try {
-            _txManager.begin();
-            if (__log.isDebugEnabled()) __log.debug("Starting transaction.");
-
             // Creating mesage exchange
             String messageId = new GUID().toString();
-            odeMex = _server.getEngine().createMessageExchange("" + messageId, _serviceName,
-                    msgContext.getAxisOperation().getName().getLocalPart());
+            odeMex = _server.createMessageExchange(InvocationStyle.BLOCKING, _serviceName,
+                    msgContext.getAxisOperation().getName().getLocalPart(), "" + messageId);
+            
             __log.debug("ODE routed to operation " + odeMex.getOperation() + " from service " + _serviceName);
 
-            if (odeMex.getOperation() != null) {
-                // Preparing message to send to ODE
-                Element msgEl = DOMUtils.newDocument().createElementNS(null, "message");
-                msgEl.getOwnerDocument().appendChild(msgEl);
-                _converter.parseSoapRequest(msgEl, msgContext.getEnvelope(), odeMex.getOperation());
-                Message odeRequest = odeMex.createMessage(odeMex.getOperation().getInput().getMessage().getQName());
-                readHeader(msgContext, odeMex);
-                odeRequest.setMessage(msgEl);
-
-                if (__log.isDebugEnabled()) {
-                    __log.debug("Invoking ODE using MEX " + odeMex);
-                    __log.debug("Message content:  " + DOMUtils.domToString(odeRequest.getMessage()));
-                }
-
-                // Invoke ODE
-                responseFuture = odeMex.invoke(odeRequest);
-
-                __log.debug("Commiting ODE MEX " + odeMex);
-                try {
-                    if (__log.isDebugEnabled()) __log.debug("Commiting transaction.");
-                    _txManager.commit();
-                } catch (Exception e) {
-                    __log.error("Commit failed", e);
-                    success = false;
-                }
-            } else {
-                success = false;
+            if (odeMex.getOperation() == null) {
+                String errmsg = "Call to " + _serviceName + "." + odeMex.getOperationName() + " was not routable.";
+                __log.error(errmsg);
+                throw new OdeFault(errmsg);
+            }
+
+            // Preparing message to send to ODE
+            Element msgEl = DOMUtils.newDocument().createElementNS(null, "message");
+            msgEl.getOwnerDocument().appendChild(msgEl);
+            _converter.parseSoapRequest(msgEl, msgContext.getEnvelope(), odeMex.getOperation());
+            Message odeRequest = odeMex.createMessage(odeMex.getOperation().getInput().getMessage().getQName());
+            readHeader(msgContext, odeMex);
+            odeRequest.setMessage(msgEl);
+
+            if (__log.isDebugEnabled()) {
+                __log.debug("Invoking ODE using MEX " + odeMex);
+                __log.debug("Message content:  " + DOMUtils.domToString(odeRequest.getMessage()));
             }
-        } catch (Exception e) {
-            __log.error("Exception occured while invoking ODE", e);
-            success = false;
-            throw new OdeFault("An exception occured while invoking ODE.", e);
-        } finally {
-            if (!success) {
-                if (odeMex != null) odeMex.release();
-                try {
-                    _txManager.rollback();
-                } catch (Exception e) {
-                    throw new OdeFault("Rollback failed", e);
-                }
-            }
-        }
 
-        if (odeMex.getOperation().getOutput() != null) {
-            // Waits for the response to arrive
+            odeMex.setRequest(odeRequest);
+            // odeMex.setTimeout(TIMEOUT);
             try {
-                responseFuture.get(TIMEOUT, TimeUnit.MILLISECONDS);
-            } catch (Exception e) {
-                String errorMsg = "Timeout or execution error when waiting for response to MEX "
-                        + odeMex + " " + e.toString();
-                __log.error(errorMsg);
-                throw new OdeFault(errorMsg);
+                odeMex.invokeBlocking();
+            } catch (java.util.concurrent.TimeoutException te) {
+                String errmsg = "Call to " + _serviceName + "." + odeMex.getOperationName() + " timed out.";
+                __log.error(errmsg);
+                throw new OdeFault(errmsg);         
             }
-
-            if (outMsgContext != null) {
+            
+            if (odeMex.getOperation().getOutput() != null && outMsgContext != null) {
                 SOAPEnvelope envelope = soapFactory.getDefaultEnvelope();
                 outMsgContext.setEnvelope(envelope);
 
                 // Hopefully we have a response
                 __log.debug("Handling response for MEX " + odeMex);
-                boolean commit = false;
-                try {
-                    if (__log.isDebugEnabled()) __log.debug("Starting transaction.");
-                    _txManager.begin();
-                } catch (Exception ex) {
-                    throw new OdeFault("Error starting transaction!", ex);
-                }
+                onResponse(odeMex, outMsgContext);
+            }
+
+        } catch (Exception e) {
+            String errmsg = "Call to " + _serviceName + "." + odeMex.getOperationName() + " caused an exception.";
+            __log.error(errmsg, e);
+            throw new OdeFault(errmsg, e);         
+        } finally {
+            if (odeMex != null)
                 try {
-                    // Refreshing the message exchange
-                    odeMex = (MyRoleMessageExchange) _server.getEngine().getMessageExchange(odeMex.getMessageExchangeId());
-                    onResponse(odeMex, outMsgContext);
-                    commit = true;
-                } catch (AxisFault af) {
-                    __log.error("Error processing response for MEX " + odeMex, af);
-                    commit = true;
-                    throw af;
-                } catch (Exception e) {
-                    __log.error("Error processing response for MEX " + odeMex, e);
-                    throw new OdeFault("An exception occured when invoking ODE.", e);
-                } finally {
                     odeMex.release();
-                    if (commit) {
-                        try {
-                            if (__log.isDebugEnabled()) __log.debug("Comitting transaction.");
-                            _txManager.commit();
-                        } catch (Exception e) {
-                            throw new OdeFault("Commit failed!", e);
-                        }
-                    } else {
-                        try {
-                            _txManager.rollback();
-                        } catch (Exception ex) {
-                            throw new OdeFault("Rollback failed!", ex);
-                        }
-                    }
+                } catch (Exception ex) {
+                    __log.error("Error releasing message exchange: " + odeMex.getMessageExchangeId());
                 }
-            }
-            if (!success) {
-                throw new OdeFault("Message was either unroutable or timed out!");
-            }
         }
     }
 
@@ -235,8 +174,7 @@
     }
 
     /**
-     * Extracts endpoint information from Axis MessageContext (taken from WSA
-     * headers) to stuff them into ODE mesage exchange.
+     * Extracts endpoint information from Axis MessageContext (taken from WSA headers) to stuff them into ODE message exchange.
      */
     private void readHeader(MessageContext msgContext, MyRoleMessageExchange odeMex) {
         Object otse = msgContext.getProperty("targetSessionEndpoint");
@@ -262,9 +200,8 @@
     }
 
     /**
-     * Handle callback endpoints for the case where partner contact process
-     * my-role which results in an "updated" my-role EPR due to session id
-     * injection.
+     * Handle callback endpoints for the case where a partner contacts the process my-role, which results in an "updated" my-role EPR
+     * due to session id injection.
      */
     private void writeHeader(MessageContext msgContext, MyRoleMessageExchange odeMex) {
         EndpointReference targetEPR = odeMex.getEndpointReference();
@@ -286,8 +223,8 @@
     }
 
     /**
-     * Return the service-ref element that will be used to represent this
-     * endpoint.
+     * Return the service-ref element that will be used to represent this endpoint.
+     * 
      * @return my service endpoint
      */
     public EndpointReference getMyServiceRef() {
@@ -296,7 +233,7 @@
 
     /**
      * Get the EPR of this service from the WSDL.
-     *
+     * 
      * @param name
      *            service name
      * @param portName
@@ -335,7 +272,7 @@
 
     /**
      * Create-and-copy a service-ref element.
-     *
+     * 
      * @param elmt
      * @return wrapped element
      */

Modified: incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java (original)
+++ incubator/ode/branches/bart/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java Tue Jul 24 13:58:12 2007
@@ -45,14 +45,7 @@
             throws ContextException ;
 
 
-    /**
-     * Schedule a volatile (non-persisted) job. Volatile jobs should not be
-     * saved in the database and should not survive system crash. 
-     *
-     * @param transacted 
-     * @return unique (as far as the scheduler is concerned) job identifier
-     */
-    <T> Future<T> scheduleVolatileJob(boolean transacted, final Callable<T> call) throws ContextException;
+    void jobCompleted(String jobId);
 
     /**
      * Make a good effort to cancel the job. If its already running no big
@@ -61,62 +54,12 @@
      */
     void cancelJob(String jobId) throws ContextException;
 
-    /**
-     * Execute a {@link Callable} in a transactional context. If the callable
-     * throws an exception, then the transaction will be rolled back, otherwise
-     * the transaction will commit.
-     *
-     * @param <T> return type
-     * @param transaction transaction to execute
-     * @return result
-     * @throws Exception
-     */
-    <T> T execTransaction(Callable<T> transaction)
-            throws Exception, ContextException;
-
-    /**
-     * Same as execTransaction but executes in a different thread to guarantee
-     * isolation from the main execution thread.
-     * @param transaction
-     * @return
-     * @throws Exception
-     * @throws ContextException
-     */
-    <T> Future<T> execIsolatedTransaction(final Callable<T> transaction)
-            throws Exception, ContextException;
-
-    /**
-     * @return true if the current thread is associated with a transaction.
-     */
-    boolean isTransacted();
-
-    /**
-     * Register a transaction synchronizer.
-     * @param synch synchronizer
-     * @throws ContextException
-     */
-    void registerSynchronizer(Synchronizer synch) throws ContextException;
-
     void start();
 
     void stop();
 
     void shutdown();
 
-    public interface Synchronizer {
-        /**
-         * Called after the transaction is completed.
-         * @param success indicates whether the transaction was comitted
-         */
-        void afterCompletion(boolean success);
-
-        /**
-         * Called before the transaction is completed.
-         */
-        void beforeCompletion();
-
-    }
-
     /**
      * Interface implemented by the object responsible for job execution.
      * @author mszefler
@@ -166,6 +109,7 @@
         }
 
     }
+
 
 
 }

Modified: incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java (original)
+++ incubator/ode/branches/bart/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java Tue Jul 24 13:58:12 2007
@@ -19,123 +19,72 @@
 
 package org.apache.ode.il;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.Scheduler;
-
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.TransactionManager;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.Synchronization;
+import javax.transaction.TransactionManager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.Scheduler;
 
 /**
+ * 
  * @author Matthieu Riou <mriou at apache dot org>
+ * 
+ * - BART refactor: Removed transaction management logic. 
+ * @author Maciej Szefler <mszefler at gmail dot com> 
  */
 public class MockScheduler implements Scheduler {
 
     private static final Log __log = LogFactory.getLog(MockScheduler.class);
 
     private JobProcessor _processor;
-    private ExecutorService _executorSvc = Executors.newCachedThreadPool();
-    private ThreadLocal<Boolean> _transacted = new ThreadLocal<Boolean>();
-    private TransactionManager _txm;
 
-    public MockScheduler() {
-        _transacted.set(false);
-    }
+    private ScheduledExecutorService _exec;
+
+    private TransactionManager _txm;
 
     public MockScheduler(TransactionManager txm) {
         _txm = txm;
-        _transacted.set(false);
     }
 
-    ThreadLocal<List<Synchronizer>> _synchros = new ThreadLocal<List<Scheduler.Synchronizer>>() {
+    ThreadLocal<List<Synchronization>> _synchros = new ThreadLocal<List<Synchronization>>() {
         @Override
-        protected List<Synchronizer> initialValue() {
-            return new ArrayList<Synchronizer>();
+        protected List<Synchronization> initialValue() {
+            return new ArrayList<Synchronization>();
         }
     };
 
-    public String schedulePersistedJob(Map<String, Object> detail, Date date) throws ContextException {
-        if (date != null) {
-            try {
-                while(new Date().before(date)) { Thread.sleep(100); }
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-        return scheduleVolatileJob(true, detail);
-    }
-
-    public String scheduleVolatileJob(final boolean transacted, final Map<String, Object> detail) throws ContextException {
-        registerSynchronizer(new Synchronizer() {
-            public void afterCompletion(boolean success) {
-                try {
-                    if (transacted) {
-                        execIsolatedTransaction(new Callable() {
-                            public Object call() throws Exception {
-                                JobInfo ji = new JobInfo("volatileJob", detail, 0);
-                                doExecute(ji);
-                                return null;
-                            }
-                        });
-                    } else {
-                        JobInfo ji = new JobInfo("volatileJob", detail, 0);
+    public String schedulePersistedJob(final Map<String, Object> detail, final Date date) throws ContextException {
+        registerSynchronizer(new Synchronization() {
+            public void afterCompletion(int status) {
+                long delay = Math.max(0, date.getTime() - System.currentTimeMillis());
+                _exec.schedule(new Callable<Void>() {
+                    public Void call() throws Exception {
+                        JobInfo ji = new JobInfo("job" + System.currentTimeMillis(), detail, 0);
                         doExecute(ji);
+                        return null;
                     }
-                } catch (Exception e) {
-                    throw new ContextException("Failure when starting a new volatile job.", e);
-                }
+                }, delay, TimeUnit.MILLISECONDS);
+            }
+
+            public void beforeCompletion() {
             }
-            public void beforeCompletion() { }
         });
         return null;
-    }
-
-    public void cancelJob(String arg0) throws ContextException {
 
     }
 
-    public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {
-        begin();
-        try {
-            T retval = transaction.call();
-            commit();
-            return retval;
-        } catch (Throwable t) {
-            __log.error("Caught an exception during transaction", t);
-            rollback();
-            throw new ContextException("Error in tx", t);
-        }
-    }
-
-    public <T> Future<T> execIsolatedTransaction(final Callable<T> transaction) throws Exception, ContextException {
-        return _executorSvc.submit(new Callable<T>() {
-            public T call() throws Exception {
-                return execTransaction(transaction);
-            }
-        });
-    }
+    public void cancelJob(String arg0) throws ContextException {
 
-    public boolean isTransacted() {
-        if (_txm != null) {
-            try {
-                return _txm.getTransaction() != null;
-            } catch (SystemException e) {
-                __log.error("Exception in mock scheduler isTransacted.", e);
-                throw new RuntimeException(e);
-            }
-        }
-        else return _transacted.get();
     }
 
     public void start() {
@@ -147,90 +96,15 @@
     public void shutdown() {
     }
 
-    public void registerSynchronizer(final Synchronizer synch) throws ContextException {
-        if (_txm != null) {
-            try {
-                _txm.getTransaction().registerSynchronization(new Synchronization() {
-                    public void beforeCompletion() {
-                        synch.beforeCompletion();
-                    }
-                    public void afterCompletion(int status) {
-                        synch.afterCompletion(status == Status.STATUS_COMMITTED);
-                    }
-                });
-            } catch (Exception e) {
-                __log.error("Exception in mock scheduler sync registration.", e);
-                throw new RuntimeException(e);
-            }
-        } else {
-            _synchros.get().add(synch);
-        }
-    }
-
-    public void begin() {
-        if (_txm != null) {
-            try {
-                _txm.begin();
-            } catch (Exception e) {
-                __log.error("Exception in mock scheduler begin.", e);
-                throw new RuntimeException(e);
-            }
-        } else {
-            if (_transacted.get() == Boolean.TRUE)
-                throw new RuntimeException("Transaction active.");
-            _synchros.get().clear();
-        }
-        _transacted.set(Boolean.TRUE);
-    }
-
-    public void commit() {
-        if (_txm != null) {
-            try {
-                _txm.commit();
-            } catch (Exception e) {
-                __log.error("Exception in mock scheduler commit.", e);
-                throw new RuntimeException(e);
-            }
-        } else {
-            for (Synchronizer s : _synchros.get())
-                try {
-                    s.beforeCompletion();
-                } catch (Throwable t) {
-                }
-            for (Synchronizer s : _synchros.get())
-                try {
-                    s.afterCompletion(true);
-                } catch (Throwable t) {
-                }
-
-            _synchros.get().clear();
+    private void registerSynchronizer(final Synchronization synch) throws ContextException {
+        try {
+            _txm.getTransaction().registerSynchronization(synch);
+        } catch (Exception e) {
+            __log.error("Exception in mock scheduler sync registration.", e);
+            throw new RuntimeException(e);
         }
-        _transacted.set(Boolean.FALSE);
     }
 
-    public void rollback() {
-        if (_txm != null) {
-            try {
-                _txm.rollback();
-            } catch (Exception e) {
-                __log.error("Exception in mock scheduler rollback.", e);
-                throw new RuntimeException(e);
-            }
-        } else {
-            for (Synchronizer s : _synchros.get())
-                try {
-                    s.beforeCompletion();
-                } catch (Throwable t) {
-                }
-            for (Synchronizer s : _synchros.get())
-                try {
-                    s.afterCompletion(false);
-                } catch (Throwable t) {
-                }
-            _synchros.get().clear();
-        }
-        _transacted.set(Boolean.FALSE);
-    }
 
     private void doExecute(JobInfo ji) {
         JobProcessor processor = _processor;
@@ -247,7 +121,7 @@
         _processor = processor;
     }
 
-    public void setExecutorSvc(ExecutorService executorSvc) {
-        _executorSvc = executorSvc;
+    public void jobCompleted(String jobId) {
+
     }
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java Tue Jul 24 13:58:12 2007
@@ -32,7 +32,7 @@
 
     @Override
     protected void resumeInstance() {
-        assert !_contexts.scheduler.isTransacted() : "checkReplyContext() should have prevented us from getting here.";
+        assert !_contexts.isTransacted() : "checkReplyContext() should have prevented us from getting here.";
         assert !_process.isInMemory() : "resumeInstance() for in-mem processes makes no sense.";
 
         final WorkEvent we = generateInvokeResponseWorkEvent();
@@ -56,7 +56,7 @@
         super.checkReplyContextOk();
 
         // Prevent user from attempting the replyXXXX calls while a transaction is active. 
-        if (!_ownerThread.get() && _contexts.scheduler.isTransacted())
+        if (!_ownerThread.get() && _contexts.isTransacted())
             throw new BpelEngineException("Cannot reply to ASYNC style invocation from a transactional context!");
         
 

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelDatabase.java Tue Jul 24 13:58:12 2007
@@ -18,8 +18,6 @@
  */
 package org.apache.ode.bpel.engine;
 
-
-
 import javax.xml.namespace.QName;
 
 import org.apache.ode.bpel.dao.BpelDAOConnection;
@@ -33,52 +31,54 @@
  * Encapsulates transactional access to the BPEL database.
  */
 class BpelDatabase {
-  static Log __log = LogFactory.getLog(BpelDatabase.class);
+    static Log __log = LogFactory.getLog(BpelDatabase.class);
+
+    protected BpelDAOConnectionFactory _sscf;
 
-  protected BpelDAOConnectionFactory _sscf;
-  protected Scheduler _scheduler;
+    protected Contexts _contexts;
 
-  BpelDatabase(BpelDAOConnectionFactory sscf, Scheduler scheduler) {
-    if (sscf == null)
-      throw new NullPointerException("sscf is null!");
-    if (scheduler == null)
-      throw new NullPointerException("scheduler is null!");
-    
-    _sscf = sscf;
-    _scheduler = scheduler;
-    
-  }
-
-  /**
-   * Get a connection to the database with the correct store identifier.
-   * @return a state store connection
-   * @throws org.apache.ode.utils.dao.DConnectionException
-   */
-  BpelDAOConnection getConnection() {
-    // Note: this will give us a connection associated with the current
-    // transaction, so no need to worry about closing it.
-    return _sscf.getConnection();
-  }
-
-  BpelProcessDatabase getProcessDb(QName pid) {
-    return new BpelProcessDatabase(_sscf, _scheduler, pid);
-  }
-
-  /**
-   * Execute a self-contained database transaction.
-   * @param callable database transaction
-   * @return
-   * @throws DConnectionException
-   */
-  <T> T exec(final Callable<T> callable) throws Exception {
-    return _scheduler.execTransaction(new java.util.concurrent.Callable<T>() {
-      public T call() throws Exception {
-        return callable.run(_sscf.getConnection());
-      }
-    });
-  }
-
-  interface Callable<T> {
-     public T run(BpelDAOConnection conn) throws Exception;
-  }
+    BpelDatabase(Contexts contexts) {
+        if (contexts == null)
+            throw new NullPointerException("scheduler is null!");
+
+        _sscf = contexts.dao;
+        _contexts = contexts;
+
+    }
+
+    /**
+     * Get a connection to the database with the correct store identifier.
+     * 
+     * @return a state store connection
+     * @throws org.apache.ode.utils.dao.DConnectionException
+     */
+    BpelDAOConnection getConnection() {
+        // Note: this will give us a connection associated with the current
+        // transaction, so no need to worry about closing it.
+        return _sscf.getConnection();
+    }
+
+    BpelProcessDatabase getProcessDb(QName pid) {
+        return new BpelProcessDatabase(_contexts, pid);
+    }
+
+    /**
+     * Execute a self-contained database transaction.
+     * 
+     * @param callable
+     *            database transaction
+     * @return
+     * @throws DConnectionException
+     */
+    <T> T exec(final Callable<T> callable) throws Exception {
+        return _contexts.execTransaction(new java.util.concurrent.Callable<T>() {
+            public T call() throws Exception {
+                return callable.run(_sscf.getConnection());
+            }
+        });
+    }
+
+    interface Callable<T> {
+        public T run(BpelDAOConnection conn) throws Exception;
+    }
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Tue Jul 24 13:58:12 2007
@@ -19,7 +19,6 @@
 package org.apache.ode.bpel.engine;
 
 import java.io.InputStream;
-import java.sql.Date;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -29,7 +28,7 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
 
 import javax.wsdl.Operation;
 import javax.wsdl.PortType;
@@ -53,9 +52,9 @@
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.ProcessConf;
-import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
@@ -126,12 +125,11 @@
     private final List<MessageExchangeInterceptor> _mexInterceptors = new ArrayList<MessageExchangeInterceptor>();
 
     /** Latch-like thing to control hydration/dehydration. */
-    private HydrationLatch _hydrationLatch;
+    HydrationLatch _hydrationLatch;
 
     protected Contexts _contexts;
 
-    /** Manage instance-level locks. */
-    private final InstanceLockManager _instanceLockManager = new InstanceLockManager();
+    final BpelInstanceWorkerCache _instanceWorkerCache = new BpelInstanceWorkerCache(this);
 
     private final Set<InvocationStyle> _invocationStyles;
 
@@ -141,12 +139,20 @@
 
     final BpelServerImpl _server;
 
+    /** Indicates whether we are operating in a server-managed transaction. */
+    private ThreadLocal<Boolean> _serverTx = new ThreadLocal<Boolean>() {
+        @Override
+        protected Boolean initialValue() {
+            return Boolean.FALSE;
+        }
+    };
+
     public BpelProcess(BpelServerImpl server, ProcessConf conf, BpelEventListener debugger) {
         _server = server;
         _pid = conf.getProcessId();
         _pconf = conf;
         _hydrationLatch = new HydrationLatch();
-        _inMemDao = new BpelDAOConnectionFactoryImpl(_contexts.scheduler);
+        _inMemDao = new BpelDAOConnectionFactoryImpl(_contexts.txManager);
 
         // TODO : do this on a per-partnerlink basis, support transacted styles.
         HashSet<InvocationStyle> istyles = new HashSet<InvocationStyle>();
@@ -167,8 +173,9 @@
         if (__log.isDebugEnabled())
             __log.debug("Recovering activity in process " + instanceDAO.getInstanceId() + " with action " + action);
         markused();
-        BpelRuntimeContextImpl processInstance = createRuntimeContext(instanceDAO, null, null);
-        processInstance.recoverActivity(channel, activityId, action, fault);
+        throw new AssertionError("TODO: fixme");// TODO
+        // BpelRuntimeContextImpl processInstance = createRuntimeContext(instanceDAO, null, null);
+        // processInstance.recoverActivity(channel, activityId, action, fault);
     }
 
     static String generateMessageExchangeIdentifier(String partnerlinkName, String operationName) {
@@ -183,7 +190,9 @@
      * 
      * @param mex
      */
-    void invokeProcess(MessageExchangeDAO mexdao) {
+    void invokeProcess(final MessageExchangeDAO mexdao) {
+        InvocationStyle istyle = InvocationStyle.valueOf(mexdao.getInvocationStyle());
+
         _hydrationLatch.latch(1);
         try {
             PartnerLinkMyRoleImpl target = getMyRoleForService(mexdao.getCallee());
@@ -205,7 +214,29 @@
             // }
 
             markused();
-            target.invokeMyRole(mexdao);
+            CorrelationStatus cstatus = target.invokeMyRole(mexdao);
+            if (cstatus == null) {
+                ; // do nothing
+            } else if (cstatus == CorrelationStatus.CREATE_INSTANCE) {
+                doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
+                    public Void call() {
+                        executeCreateInstance(mexdao);
+                        return null;
+                    }
+                });
+
+            } else if (cstatus == CorrelationStatus.MATCHED) {
+                doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
+                    public Void call() {
+                        executeContinueInstance(mexdao);
+                        return null;
+                    }
+                });
+
+            } else if (cstatus == CorrelationStatus.QUEUED) {
+                ; // do nothing
+            }
+            // TODO: handle correlation status (i.e. execute instance).
         } finally {
             _hydrationLatch.release(1);
         }
@@ -216,6 +247,66 @@
         // }
     }
 
+    void executeCreateInstance(MessageExchangeDAO mexdao) {
+        BpelInstanceWorker worker = _instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
+        assert worker.isWorkerThread();
+        BpelRuntimeContextImpl instanceCtx = new BpelRuntimeContextImpl(worker, mexdao.getInstance(), new PROCESS(_oprocess),
+                mexdao);
+        instanceCtx.execute();
+    }
+
+    void executeContinueInstance(MessageExchangeDAO mexdao) {
+        BpelInstanceWorker worker = _instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
+        assert worker.isWorkerThread();
+        BpelRuntimeContextImpl instance = new BpelRuntimeContextImpl(worker, mexdao.getInstance(), null, null);
+        int amp = mexdao.getChannel().indexOf('&');
+        String groupId = mexdao.getChannel().substring(0, amp);
+        int idx = Integer.valueOf(mexdao.getChannel().substring(amp + 1));
+        instance.inputMsgMatch(groupId, idx, mexdao);
+        instance.execute();
+    }
+
+    
+
+    private void enqueueInstanceWork(Long instanceId, Runnable runnable) {
+        BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
+        iworker.enqueue(new ProcessRunnable(runnable));
+    }
+
+    private void enqueueInstanceTransaction(Long instanceId, final Runnable runnable) {
+        enqueueInstanceWork(instanceId, new ProcessRunnable(new TransactedRunnable(runnable)));
+    }
+    
+    /**
+     * Schedule work for a given instance; work will occur if transaction commits.
+     * 
+     * @param instanceId
+     * @param runnable work to enqueue for the instance if the transaction commits
+     */
+    private void scheduleInstanceWork(final Long instanceId, final Runnable runnable) {
+        _contexts.registerCommitSynchronizer(new Runnable() {
+            public void run() {
+                BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
+                iworker.enqueue(new ProcessRunnable(runnable));
+            }
+        });
+
+    }
+    
+    private void scheduleInstanceTX(Long instanceId, final Runnable transaction) {
+        scheduleInstanceWork(instanceId, new TransactedRunnable(transaction));
+    }
+
+    private <T> T doInstanceWork(Long instanceId, final Callable<T> callable) {
+        try {
+            BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
+            return iworker.execInCurrentThread(new ProcessCallable<T>(callable));
+
+        } catch (Exception ex) {
+            throw new BpelEngineException(ex);
+        }
+    }
+
     private PartnerLinkMyRoleImpl getMyRoleForService(QName serviceName) {
         for (Map.Entry<Endpoint, PartnerLinkMyRoleImpl> e : getEndpointToMyRoleMap().entrySet()) {
             if (e.getKey().serviceName.equals(serviceName))
@@ -323,116 +414,111 @@
         return true;
     }
 
-    /*
-     * // DONT PUT CODE HERE-need this method real tight in a try/catch block, we need to handle // all types of failure here, the
-     * scheduler is not going to know how to handle our errors, // ALSO we have to release the lock obtained above (IMPORTANT), lest
-     * the whole system come // to a grinding halt. try {
-     * 
-     * ProcessInstanceDAO instance; if (process.isInMemory()) instance =
-     * _contexts.inMemDao.getConnection().getInstance(we.getIID()); else instance =
-     * _contexts.dao.getConnection().getInstance(we.getIID());
-     * 
-     * if (instance == null) { __log.error(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID())); // nothing we can do, this
-     * instance is not in the database, it will // always // fail. return; } ProcessDAO processDao = instance.getProcess(); process =
-     * _activeProcesses.get(processDao.getProcessId());
-     * 
-     * process.handleWorkEvent(we.getDetail()); debuggingDelay(); } catch (BpelEngineException bee) {
-     * __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), bee); throw new Scheduler.JobProcessorException(bee,
-     * checkRetry(jobInfo, bee)); } catch (ContextException ce) { __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), ce);
-     * throw new Scheduler.JobProcessorException(ce, checkRetry(jobInfo, ce)); } catch (RuntimeException rte) {
-     * __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), rte); throw new Scheduler.JobProcessorException(rte,
-     * checkRetry(jobInfo, rte)); } catch (Throwable t) { __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), t); throw new
-     * Scheduler.JobProcessorException(false);
-     * 
-     * 
-     */
     /**
-     * @throws JobProcessorException 
+     * Handle a work event; this method is called from the scheduler thread and should be very quick, i.e. any serious work needs to
+     * be handed off to a separate thread.
+     * 
+     * @throws JobProcessorException
      * @see org.apache.ode.bpel.engine.BpelProcess#handleWorkEvent(java.util.Map<java.lang.String,java.lang.Object>)
      */
-    public void handleWorkEvent(JobInfo jobInfo) throws JobProcessorException {
-        _hydrationLatch.latch(1);
-        try {
-            markused();
+    public void handleWorkEvent(final JobInfo jobInfo) throws JobProcessorException {
+        assert !_contexts.isTransacted() : "work events must be received outside of a transaction";
+
+        markused();
+
+        final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
+        if (__log.isDebugEnabled()) {
+            __log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent", new Object[] { "jobInfo", jobInfo }));
+        }
 
+        // Process-level events
+        if (we.getType().equals(WorkEvent.Type.MYROLE_INVOKE)) {
+            // second stage of my-role invoke for BLOCKING/ASYNC/RELIABLE invocation style.
             if (__log.isDebugEnabled()) {
-                __log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent", new Object[] { "jobInfo", jobInfo }));
+                __log.debug("InvokeInternal event for mexid " + we.getMexId());
             }
 
-            final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
-
-            // Process level events
-            if (we.getType().equals(WorkEvent.Type.MYROLE_INVOKE)) {
-                if (__log.isDebugEnabled()) {
-                    __log.debug("InvokeInternal event for mexid " + we.getMexId());
+            enqueueTransaction(new Callable<Void>() {
+                public Void call() {
+                    _contexts.scheduler.jobCompleted(jobInfo.jobName);
+                    MessageExchangeDAO mexdao = loadMexDao(we.getMexId());
+                    invokeProcess(mexdao);
+                    return null;
                 }
-                MessageExchangeDAO mexdao = loadMexDao(we.getMexId());
-                invokeProcess(mexdao);
-            } else {
-                // Instance level events
-                // We lock the instance to prevent concurrent transactions and prevent unnecessary rollbacks,
-                // Note that we don't want to wait too long here to get our lock, since we are likely holding
-                // on to scheduler's locks of various sorts.
-                try {
-                    _instanceLockManager.lock(we.getIID(), 1, TimeUnit.MICROSECONDS);
-                    _contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
-                        public void afterCompletion(boolean success) {
-                            _instanceLockManager.unlock(we.getIID());
-                        }
+            });
 
-                        public void beforeCompletion() {
-                        }
-                    });
-                } catch (InterruptedException e) {
-                    // Retry later.
-                    __log.debug("Thread interrupted, job will be rescheduled: " + jobInfo);
-                    throw new Scheduler.JobProcessorException(true);
-                } catch (org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException e) {
-                    __log.debug("Instance " + we.getIID() + " is busy, rescheduling job.");
-                    // TODO: This should really be more of something like the exponential backoff algorithm in ethernet.
-                    _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, new Date(System.currentTimeMillis()
-                            + _random.nextInt(1000)));
-                    return;
-                }
-
-                ProcessInstanceDAO procInstance = getProcessDAO().getInstance(we.getIID());
-                if (procInstance == null) {
-                    if (__log.isDebugEnabled()) {
-                        __log.debug("handleWorkEvent: no ProcessInstance found with iid " + we.getIID() + "; ignoring.");
-                    }
-                    return;
+        } else /* instance-level events */{
+            enqueueInstanceTransaction(we.getIID(), new Runnable() {
+                public void run() {
+                    _contexts.scheduler.jobCompleted(jobInfo.jobName);
+                    execInstanceEvent(we);
                 }
 
-                BpelRuntimeContextImpl processInstance = createRuntimeContext(procInstance, null, null);
-                switch (we.getType()) {
-                case TIMER:
-                    if (__log.isDebugEnabled()) {
-                        __log.debug("handleWorkEvent: TimerWork event for process instance " + processInstance);
-                    }
-                    processInstance.timerEvent(we.getChannel());
-                    break;
-                case RESUME:
-                    if (__log.isDebugEnabled()) {
-                        __log.debug("handleWorkEvent: ResumeWork event for iid " + we.getIID());
-                    }
-                    processInstance.execute();
-                    break;
-                case PARTNER_RESPONSE:
-                    if (__log.isDebugEnabled()) {
-                        __log.debug("InvokeResponse event for iid " + we.getIID());
-                    }
-                    processInstance.injectPartnerResponse(we.getMexId(), we.getChannel());
-                    processInstance.execute();
-                    break;
-                case MATCHER:
-                    if (__log.isDebugEnabled()) {
-                        __log.debug("Matcher event for iid " + we.getIID());
-                    }
-                    processInstance.matcherEvent(we.getCorrelatorId(), we.getCorrelationKey());
+            });
+        }
+
+    }
+
+    /**
+     * Enqueue a transaction for execution by the engine. The transaction is handed to {@code _server.execIsolatedTransaction}
+     * inside a wrapper that latches the hydration latch before calling it and releases the latch afterwards.
+     *
+     * @param <T>
+     *            result type of the transaction
+     * @param tx
+     *            the transaction
+     * @return the {@link Future} returned by {@code _server.execIsolatedTransaction}
+     */
+    private <T> Future<T> enqueueTransaction(final Callable<T> tx) {
+        // We have to wrap our transaction to make sure that we are hydrated when the transaction runs.
+        return _server.execIsolatedTransaction(new Callable<T>() {
+            public T call() throws Exception {
+                _hydrationLatch.latch(1);
+                try {
+                    return tx.call();
+                } finally {
+                    _hydrationLatch.release(1);
                 }
             }
-        } finally {
-            _hydrationLatch.release(1);
+
+        });
+
+    }
+
+    /**
+     * Execute an instance-level work event. Looks up the instance DAO for the event's instance id; if there is none, the
+     * event is logged at debug level and ignored. Otherwise a {@link BpelRuntimeContextImpl} is built for the instance and the
+     * event is dispatched on its type (TIMER, RESUME, PARTNER_RESPONSE, MATCHER).
+     *
+     * @param we
+     *            the work event to execute
+     */
+    private void execInstanceEvent(WorkEvent we) {
+        BpelInstanceWorker worker = _instanceWorkerCache.get(we.getIID());
+        // Expected to run on the instance's worker thread; only checked when assertions are enabled.
+        assert worker.isWorkerThread();
+
+        ProcessInstanceDAO procInstance = getProcessDAO().getInstance(we.getIID());
+        if (procInstance == null) {
+            if (__log.isDebugEnabled()) {
+                __log.debug("handleWorkEvent: no ProcessInstance found with iid " + we.getIID() + "; ignoring.");
+            }
+            return;
+        }
+
+        BpelRuntimeContextImpl processInstance = new BpelRuntimeContextImpl(worker, procInstance, null, null);
+        // NOTE(review): there is no default branch, so any event type not listed below is silently ignored.
+        switch (we.getType()) {
+        case TIMER:
+            if (__log.isDebugEnabled()) {
+                __log.debug("handleWorkEvent: TimerWork event for process instance " + processInstance);
+            }
+            processInstance.timerEvent(we.getChannel());
+            break;
+        case RESUME:
+            if (__log.isDebugEnabled()) {
+                __log.debug("handleWorkEvent: ResumeWork event for iid " + we.getIID());
+            }
+            processInstance.execute();
+            break;
+        case PARTNER_RESPONSE:
+            if (__log.isDebugEnabled()) {
+                __log.debug("InvokeResponse event for iid " + we.getIID());
+            }
+            // Inject the partner's response first, then let the instance continue executing.
+            processInstance.injectPartnerResponse(we.getMexId(), we.getChannel());
+            processInstance.execute();
+            break;
+        case MATCHER:
+            if (__log.isDebugEnabled()) {
+                __log.debug("Matcher event for iid " + we.getIID());
+            }
+            processInstance.matcherEvent(we.getCorrelatorId(), we.getCorrelationKey());
         }
     }
 
@@ -661,7 +747,24 @@
 
         _hydrationLatch.latch(1);
         try {
-            MyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
+            MyRoleMessageExchangeImpl mex;
+            switch (istyle) {
+            case RELIABLE:
+                mex = new ReliableMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
+                break;
+            case ASYNC:
+                mex = new AsyncMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
+                break;
+            case TRANSACTED:
+                mex = new TransactedMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
+                break;
+            case BLOCKING:
+                mex = new BlockingMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
+                break;
+            default:
+                throw new AssertionError("Unexpected invocation style: " + istyle);
+
+            }
             OPartnerLink plink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId());
             PortType ptype = plink.myRolePortType;
             Operation op = plink.getMyRoleOperation(mexdao.getOperation());
@@ -682,25 +785,24 @@
             Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
             switch (istyle) {
             case BLOCKING:
-                mex = new BlockingPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op,
-                        null, /* EPR todo */
-                        plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
+                mex = new BlockingPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /* EPR todo */
+                plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
                 break;
             case ASYNC:
-                mex = new AsyncPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, 
-                        null, /* EPR todo */
-                        plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
+                mex = new AsyncPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /* EPR todo */
+                plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
                 break;
 
             case TRANSACTED:
-                mex = new TransactedPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op,
-                        null, /* EPR todo */
-                        plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
+                mex = new TransactedPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /*
+                                                                                                                             * EPR
+                                                                                                                             * todo
+                                                                                                                             */
+                plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
                 break;
             case RELIABLE:
-                mex = new ReliablePartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, 
-                        null, /* EPR todo */
-                        plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
+                mex = new ReliablePartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), ptype, op, null, /* EPR todo */
+                plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
                 break;
 
             default:
@@ -756,13 +858,6 @@
         _lastUsed = System.currentTimeMillis();
     }
 
-    /** Create a version-appropriate runtime context. */
-    BpelRuntimeContextImpl createRuntimeContext(ProcessInstanceDAO dao, PROCESS template,
-            MessageExchangeDAO instantiatingMessageExchange) {
-        return new BpelRuntimeContextImpl(this, dao, template, instantiatingMessageExchange);
-
-    }
-
     /**
      * If necessary, create an object in the data store to represent the process. We'll re-use an existing object if it already
      * exists and matches the GUID.
@@ -804,7 +899,7 @@
         }
     }
 
-    private class HydrationLatch extends NStateLatch {
+    class HydrationLatch extends NStateLatch {
 
         HydrationLatch() {
             super(new Runnable[2]);
@@ -889,13 +984,13 @@
 
             if (isInMemory()) {
                 bounceProcessDAO(_inMemDao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
-            } else if (_contexts.scheduler.isTransacted()) {
+            } else if (_contexts.isTransacted()) {
                 // If we have a transaction, we do this in the current transaction.
                 bounceProcessDAO(_contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
             } else {
                 // If we do not have a transaction we need to create one.
                 try {
-                    _contexts.scheduler.execIsolatedTransaction(new Callable<Object>() {
+                    _contexts.execTransaction(new Callable<Object>() {
                         public Object call() throws Exception {
                             bounceProcessDAO(_contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
                             return null;
@@ -921,6 +1016,83 @@
 
     MessageExchangeDAO getInMemMexDAO(String mexId) {
         return _inMemDao.getConnection().getMessageExchange(mexId);
+    }
+
+    /**
+     * Schedule process-level work. Scheduling itself is delegated to the server; the {@link Runnable} is first wrapped in a
+     * {@link ProcessRunnable}, whose try-finally block ensures that the process is hydrated while the work runs.
+     *
+     * @param runnable
+     *            the work to schedule
+     */
+    public void scheduleRunnable(final Runnable runnable) {
+        if (__log.isDebugEnabled()) {
+            __log.debug("schedulingRunnable for process " + _pid + ": " + runnable);
+        }
+
+        final ProcessRunnable hydrated = new ProcessRunnable(runnable);
+        _server.scheduleRunnable(hydrated);
+    }
+
+   
+    /**
+     * {@link Runnable} adapter that runs the wrapped work through {@code _contexts.execTransaction}.
+     */
+    class TransactedRunnable implements Runnable {
+        /** The work handed to {@code _contexts.execTransaction} when this runnable runs. */
+        Runnable _work;
+
+        TransactedRunnable(Runnable delegate) {
+            this._work = delegate;
+        }
+
+        public void run() {
+            _contexts.execTransaction(this._work);
+        }
+    }
+    
+    /**
+     * {@link Callable} adapter that runs the wrapped work through {@code _contexts.execTransaction} and returns its result.
+     *
+     * @param <T>
+     *            result type of the wrapped work
+     */
+    class TransactedCallable<T> implements Callable<T> {
+        /** The work handed to {@code _contexts.execTransaction} when this callable is invoked. */
+        Callable<T> _work;
+
+        TransactedCallable(Callable<T> delegate) {
+            this._work = delegate;
+        }
+
+        public T call() throws Exception {
+            final T outcome = _contexts.execTransaction(this._work);
+            return outcome;
+        }
+    }
+    
+
+    /**
+     * {@link Runnable} wrapper that latches the hydration latch before running the wrapped work and always releases it
+     * afterwards, including when the work throws.
+     */
+    class ProcessRunnable implements Runnable {
+        /** The work executed while the hydration latch is held. */
+        Runnable _work;
+
+        ProcessRunnable(Runnable delegate) {
+            this._work = delegate;
+        }
+
+        public void run() {
+            _hydrationLatch.latch(1);
+            try {
+                this._work.run();
+            } finally {
+                _hydrationLatch.release(1);
+            }
+        }
+    }
+    
+    /**
+     * {@link Callable} wrapper that latches the hydration latch before calling the wrapped work and always releases it
+     * afterwards, including when the work throws.
+     *
+     * @param <T>
+     *            result type of the wrapped work
+     */
+    class ProcessCallable<T> implements Callable<T> {
+        /** The work executed while the hydration latch is held. */
+        Callable<T> _work;
+
+        ProcessCallable(Callable<T> delegate) {
+            this._work = delegate;
+        }
+
+        public T call() throws Exception {
+            _hydrationLatch.latch(1);
+            try {
+                return this._work.call();
+            } finally {
+                _hydrationLatch.release(1);
+            }
+        }
     }
 
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcessDatabase.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcessDatabase.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcessDatabase.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcessDatabase.java Tue Jul 24 13:58:12 2007
@@ -18,53 +18,53 @@
  */
 package org.apache.ode.bpel.engine;
 
-import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
-import org.apache.ode.bpel.dao.ProcessDAO;
-import org.apache.ode.bpel.iapi.Scheduler;
-
 import javax.xml.namespace.QName;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
+import org.apache.ode.bpel.dao.ProcessDAO;
 
 /**
  * Encapusulates access to a BPEL process database.
  */
 class BpelProcessDatabase extends BpelDatabase {
-  static Log __log = LogFactory.getLog(BpelProcessDatabase.class);
+    static Log __log = LogFactory.getLog(BpelProcessDatabase.class);
+
+    private QName _processId;
+
+    /**
+     * Constructor.
+     * 
+     * @param contexts
+     *            engine contexts, passed through to the {@link BpelDatabase} constructor
+     * @param processId
+     *            name of the process
+     */
+    BpelProcessDatabase(Contexts contexts, QName processId) {
+        super(contexts);
+        _processId = processId;
+    }
+
+    QName getProcessId() {
+        return _processId;
+    }
 
-  private QName _processId;
+    ProcessDAO getProcessDAO() {
+        return getConnection().getProcess(_processId);
 
-  /**
-   * Constructor.
-   * @param sscf BPEL state store connection factory
-   * @param txm JTA transaction manager
-   * @param processId name of the process
-   */
-	BpelProcessDatabase(BpelDAOConnectionFactory sscf,
-                      Scheduler scheduler,
-                      QName processId) {
-    super(sscf,scheduler);
-    _processId = processId;
-	}
-  
-  QName getProcessId() {
-    return _processId;
-  }
-
-  ProcessDAO getProcessDAO() {
-    return getConnection().getProcess(_processId);
-
-  }
-
-  abstract class Callable<T> implements BpelDatabase.Callable<T> {
-    public T exec() throws Exception {
-      return BpelProcessDatabase.this.exec(this);
     }
 
-    protected ProcessDAO getProcessDAO() {
-      return BpelProcessDatabase.this.getProcessDAO();
+    abstract class Callable<T> implements BpelDatabase.Callable<T> {
+        public T exec() throws Exception {
+            return BpelProcessDatabase.this.exec(this);
+        }
+
+        protected ProcessDAO getProcessDAO() {
+            return BpelProcessDatabase.this.getProcessDAO();
+        }
     }
-  }
 
 }