You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2008/08/28 02:23:22 UTC

svn commit: r689679 - in /ode/branches/rtver: axis2-war/src/test/java/org/apache/ode/axis2/ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/

Author: mriou
Date: Wed Aug 27 17:23:21 2008
New Revision: 689679

URL: http://svn.apache.org/viewvc?rev=689679&view=rev
Log:
Chanpagne! All tests pass. Just have to fix extensibility.

Modified:
    ode/branches/rtver/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java
    ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
    ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java
    ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
    ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java

Modified: ode/branches/rtver/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java
URL: http://svn.apache.org/viewvc/ode/branches/rtver/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java?rev=689679&r1=689678&r2=689679&view=diff
==============================================================================
--- ode/branches/rtver/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java (original)
+++ ode/branches/rtver/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java Wed Aug 27 17:23:21 2008
@@ -16,6 +16,7 @@
 
     }
 
+    
     public void testDynPartner() throws Exception {
         String bundleName = "TestDynPartner";
         if(!server._ode.getProcessStore().getPackages().contains(bundleName)) server.deployProcess(bundleName);

Modified: ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=689679&r1=689678&r2=689679&view=diff
==============================================================================
--- ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Wed Aug 27 17:23:21 2008
@@ -107,6 +107,9 @@
     /** Mapping from myrole service name to active process. */
     private final HashMap<QName, ODEProcess> _serviceMap = new HashMap<QName, ODEProcess>();
 
+    /** Weak-reference cache of all the my-role message exchange objects. */
+    private final MyRoleMessageExchangeCache _myRoleMexCache = new MyRoleMessageExchangeCache();
+
     private State _state = State.SHUTDOWN;
 
     Contexts _contexts = new Contexts();
@@ -345,7 +348,7 @@
 
             __log.debug("Registering process " + conf.getProcessId() + " with server.");
 
-            ODEProcess process = new ODEProcess(this, conf, null, buildRuntime(conf));
+            ODEProcess process = new ODEProcess(this, conf, null, buildRuntime(conf), _myRoleMexCache);
 
             for (Endpoint e : process.getServiceNames()) {
                 __log.debug("Register process: serviceId=" + e + ", process=" + process);
@@ -367,7 +370,8 @@
         // Relying on package naming conventions to find our runtime
         String qualifiedName = "org.apache.ode.bpel.rtrep.v" + conf.getRuntimeVersion() + ".RuntimeImpl";
         try {
-            return (OdeRuntime) Class.forName(qualifiedName).newInstance();
+            OdeRuntime runtime = (OdeRuntime) Class.forName(qualifiedName).newInstance();
+            return runtime;
         } catch (Exception e) {
             throw new RuntimeException("Couldn't instantiate ODE runtime version " + conf.getRuntimeVersion() +
                     ", either your process definition version is outdated or we have a bug.");

Modified: ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java
URL: http://svn.apache.org/viewvc/ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java?rev=689679&r1=689678&r2=689679&view=diff
==============================================================================
--- ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java (original)
+++ ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java Wed Aug 27 17:23:21 2008
@@ -3,6 +3,8 @@
 import java.lang.ref.WeakReference;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 
@@ -16,29 +18,19 @@
 
     private static final int CLEANUP_PERIOD = 20;
 
-    private HashMap<String, WeakReference<MyRoleMessageExchangeImpl>> _cache = new HashMap<String, WeakReference<MyRoleMessageExchangeImpl>>();
+    private Map<String, WeakReference<MyRoleMessageExchangeImpl>> _cache = new ConcurrentHashMap<String, WeakReference<MyRoleMessageExchangeImpl>>();
 
     private int _inserts = 0;
 
-    private ODEProcess _process;
-
-    MyRoleMessageExchangeCache(ODEProcess process) {
-        _process = process;
-    }
-    
     void put(MyRoleMessageExchangeImpl mex) {
-        synchronized (this) {
-            ++_inserts;
-            if (_inserts > CLEANUP_PERIOD) {
-                cleanup();
-            }
-
-            WeakReference<MyRoleMessageExchangeImpl> ref = _cache.get(mex.getMessageExchangeId());
-            if (ref != null && ref.get() != null)
-                throw new IllegalStateException("InternalError: duplicate myrolemex registration!");
+        ++_inserts;
+        if (_inserts > CLEANUP_PERIOD) cleanup();
 
-            _cache.put(mex.getMessageExchangeId(), new WeakReference<MyRoleMessageExchangeImpl>(mex));
-        }
+        WeakReference<MyRoleMessageExchangeImpl> ref = _cache.get(mex.getMessageExchangeId());
+        if (ref != null && ref.get() != null)
+            throw new IllegalStateException("InternalError: duplicate myrolemex registration!");
+
+        _cache.put(mex.getMessageExchangeId(), new WeakReference<MyRoleMessageExchangeImpl>(mex));
     }
 
     /**
@@ -47,17 +39,15 @@
      * @param mexdao
      * @return
      */
-    MyRoleMessageExchangeImpl get(MessageExchangeDAO mexdao) {
-        synchronized (this) {
-            WeakReference<MyRoleMessageExchangeImpl> ref = _cache.get(mexdao.getMessageExchangeId());
-            MyRoleMessageExchangeImpl mex = ref == null ? null : ref.get();
-
-            if (mex == null) {
-                mex = _process.recreateMyRoleMex(mexdao);
-                _cache.put(mexdao.getMessageExchangeId(), new WeakReference<MyRoleMessageExchangeImpl>(mex));
-            }
-            return mex;
+    MyRoleMessageExchangeImpl get(MessageExchangeDAO mexdao, ODEProcess process) {
+        WeakReference<MyRoleMessageExchangeImpl> ref = _cache.get(mexdao.getMessageExchangeId());
+        MyRoleMessageExchangeImpl mex = ref == null ? null : ref.get();
+
+        if (mex == null) {
+            mex = process.recreateMyRoleMex(mexdao);
+            _cache.put(mexdao.getMessageExchangeId(), new WeakReference<MyRoleMessageExchangeImpl>(mex));
         }
+        return mex;
     }
 
     /**
@@ -67,8 +57,7 @@
     private void cleanup() {
         for (Iterator<WeakReference<MyRoleMessageExchangeImpl>> i = _cache.values().iterator(); i.hasNext();) {
             WeakReference<MyRoleMessageExchangeImpl> ref = i.next();
-            if (ref.get() == null)
-                i.remove();
+            if (ref.get() == null) i.remove();
         }
         _inserts = 0;
     }

Modified: ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java?rev=689679&r1=689678&r2=689679&view=diff
==============================================================================
--- ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java (original)
+++ ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ODEProcess.java Wed Aug 27 17:23:21 2008
@@ -137,15 +137,15 @@
 
     final BpelServerImpl _server;
 
-    /** Weak-reference cache of all the my-role message exchange objects. */
-    final private MyRoleMessageExchangeCache _myRoleMexCache = new MyRoleMessageExchangeCache(this);
+    private MyRoleMessageExchangeCache _myRoleMexCache;
 
     /** Deploy-time configuraton for external variables. */
     private ExternalVariableConf _extVarConf;
     
     private ExternalVariableManager _evm;
     
-    ODEProcess(BpelServerImpl server, ProcessConf conf, BpelEventListener debugger, OdeRuntime odeRuntime) {
+    ODEProcess(BpelServerImpl server, ProcessConf conf, BpelEventListener debugger,
+               OdeRuntime odeRuntime, MyRoleMessageExchangeCache mexCache) {
         _runtime = odeRuntime;
         _server = server;
         _pid = conf.getProcessId();
@@ -153,17 +153,14 @@
         _hydrationLatch = new HydrationLatch();
         _contexts = server._contexts;
         _inMemDao = new BpelDAOConnectionFactoryImpl(_contexts.txManager);
+        _myRoleMexCache = mexCache;
 
         // TODO : do this on a per-partnerlink basis, support transacted styles.
         HashSet<InvocationStyle> istyles = new HashSet<InvocationStyle>();
         istyles.add(InvocationStyle.UNRELIABLE);
 
-        if (!conf.isTransient()) {
-            istyles.add(InvocationStyle.RELIABLE);
-        } else {
-            istyles.add(InvocationStyle.TRANSACTED);
-        }
-
+        if (!conf.isTransient()) istyles.add(InvocationStyle.RELIABLE);
+        else istyles.add(InvocationStyle.TRANSACTED);
         _invocationStyles = Collections.unmodifiableSet(istyles);
     }
 
@@ -304,7 +301,8 @@
                     we.setType(WorkEvent.Type.MYROLE_INVOKE);
                     we.setIID(mexdao.getInstance().getInstanceId());
                     we.setMexId(mexdao.getMessageExchangeId());
-                    we.setProcessId(_pid);
+                    // Could be different to this pid when routing to an older version
+                    we.setProcessId(mexdao.getInstance().getProcess().getProcessId());
 
                     scheduleWorkEvent(we, null);
                 }
@@ -791,7 +789,6 @@
             break;
         default:
             throw new AssertionError("Unexpected invocation style: " + istyle);
-
         }
 
         _myRoleMexCache.put(mex);
@@ -806,7 +803,7 @@
      * @return client representation
      */
     MyRoleMessageExchangeImpl lookupMyRoleMex(MessageExchangeDAO mexdao) {
-        return _myRoleMexCache.get(mexdao); // this will re-create if necessary
+        return _myRoleMexCache.get(mexdao, this); // this will re-create if necessary
     }
 
     /**
@@ -1028,7 +1025,7 @@
             // Do an Async wakeup if we are in the ASYNC state. If we're not, we'll pick up the ACK when we unwind
             // the stack.
             if (old == Status.ASYNC) {
-                MyRoleMessageExchangeImpl mymex = _myRoleMexCache.get(mexdao);
+                MyRoleMessageExchangeImpl mymex = _myRoleMexCache.get(mexdao, this);
                 System.out.println("ON ASYNC ACK");
                 mymex.onAsyncAck(mexdao);
                 try {

Modified: ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?rev=689679&r1=689678&r2=689679&view=diff
==============================================================================
--- ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/rtver/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java Wed Aug 27 17:23:21 2008
@@ -138,6 +138,8 @@
             response = null;
         }
 
+        final UnreliableMyRoleMessageExchangeImpl self = this;
+        final ResponseFuture f = _future;
         // Lets be careful, the TX can still rollback!
         _process.scheduleRunnable(new Runnable() {
             public void run() {
@@ -145,6 +147,7 @@
                 _fault = fault;
                 _failureType = failureType;
                 _explanation = explanation;
+
                 ack(ackType);
                 System.out.println("FUTURE DONE.");
                 _future.done(Status.ACK);