You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by se...@apache.org on 2009/10/29 00:34:03 UTC

svn commit: r830795 - in /ode/branches/APACHE_ODE_1.X: axis2/src/main/java/org/apache/ode/axis2/ bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-dao/src/main/java/org/apache/ode/bpel/dao/ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ bpe...

Author: seanahn
Date: Wed Oct 28 23:34:02 2009
New Revision: 830795

URL: http://svn.apache.org/viewvc?rev=830795&view=rev
Log:
ode-689, Take out the third transaction that loads up the my role mex and related data

Modified:
    ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
    ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java
    ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java
    ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java
    ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java
    ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
    ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java

Modified: ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEService.java?rev=830795&r1=830794&r2=830795&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEService.java (original)
+++ ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEService.java Wed Oct 28 23:34:02 2009
@@ -21,7 +21,6 @@
 
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.Map;
 
 import javax.transaction.TransactionManager;
 import javax.wsdl.Definition;
@@ -93,9 +92,10 @@
     }
     
     public String getName() {
-    	return _axisService.getName();
+        return _axisService.getName();
     }
     
+    @SuppressWarnings("unchecked")
     public void onAxisMessageExchange(MessageContext msgContext, MessageContext outMsgContext, SOAPFactory soapFactory)
             throws AxisFault {
         boolean success = true;
@@ -141,7 +141,7 @@
             success = false;
             String message = e.getMessage();
             if (message == null) {
-            	message = "An exception occured while invoking ODE.";
+                message = "An exception occured while invoking ODE.";
             }
             throw new OdeFault(message, e);
         } finally {
@@ -158,55 +158,33 @@
         if (odeMex.getOperation().getOutput() != null) {
             // Waits for the response to arrive
             try {
-                responseFuture.get(getTimeout(), TimeUnit.MILLISECONDS);
+                odeMex = (MyRoleMessageExchange)responseFuture.get(getTimeout(), TimeUnit.MILLISECONDS);
             } catch (Exception e) {
                 String errorMsg = "Timeout or execution error when waiting for response to MEX "
                         + odeMex + " " + e.toString();
                 __log.error(errorMsg, e);
+                
                 throw new OdeFault(errorMsg);
             }
-
+            
+            // this should not happen, if odeMex is null, you should always get the Timout exception
+            assert odeMex != null;
+            
             if (outMsgContext != null) {
                 SOAPEnvelope envelope = soapFactory.getDefaultEnvelope();
                 outMsgContext.setEnvelope(envelope);
 
                 // Hopefully we have a response
                 __log.debug("Handling response for MEX " + odeMex);
-                boolean commit = false;
-                try {
-                    if (__log.isDebugEnabled()) __log.debug("Starting transaction.");
-                    _txManager.begin();
-                } catch (Exception ex) {
-                    throw new OdeFault("Error starting transaction!", ex);
-                }
                 try {
-                    // Refreshing the message exchange
-                    odeMex = (MyRoleMessageExchange) _server.getEngine().getMessageExchange(odeMex.getMessageExchangeId());
                     onResponse(odeMex, outMsgContext);
-                    commit = true;
                 } catch (AxisFault af) {
                     __log.warn("MEX produced a fault " + odeMex, af);
-                    commit = true;
                     throw af;
                 } catch (Exception e) {
                     __log.error("Error processing response for MEX " + odeMex, e);
                     throw new OdeFault("An exception occured when invoking ODE.", e);
                 } finally {
-                    odeMex.release(commit);
-                    if (commit) {
-                        try {
-                            if (__log.isDebugEnabled()) __log.debug("Comitting transaction.");
-                            _txManager.commit();
-                        } catch (Exception e) {
-                            throw new OdeFault("Commit failed!", e);
-                        }
-                    } else {
-                        try {
-                            _txManager.rollback();
-                        } catch (Exception ex) {
-                            throw new OdeFault("Rollback failed!", ex);
-                        }
-                    }
                 }
             }
             if (!success) {

Modified: ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java?rev=830795&r1=830794&r2=830795&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/MessageExchange.java Wed Oct 28 23:34:02 2009
@@ -21,6 +21,7 @@
 import javax.wsdl.Operation;
 import javax.wsdl.PortType;
 import javax.xml.namespace.QName;
+
 import java.util.Set;
 
 /**

Modified: ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java?rev=830795&r1=830794&r2=830795&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessInstanceDAO.java Wed Oct 28 23:34:02 2009
@@ -194,15 +194,9 @@
 
     /**
      * Delete the process instance object from the database.
-     */
-    void delete(Set<CLEANUP_CATEGORY> cleanupCategories);
-
-    /**
-     * Delete the process instance object from the database.
      * @param cleanupCategories the categories of entities to delete
-     * @param deleteMyRoleMex will clean up the my role mex if set to true
      */
-    void delete(Set<CLEANUP_CATEGORY> cleanupCategories, boolean deleteMyRoleMex);
+    void delete(Set<CLEANUP_CATEGORY> cleanupCategories);
 
     /**
      * Insert a BPEL event to the database (associating with this process).

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=830795&r1=830794&r2=830795&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Wed Oct 28 23:34:02 2009
@@ -227,7 +227,7 @@
             public void afterCompletion(boolean success) {
             }
             public void beforeCompletion() { 
-                _dao.delete(_bpelProcess.getCleanupCategories(false), false);
+                _dao.delete(_bpelProcess.getCleanupCategories(false));
             }
         });
     }
@@ -256,7 +256,7 @@
             public void afterCompletion(boolean success) {
             }
             public void beforeCompletion() { 
-                _dao.delete(_bpelProcess.getCleanupCategories(true), false);
+                _dao.delete(_bpelProcess.getCleanupCategories(true));
             }
         });
     }

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?rev=830795&r1=830794&r2=830795&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Wed Oct 28 23:34:02 2009
@@ -19,10 +19,7 @@
 
 package org.apache.ode.bpel.engine;
 
-import java.util.Date;
-import java.util.List;
 import java.util.Map;
-import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -33,6 +30,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageDAO;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
@@ -44,12 +42,12 @@
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
+import org.apache.ode.bpel.memdao.MessageDAOImpl;
+import org.apache.ode.bpel.memdao.MessageExchangeDAOImpl;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 
 public class MyRoleMessageExchangeImpl extends MessageExchangeImpl implements MyRoleMessageExchange {
-
-
     private static final Log __log = LogFactory.getLog(MyRoleMessageExchangeImpl.class);
     
     protected BpelProcess _process;
@@ -223,6 +221,7 @@
         public boolean cancel(boolean mayInterruptIfRunning) {
             throw new UnsupportedOperationException();
         }
+        
         public Object get() throws InterruptedException, ExecutionException {
             try {
                 return get(0, TimeUnit.MILLISECONDS);
@@ -231,19 +230,27 @@
                 throw new ExecutionException(e);
             }
         }
+        
         public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
             ResponseCallback callback = _waitingCallbacks.get(_clientId);
             if (callback != null) {
                 callback.waitResponse(timeout);
                 _done = true;
-                if (callback._timedout)
+
+                // There's the small window after the response times out, in which the myRoleMex is set, ignore that case
+                if (callback._timedout) {
                     throw new TimeoutException("Message exchange " + this + " timed out(" + timeout + " ms) when waiting for a response!");
+                }
+                
+                return callback._myRoleMex;
             }
             return null;
         }
+        
         public boolean isCancelled() {
             return false;
         }
+        
         public boolean isDone() {
             return _done;
         }
@@ -253,23 +260,69 @@
     protected void responseReceived() {
         final String cid = getClientId();
         _engine._contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
+            public void beforeCompletion() {
+                ResponseCallback callback = _waitingCallbacks.get(cid);
+                if (callback != null) {
+                    // Create a mem-backed message exchange and populate properties from the real one
+                    // this requires a valid persistent framework's session and thus 
+                    // should be done in the 'beforeCompletion()'
+                    MessageExchangeDAO mexDao = getDAO();
+                    if(mexDao != null) {
+                        callback._myRoleMex = createTransientMessageExchange(mexDao);
+                    } else if( __log.isDebugEnabled() ) __log.debug("The MEX Dao has been already released.");
+                }
+            }
+
             public void afterCompletion(boolean success) {
                 __log.debug("Received myrole mex response callback");
                 if( success ) {
                     ResponseCallback callback = _waitingCallbacks.remove(cid);
-                    if (callback != null) callback.responseReceived();
+                    if (callback != null) {
+                        callback.responseReceived();
+                    }
                 } else {
                     __log.warn("Transaction is rolled back on sending back the response.");
                 }
             }
-            public void beforeCompletion() {
-            }
         });
     }
 
+    private MyRoleMessageExchangeImpl createTransientMessageExchange(MessageExchangeDAO persistentDao) {
+        MessageExchangeDAOImpl dao = new MessageExchangeDAOImpl(persistentDao.getDirection(), getMessageExchangeId());
+        dao.setCorrelationId(persistentDao.getCorrelationId());
+        dao.setCorrelationStatus(persistentDao.getCorrelationStatus());
+        dao.setPattern(persistentDao.getPattern());
+        dao.setCallee(persistentDao.getCallee());
+        dao.setStatus(persistentDao.getStatus());
+        dao.setOperation(persistentDao.getOperation());
+        dao.setPipedMessageExchangeId(persistentDao.getPipedMessageExchangeId());
+        dao.setFault(getFault());
+
+        if(getResponse() != null) {
+            assert getResponse() instanceof MessageImpl;
+
+            MessageDAO responseDao = ((MessageImpl)getResponse())._dao;
+            responseDao = new MessageDAOImpl(null, responseDao.getType(), responseDao.getData(), responseDao.getHeader());
+            dao.setResponse(responseDao);
+        } else if(getFaultResponse() != null) {
+            assert getFaultResponse() instanceof MessageImpl;
+
+            MessageDAO responseDao = ((MessageImpl)getFaultResponse())._dao;
+            responseDao = new MessageDAOImpl(null, responseDao.getType(), responseDao.getData(), responseDao.getHeader());
+            dao.setResponse(responseDao);
+        }
+        
+        MyRoleMessageExchangeImpl memMexCopy =  new MyRoleMessageExchangeImpl(_process, _engine, dao);
+        memMexCopy.setPortOp(getPortType(), getOperation());
+        
+        return memMexCopy;
+    }
+
     static class ResponseCallback {
         private boolean _timedout;
         private boolean _waiting = true;
+        // this object does not have to be synchronized
+        private volatile MyRoleMessageExchangeImpl _myRoleMex;
 
         synchronized boolean responseReceived() {
             if (_timedout) {

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java?rev=830795&r1=830794&r2=830795&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java Wed Oct 28 23:34:02 2009
@@ -36,6 +36,13 @@
 		this.messageExchange = messageExchange;
 	}
 
+	public MessageDAOImpl(MessageExchangeDAO messageExchange, QName type, Element data, Element header) {
+		this.messageExchange = messageExchange;
+		this.type = type;
+		this.data = data;
+		this.header = header;
+	}
+
 	public void setType(QName type) {
 		this.type = type;
 	}

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java?rev=830795&r1=830794&r2=830795&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImplTest.java Wed Oct 28 23:34:02 2009
@@ -6,6 +6,7 @@
 import java.util.concurrent.TimeoutException;
 
 import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
 
 import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
@@ -23,7 +24,17 @@
     TransactionManager _txm;
     
     public void testResponseReceived() throws Exception {
-        mexDao.expects(exactly(3)).method("getCorrelationId").will(returnValue("corrId"));
+        mexDao.expects(atLeastOnce()).method("getDirection").will(returnValue(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE));
+        mexDao.expects(atLeastOnce()).method("getMessageExchangeId").will(returnValue("163"));
+        mexDao.expects(atLeastOnce()).method("getCorrelationId").will(returnValue("corrId"));
+        mexDao.expects(atLeastOnce()).method("getCorrelationStatus").will(returnValue("MATCHED"));
+        mexDao.expects(atLeastOnce()).method("getPattern").will(returnValue("PATTERN"));
+        mexDao.expects(atLeastOnce()).method("getCallee").will(returnValue(new QName("CALLEE")));
+        mexDao.expects(atLeastOnce()).method("getStatus").will(returnValue("10"));
+        mexDao.expects(atLeastOnce()).method("getOperation").will(returnValue("10"));
+        mexDao.expects(atLeastOnce()).method("getPipedMessageExchangeId").will(returnValue("163"));
+        mexDao.expects(atLeastOnce()).method("getFault").will(returnValue(null));
+        mexDao.expects(atLeastOnce()).method("getResponse").will(returnValue(null));
         
         final boolean[] responded = new boolean[1];
         myRoleMexImpl.callbacks().put("corrId", new ResponseCallback() {
@@ -52,7 +63,18 @@
     }
     
     public void testResponseTimeout() throws Exception {
+        mexDao.expects(atLeastOnce()).method("getDirection").will(returnValue(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE));
+        mexDao.expects(atLeastOnce()).method("getMessageExchangeId").will(returnValue("163"));
         mexDao.expects(atLeastOnce()).method("getCorrelationId").will(returnValue("corrId"));
+        mexDao.expects(atLeastOnce()).method("getCorrelationStatus").will(returnValue("MATCHED"));
+        mexDao.expects(atLeastOnce()).method("getPattern").will(returnValue("PATTERN"));
+        mexDao.expects(atLeastOnce()).method("getCallee").will(returnValue(new QName("CALLEE")));
+        mexDao.expects(atLeastOnce()).method("getStatus").will(returnValue("10"));
+        mexDao.expects(atLeastOnce()).method("getOperation").will(returnValue("10"));
+        mexDao.expects(atLeastOnce()).method("getPipedMessageExchangeId").will(returnValue("163"));
+        mexDao.expects(atLeastOnce()).method("getFault").will(returnValue(null));
+        mexDao.expects(atLeastOnce()).method("getResponse").will(returnValue(null));
+
         myRoleMexImpl.callbacks().put("corrId", new MyRoleMessageExchangeImpl.ResponseCallback());
 
         _txm.begin();

Modified: ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java?rev=830795&r1=830794&r2=830795&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java Wed Oct 28 23:34:02 2009
@@ -326,10 +326,6 @@
   }
 
   public void delete(Set<CLEANUP_CATEGORY> cleanupCategories) {
-      delete(cleanupCategories, true);
-  }
-  
-  public void delete(Set<CLEANUP_CATEGORY> cleanupCategories, boolean deleteMyRoleMex) {
     entering("ProcessInstanceDaoImpl.delete");
     if(__log.isDebugEnabled()) __log.debug("Cleaning up instance data with categories = " + cleanupCategories);
       
@@ -349,7 +345,7 @@
     }
 
     if( cleanupCategories.contains(CLEANUP_CATEGORY.MESSAGES) ) {
-      deleteMessages(instances, deleteMyRoleMex);
+      deleteMessages(instances);
     }
       
     if( cleanupCategories.contains(CLEANUP_CATEGORY.VARIABLES) ) {
@@ -390,37 +386,20 @@
   }
 
   @SuppressWarnings("unchecked")
-  private void deleteMessages(HProcessInstance[] instances, boolean deleteMyRoleMex) {
-      // Let's delete ALL mex properties here
-      List<Long> allMexes = getSession().getNamedQuery(HMessageExchange.SELECT_MEX_IDS_BY_INSTANCES).setParameterList("instances", instances).list();
-      deleteByColumn(HMessageExchangeProperty.class, "mex.id", allMexes);
-
-      if( deleteMyRoleMex ) { // Delete my role mex and partner role mexes
-          // delete message data
-          deleteByIds(HLargeData.class, getSession().getNamedQuery(HLargeData.SELECT_MESSAGE_LDATA_IDS_BY_INSTANCES_1).setParameterList("instances", instances).list());
-          deleteByIds(HLargeData.class, getSession().getNamedQuery(HLargeData.SELECT_MESSAGE_LDATA_IDS_BY_INSTANCES_2).setParameterList("instances", instances).list());
-
-          // delete messages
-          deleteByIds(HMessage.class, getSession().getNamedQuery(HMessage.SELECT_MESSAGE_IDS_BY_INSTANCES).setParameterList("instances", instances).list());
-          
-          // delete all mexes
-          deleteByIds(HMessageExchange.class, allMexes);
-      } else { // Delete only the unmatched mexes, there are chances that some unmatched messages are still there
-          // delete message data 
-          deleteByIds(HLargeData.class, getSession().getNamedQuery(HLargeData.SELECT_UNMATCHED_MESSAGE_LDATA_IDS_BY_INSTANCES_1).setParameterList("instances", instances).list());
-          deleteByIds(HLargeData.class, getSession().getNamedQuery(HLargeData.SELECT_UNMATCHED_MESSAGE_LDATA_IDS_BY_INSTANCES_2).setParameterList("instances", instances).list());
-
-          Collection<HMessageExchange> unmatchedMex = getSession().getNamedQuery(HMessageExchange.SELECT_UNMATCHED_MEX_BY_INSTANCES).setParameterList("instances", instances).list();
-          if( !unmatchedMex.isEmpty() ) {
-              List<Long> mexIdList = new ArrayList<Long>();
-              for( HMessageExchange mex : unmatchedMex ) {
-                  mexIdList.add(mex.getId());
-              }
-
-              // delete unmatched mexes
-              getSession().delete(unmatchedMex);
-          }
-      }
+  private void deleteMessages(HProcessInstance[] instances) {
+      // Let's delete mex properties here
+      List<Long> mexes = getSession().getNamedQuery(HMessageExchange.SELECT_MEX_IDS_BY_INSTANCES).setParameterList("instances", instances).list();
+      deleteByColumn(HMessageExchangeProperty.class, "mex.id", mexes);
+
+      // delete message data
+      deleteByIds(HLargeData.class, getSession().getNamedQuery(HLargeData.SELECT_MESSAGE_LDATA_IDS_BY_INSTANCES_1).setParameterList("instances", instances).list());
+      deleteByIds(HLargeData.class, getSession().getNamedQuery(HLargeData.SELECT_MESSAGE_LDATA_IDS_BY_INSTANCES_2).setParameterList("instances", instances).list());
+
+      // delete messages
+      deleteByIds(HMessage.class, getSession().getNamedQuery(HMessage.SELECT_MESSAGE_IDS_BY_INSTANCES).setParameterList("instances", instances).list());
+      
+      // delete mexes
+      deleteByIds(HMessageExchange.class, mexes);
 
       // Delete routes and unmatched messages
       deleteByIds(HCorrelatorMessage.class, getSession().getNamedQuery(HCorrelatorMessage.SELECT_CORMESSAGE_IDS_BY_INSTANCES).setParameterList("instances", instances).list());

Modified: ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java?rev=830795&r1=830794&r2=830795&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java Wed Oct 28 23:34:02 2009
@@ -53,13 +53,15 @@
 @Table(name="ODE_MESSAGE_EXCHANGE")
 @NamedQueries({
     @NamedQuery(name=MessageExchangeDAOImpl.DELETE_MEXS_BY_PROCESS, query="delete from MessageExchangeDAOImpl as m where m._process = :process"),
-    @NamedQuery(name=MessageExchangeDAOImpl.SELECT_MEX_IDS_BY_PROCESS, query="select m._id from MessageExchangeDAOImpl as m where m._process = :process")
+    @NamedQuery(name=MessageExchangeDAOImpl.SELECT_MEX_IDS_BY_PROCESS, query="select m._id from MessageExchangeDAOImpl as m where m._process = :process"),
+    @NamedQuery(name=MessageExchangeDAOImpl.SELECT_MEXES_BY_INSTANCE, query="select x from MessageExchangeDAOImpl as x where x._processInst = :instance")
 })
 public class MessageExchangeDAOImpl extends OpenJPADAO implements MessageExchangeDAO, CorrelatorMessageDAO {
     private static final Log __log = LogFactory.getLog(MessageExchangeDAOImpl.class);
     
     public final static String DELETE_MEXS_BY_PROCESS = "DELETE_MEXS_BY_PROCESS";
     public final static String SELECT_MEX_IDS_BY_PROCESS = "SELECT_MEX_IDS_BY_PROCESS";
+    public final static String SELECT_MEXES_BY_INSTANCE = "SELECT_MEXES_BY_INSTANCE";
     
     @Id @Column(name="MESSAGE_EXCHANGE_ID") 
     private String _id;

Modified: ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java?rev=830795&r1=830794&r2=830795&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java Wed Oct 28 23:34:02 2009
@@ -153,10 +153,6 @@
     }
     
     public void delete(Set<CLEANUP_CATEGORY> cleanupCategories) {
-        delete(cleanupCategories, true);
-    }
-
-    public void delete(Set<CLEANUP_CATEGORY> cleanupCategories, boolean deleteMyRoleMex) {
         if(__log.isDebugEnabled()) __log.debug("Cleaning up instance Data with " + cleanupCategories);
         
         // remove jacob state
@@ -207,8 +203,13 @@
         batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(ScopeDAOImpl.DELETE_SCOPES_BY_SCOPE_IDS), "ids");
     }
 
-    private void deleteMessageRoutes() {
+    @SuppressWarnings("unchecked")
+	private void deleteMessageRoutes() {
         getEM().createNamedQuery(MessageRouteDAOImpl.DELETE_MESSAGE_ROUTES_BY_INSTANCE).setParameter ("instance", this).executeUpdate();
+        Collection<MessageExchangeDAOImpl> mexes = getEM().createNamedQuery(MessageExchangeDAOImpl.SELECT_MEXES_BY_INSTANCE).setParameter ("instance", this).getResultList();
+        for( MessageExchangeDAOImpl mex : mexes ) {
+            getEM().remove(mex);
+        }
     }
     
     @SuppressWarnings("unchecked")