You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/09/06 07:46:57 UTC

svn commit: r573153 [8/9] - in /ode/trunk: ./ axis2/src/main/java/org/apache/ode/axis2/ bpel-api/src/ bpel-api/src/main/java/org/apache/ode/bpel/explang/ bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-api/src/main/java/org/apache/ode/bpel/pmapi/...

Modified: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java (original)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java Wed Sep  5 22:46:42 2007
@@ -41,359 +41,366 @@
  * Hibernate-based {@link ProcessInstanceDAO} implementation.
  */
 class ProcessInstanceDaoImpl extends HibernateDao implements ProcessInstanceDAO {
-  /** Query for removing selectors. */
-  private static final String QRY_DELSELECTORS = "delete from "  + 
-    HCorrelatorSelector.class.getName() + " where instance = ?";
-
-  private static final String QRY_VARIABLES = "from " + HXmlData.class.getName()
-    + " as x where x.name = ? and x.scope.scopeModelId = ? and x.scope.instance.id = ?";
-
-  private static final String QRY_RECOVERIES = "from " + HActivityRecovery.class.getName() +
-    " AS x WHERE x.instance.id = ?";
-
-  private HProcessInstance _instance;
-
-  private ScopeDAO _root;
-  
-	public ProcessInstanceDaoImpl(SessionManager sm, HProcessInstance instance) {
-    super(sm, instance);
-		_instance = instance;
-	}
-
-  /**
-	 * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getCreateTime()
-	 */
-	public Date getCreateTime() {
-		return _instance.getCreated();
-	}
-  
-  public void setFault(FaultDAO fault) {
-    _instance.setFault(((FaultDAOImpl)fault)._self);
-    getSession().update(_instance);
-    
-  }
-
-
-   /**
-   * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setFault(javax.xml.namespace.QName, String, int, int, org.w3c.dom.Element)
-   */
-  public void setFault(QName name, String explanation, int lineNo, int activityId, Element faultData) {
-    if (_instance.getFault() != null)
-      getSession().delete(_instance.getFault());
-
-    HFaultData fault = new HFaultData();
-    fault.setName(QNameUtils.fromQName(name));
-    fault.setExplanation(explanation);
-    fault.setLineNo(lineNo);
-    fault.setActivityId(activityId);
-    if (faultData != null) {
-      HLargeData ld = new HLargeData(DOMUtils.domToString(faultData));
-      fault.setData(ld);
-      getSession().save(ld);
-    }
-
-    _instance.setFault(fault);
-    getSession().save(fault);
-    getSession().update(_instance);
-  }
-  /**
-   * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getFault()
-   */
-  public FaultDAO getFault() {
-    if (_instance.getFault() == null) return null;
-    else return new FaultDAOImpl(_sm, _instance.getFault());
-  }
-
-	/**
-	 * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getExecutionState()
-	 */
-	public byte[] getExecutionState() {
-    if (_instance.getJacobState() == null) return null;
-    return _instance.getJacobState().getBinary();
-	}
-	/**
-	 * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setExecutionState(byte[])
-	 */
-	public void setExecutionState(byte[] bytes) {
-    if (_instance.getJacobState() != null)
-      getSession().delete(_instance.getJacobState());
-    if (bytes.length > 0) {
-      HLargeData ld = new HLargeData(bytes);
-      _instance.setJacobState(ld);
-      getSession().save(ld);
-    }
-    getSession().update(_instance);
-  }
-	
-	/**
-	 * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getProcess()
-	 */
-	public ProcessDAO getProcess() {
-		return new ProcessDaoImpl(_sm, _instance.getProcess());
-	}
-  
-	/**
-	 * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getRootScope()
-	 */
-	public ScopeDAO getRootScope() {
-    if (_root != null) 
-      return _root;
-    Query rootQry = getSession().createFilter(_instance.getScopes(),
-        "where this.parentScope is null");
-    HScope hroot = (HScope)rootQry.uniqueResult();
-    if (hroot == null)
-      return null;
-		return _root = new ScopeDaoImpl(_sm, hroot);
-	}
-	/**
-	 * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setState(short)
-	 */
-	public void setState(short state) {
+    /** Query for removing selectors. */
+    private static final String QRY_DELSELECTORS = "delete from " + HCorrelatorSelector.class.getName() + " where instance = ?";
+
+    private static final String QRY_VARIABLES = "from " + HXmlData.class.getName()
+            + " as x where x.name = ? and x.scope.scopeModelId = ? and x.scope.instance.id = ?";
+
+    private static final String QRY_RECOVERIES = "from " + HActivityRecovery.class.getName() + " AS x WHERE x.instance.id = ?";
+
+    private HProcessInstance _instance;
+
+    private ScopeDAO _root;
+
+    public ProcessInstanceDaoImpl(SessionManager sm, HProcessInstance instance) {
+        super(sm, instance);
+        _instance = instance;
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getCreateTime()
+     */
+    public Date getCreateTime() {
+        return _instance.getCreated();
+    }
+
+    public void setFault(FaultDAO fault) {
+        _instance.setFault(((FaultDAOImpl) fault)._self);
+        getSession().update(_instance);
+
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setFault(javax.xml.namespace.QName, String, int, int, org.w3c.dom.Element)
+     */
+    public void setFault(QName name, String explanation, int lineNo, int activityId, Element faultData) {
+        if (_instance.getFault() != null)
+            getSession().delete(_instance.getFault());
+
+        HFaultData fault = new HFaultData();
+        fault.setName(QNameUtils.fromQName(name));
+        fault.setExplanation(explanation);
+        fault.setLineNo(lineNo);
+        fault.setActivityId(activityId);
+        if (faultData != null) {
+            HLargeData ld = new HLargeData(DOMUtils.domToString(faultData));
+            fault.setData(ld);
+            getSession().save(ld);
+        }
+
+        _instance.setFault(fault);
+        getSession().save(fault);
+        getSession().update(_instance);
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getFault()
+     */
+    public FaultDAO getFault() {
+        if (_instance.getFault() == null)
+            return null;
+        else
+            return new FaultDAOImpl(_sm, _instance.getFault());
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getExecutionState()
+     */
+    public byte[] getExecutionState() {
+        if (_instance.getJacobState() == null)
+            return null;
+        return _instance.getJacobState().getBinary();
+    }
+
+    /**
+     * Replaces the stored state; null/empty input clears it so no reference to the deleted object remains.
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setExecutionState(byte[])
+     */
+    public void setExecutionState(byte[] bytes) {
+        if (_instance.getJacobState() != null)
+            getSession().delete(_instance.getJacobState());
+        HLargeData ld = (bytes != null && bytes.length > 0) ? new HLargeData(bytes) : null;
+        _instance.setJacobState(ld);
+        if (ld != null)
+            getSession().save(ld);
+        getSession().update(_instance);
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getProcess()
+     */
+    public ProcessDAO getProcess() {
+        return new ProcessDaoImpl(_sm, _instance.getProcess());
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getRootScope()
+     */
+    public ScopeDAO getRootScope() {
+        if (_root != null)
+            return _root;
+        Query rootQry = getSession().createFilter(_instance.getScopes(), "where this.parentScope is null");
+        HScope hroot = (HScope) rootQry.uniqueResult();
+        if (hroot == null)
+            return null;
+        return _root = new ScopeDaoImpl(_sm, hroot);
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setState(short)
+     */
+    public void setState(short state) {
         _instance.setPreviousState(_instance.getState());
-    _instance.setState(state);
-    if(state==ProcessState.STATE_TERMINATED) {
-      clearSelectors();
-    }
-    getSession().update(_instance);
-  }
-	/**
-	 * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getState()
-	 */
-	public short getState() {
-		return _instance.getState();
-	}
-	/**
-	 * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getPreviousState()
-	 */
-	public short getPreviousState() {
-		return _instance.getPreviousState();
-	}
-
-  
-	public ScopeDAO createScope(ScopeDAO parentScope, String name, int scopeModelId) {
-		HScope scope = new HScope();
-    scope.setParentScope(parentScope != null
-        ? (HScope)((ScopeDaoImpl)parentScope).getHibernateObj()
-        : null);
-    scope.setName(name);
-    scope.setScopeModelId(scopeModelId);
-    scope.setState(ScopeStateEnum.ACTIVE.toString());
-    scope.setInstance(_instance);
-    scope.setCreated(new Date());
-    _instance.getScopes().add(scope);
-    getSession().save(scope);
-
-		return new ScopeDaoImpl(_sm, scope);
-	}
-
-	/**
-	 * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getInstanceId()
-	 */
-	public Long getInstanceId() {
-		return _instance.getId();
-	}
-
-  public ScopeDAO getScope(Long scopeInstanceId) {
-    Long id = Long.valueOf(scopeInstanceId);
-    HScope scope = (HScope)getSession().get(HScope.class, id);
-    return scope != null
-            ? new ScopeDaoImpl(_sm, scope)
-            : null;
-  }
-  
-	/**
-	 * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getScopes(java.lang.String)
-	 */
-	@SuppressWarnings("unchecked")
-  public Collection<ScopeDAO> getScopes(String scopeName) {
-    Collection<HScope> hscopes;
-    if (scopeName != null) {
-      Query filter = _sm.getSession().createFilter(_instance.getScopes(),
-          "where this.name=?");
-      filter.setString(0,scopeName);
-      hscopes = filter.list();
-    } else
-      hscopes = _instance.getScopes();
-    ArrayList<ScopeDAO> ret = new ArrayList<ScopeDAO>();
-    CollectionsX.transform(ret, hscopes, new UnaryFunction<HScope,ScopeDAO> () {
-      public ScopeDAO apply(HScope x) {
-        return new ScopeDaoImpl(_sm, x);
-      }
-     });
-    return ret;
-	}
-
-  public Collection<ScopeDAO> getScopes() {
-    return getScopes(null);
-  }
-  
-	/**
-	 * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getInstantiatingCorrelator()
-	 */
-	public CorrelatorDAO getInstantiatingCorrelator() {
-		return new CorrelatorDaoImpl(_sm, _instance.getInstantiatingCorrelator());
-	}
-
-  /**
-	 * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getLastActiveTime()
-	 */
-	public Date getLastActiveTime() {
-		return _instance.getLastActiveTime();
-	}
-	/**
-	 * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setLastActiveTime(java.util.Date)
-	 */
-	public void setLastActiveTime(Date dt) {
-		_instance.setLastActiveTime(dt);
-	}
-
-  public Set<CorrelationSetDAO> getCorrelationSets() {
-    Set<CorrelationSetDAO> results = new HashSet<CorrelationSetDAO>();
-
-    for (HCorrelationSet hCorrelationSet : _instance.getCorrelationSets()) {
-      results.add(new CorrelationSetDaoImpl(_sm, hCorrelationSet));
-    }
-
-    return results;
-  }
-
-  public CorrelationSetDAO getCorrelationSet(String name) {
-    for (HCorrelationSet hCorrelationSet : _instance.getCorrelationSets()) {
-      if (hCorrelationSet.getName().equals(name))
-        return new CorrelationSetDaoImpl(_sm, hCorrelationSet);
-    }
-    return null;
-  }
-
-  /**
-	 * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getVariables(java.lang.String, int)
-	 */
-	public XmlDataDAO[] getVariables(String variableName, int scopeModelId) {
-    List<XmlDataDAO> results = new ArrayList<XmlDataDAO>();
-
-    Iterator iter;
-    Query qry = getSession().createQuery(QRY_VARIABLES);
-    qry.setString(0, variableName);
-    qry.setInteger(1, scopeModelId);
-    qry.setLong(2, _instance.getId());
-    iter = qry.iterate();
-
-    while(iter.hasNext()) {
-    	results.add(new XmlDataDaoImpl(_sm, (HXmlData)iter.next()));
-    }
-    Hibernate.close(iter);
-
-    return results.toArray(new XmlDataDAO[results.size()]);
-	}
-
-  /**
-   * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#finishCompletion()
-   */
-  public void finishCompletion() {
-    // make sure we have completed.
-    assert (ProcessState.isFinished(this.getState()));
-    // let our process know that we've done our work.
-    this.getProcess().instanceCompleted(this);
-  }
-
-  public void delete() {
-    _sm.getSession().delete(_instance);
-  }
-
-  public void insertBpelEvent(ProcessInstanceEvent event) {
-    // Defer to the BpelDAOConnectionImpl
-    BpelDAOConnectionImpl._insertBpelEvent(_sm.getSession(),event, this.getProcess(), this);
-  }
-
-  public EventsFirstLastCountTuple getEventsFirstLastCount() {
-
-    // Using a criteria, find the min,max, and count of event tstamps.
-    Criteria c = _sm.getSession().createCriteria(HBpelEvent.class);
-    c.add(Restrictions.eq("instance",_instance));
-    c.setProjection(Projections.projectionList().add(Projections.min("tstamp"))
-                                                .add(Projections.max("tstamp"))
-                                                .add(Projections.count("tstamp")));
-    
-    Object[] ret = (Object[]) c.uniqueResult();
-    EventsFirstLastCountTuple flc = new EventsFirstLastCountTuple();
-    flc.first = (Date) ret[0];
-    flc.last = (Date) ret[1];
-    flc.count = (Integer)ret[2];
-    return flc;
-  }
-
-  public long genMonotonic() {
-    long seq = _instance.getSequence()+1;
-    _instance.setSequence(seq);
-    return seq;
-  }
-  
-  protected void clearSelectors() {
-    Query q = getSession().createQuery(QRY_DELSELECTORS);
-    q.setEntity(0, _instance);
-    q.executeUpdate();    
-  }
-
-  public int getActivityFailureCount() {
-    return _instance.getActivityFailureCount();
-  }
-
-  public Date getActivityFailureDateTime() {
-    return _instance.getActivityFailureDateTime();
-  }
-
-  public Collection<ActivityRecoveryDAO> getActivityRecoveries() {
-    List<ActivityRecoveryDAO> results = new ArrayList<ActivityRecoveryDAO>();
-    Query qry = getSession().createQuery(QRY_RECOVERIES);
-    qry.setLong(0, _instance.getId());
-    Iterator iter = qry.iterate();
-    while (iter.hasNext())
-      results.add(new ActivityRecoveryDaoImpl(_sm, (HActivityRecovery) iter.next()));
-    Hibernate.close(iter);
-    return results;
-  }
-
-  public void createActivityRecovery(String channel, long activityId, String reason,
-                                     Date dateTime, Element data, String[] actions, int retries) {
-    HActivityRecovery recovery = new HActivityRecovery();
-    recovery.setInstance(_instance);
-    recovery.setChannel(channel);
-    recovery.setActivityId(activityId);
-    recovery.setReason(reason);
-    recovery.setDateTime(dateTime);
-    recovery.setRetries(retries);
-    if (data != null) {
-      HLargeData ld = new HLargeData(DOMUtils.domToString(data));
-      recovery.setDetails(ld);
-      getSession().save(ld);
-    }
-    String list = actions[0];
-    for (int i = 1; i < actions.length; ++i)
-      list += " " + actions[i];
-    recovery.setActions(list);
-    _instance.getActivityRecoveries().add(recovery);
-    getSession().save(recovery);
-    _instance.setActivityFailureDateTime(dateTime);
-    _instance.setActivityFailureCount(_instance.getActivityFailureCount() + 1);
-    getSession().update(_instance);
-  }
-
-  /**
-   * Delete previously registered activity recovery.
-   */
-  public void deleteActivityRecovery(String channel) {
-    for (HActivityRecovery recovery : _instance.getActivityRecoveries()) {
-      if (recovery.getChannel().equals(channel)) {
-        getSession().delete(recovery);
-        _instance.setActivityFailureCount(_instance.getActivityFailureCount() - 1);
+        _instance.setState(state);
+        if (state == ProcessState.STATE_TERMINATED) {
+            clearSelectors();
+        }
         getSession().update(_instance);
-        return;
-      }
     }
-  }
-  
-  public BpelDAOConnection getConnection() {
-    return new BpelDAOConnectionImpl(_sm);
-  }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getState()
+     */
+    public short getState() {
+        return _instance.getState();
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getPreviousState()
+     */
+    public short getPreviousState() {
+        return _instance.getPreviousState();
+    }
+
+    public ScopeDAO createScope(ScopeDAO parentScope, String name, int scopeModelId) {
+        HScope scope = new HScope();
+        scope.setParentScope(parentScope != null ? (HScope) ((ScopeDaoImpl) parentScope).getHibernateObj() : null);
+        scope.setName(name);
+        scope.setScopeModelId(scopeModelId);
+        scope.setState(ScopeStateEnum.ACTIVE.toString());
+        scope.setInstance(_instance);
+        scope.setCreated(new Date());
+        _instance.getScopes().add(scope);
+        getSession().save(scope);
+
+        return new ScopeDaoImpl(_sm, scope);
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getInstanceId()
+     */
+    public Long getInstanceId() {
+        return _instance.getId();
+    }
+
+    public ScopeDAO getScope(Long scopeInstanceId) {
+        Long id = Long.valueOf(scopeInstanceId);
+        HScope scope = (HScope) getSession().get(HScope.class, id);
+        return scope != null ? new ScopeDaoImpl(_sm, scope) : null;
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getScopes(java.lang.String)
+     */
+    @SuppressWarnings("unchecked")
+    public Collection<ScopeDAO> getScopes(String scopeName) {
+        Collection<HScope> hscopes;
+        if (scopeName != null) {
+            Query filter = _sm.getSession().createFilter(_instance.getScopes(), "where this.name=?");
+            filter.setString(0, scopeName);
+            hscopes = filter.list();
+        } else
+            hscopes = _instance.getScopes();
+        ArrayList<ScopeDAO> ret = new ArrayList<ScopeDAO>();
+        CollectionsX.transform(ret, hscopes, new UnaryFunction<HScope, ScopeDAO>() {
+            public ScopeDAO apply(HScope x) {
+                return new ScopeDaoImpl(_sm, x);
+            }
+        });
+        return ret;
+    }
+
+    public Collection<ScopeDAO> getScopes() {
+        return getScopes(null);
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getInstantiatingCorrelator()
+     */
+    public CorrelatorDAO getInstantiatingCorrelator() {
+        return new CorrelatorDaoImpl(_sm, _instance.getInstantiatingCorrelator());
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getLastActiveTime()
+     */
+    public Date getLastActiveTime() {
+        return _instance.getLastActiveTime();
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setLastActiveTime(java.util.Date)
+     */
+    public void setLastActiveTime(Date dt) {
+        _instance.setLastActiveTime(dt);
+    }
+
+    public Set<CorrelationSetDAO> getCorrelationSets() {
+        Set<CorrelationSetDAO> results = new HashSet<CorrelationSetDAO>();
+
+        for (HCorrelationSet hCorrelationSet : _instance.getCorrelationSets()) {
+            results.add(new CorrelationSetDaoImpl(_sm, hCorrelationSet));
+        }
+
+        return results;
+    }
+
+    public CorrelationSetDAO getCorrelationSet(String name) {
+        for (HCorrelationSet hCorrelationSet : _instance.getCorrelationSets()) {
+            if (hCorrelationSet.getName().equals(name))
+                return new CorrelationSetDaoImpl(_sm, hCorrelationSet);
+        }
+        return null;
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getVariables(java.lang.String, int)
+     */
+    public XmlDataDAO[] getVariables(String variableName, int scopeModelId) {
+        List<XmlDataDAO> results = new ArrayList<XmlDataDAO>();
+
+        Iterator iter;
+        Query qry = getSession().createQuery(QRY_VARIABLES);
+        qry.setString(0, variableName);
+        qry.setInteger(1, scopeModelId);
+        qry.setLong(2, _instance.getId());
+        iter = qry.iterate();
+
+        while (iter.hasNext()) {
+            results.add(new XmlDataDaoImpl(_sm, (HXmlData) iter.next()));
+        }
+        Hibernate.close(iter);
+
+        return results.toArray(new XmlDataDAO[results.size()]);
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#finishCompletion()
+     */
+    public void finishCompletion() {
+        // make sure we have completed.
+        assert (ProcessState.isFinished(this.getState()));
+        // let our process know that we've done our work.
+        this.getProcess().instanceCompleted(this);
+    }
+
+    public void delete() {
+        _sm.getSession().delete(_instance);
+    }
+
+    public void insertBpelEvent(ProcessInstanceEvent event) {
+        // Defer to the BpelDAOConnectionImpl
+        BpelDAOConnectionImpl._insertBpelEvent(_sm.getSession(), event, this.getProcess(), this);
+    }
+
+    public EventsFirstLastCountTuple getEventsFirstLastCount() {
+
+        // Using a criteria, find the min,max, and count of event tstamps.
+        Criteria c = _sm.getSession().createCriteria(HBpelEvent.class);
+        c.add(Restrictions.eq("instance", _instance));
+        c.setProjection(Projections.projectionList().add(Projections.min("tstamp")).add(Projections.max("tstamp")).add(
+                Projections.count("tstamp")));
+
+        Object[] ret = (Object[]) c.uniqueResult();
+        EventsFirstLastCountTuple flc = new EventsFirstLastCountTuple();
+        flc.first = (Date) ret[0];
+        flc.last = (Date) ret[1];
+        flc.count = (Integer) ret[2];
+        return flc;
+    }
+
+    public long genMonotonic() {
+        long seq = _instance.getSequence() + 1;
+        _instance.setSequence(seq);
+        return seq;
+    }
+
+    protected void clearSelectors() {
+        Query q = getSession().createQuery(QRY_DELSELECTORS);
+        q.setEntity(0, _instance);
+        q.executeUpdate();
+    }
+
+    public int getActivityFailureCount() {
+        return _instance.getActivityFailureCount();
+    }
+
+    public Date getActivityFailureDateTime() {
+        return _instance.getActivityFailureDateTime();
+    }
+
+    public Collection<ActivityRecoveryDAO> getActivityRecoveries() {
+        List<ActivityRecoveryDAO> results = new ArrayList<ActivityRecoveryDAO>();
+        Query qry = getSession().createQuery(QRY_RECOVERIES);
+        qry.setLong(0, _instance.getId());
+        Iterator iter = qry.iterate();
+        while (iter.hasNext())
+            results.add(new ActivityRecoveryDaoImpl(_sm, (HActivityRecovery) iter.next()));
+        Hibernate.close(iter);
+        return results;
+    }
+
+    public void createActivityRecovery(String channel, long activityId, String reason, Date dateTime, Element data,
+            String[] actions, int retries) {
+        HActivityRecovery recovery = new HActivityRecovery();
+        recovery.setInstance(_instance);
+        recovery.setChannel(channel);
+        recovery.setActivityId(activityId);
+        recovery.setReason(reason);
+        recovery.setDateTime(dateTime);
+        recovery.setRetries(retries);
+        if (data != null) {
+            HLargeData ld = new HLargeData(DOMUtils.domToString(data));
+            recovery.setDetails(ld);
+            getSession().save(ld);
+        }
+        StringBuilder list = new StringBuilder(); // space-separated; "" (not an exception) for an empty array
+        for (int i = 0; i < actions.length; ++i)
+            list.append(i == 0 ? "" : " ").append(actions[i]);
+        recovery.setActions(list.toString());
+        _instance.getActivityRecoveries().add(recovery);
+        getSession().save(recovery);
+        _instance.setActivityFailureDateTime(dateTime);
+        _instance.setActivityFailureCount(_instance.getActivityFailureCount() + 1);
+        getSession().update(_instance);
+    }
+
+    /**
+     * Delete previously registered activity recovery.
+     */
+    public void deleteActivityRecovery(String channel) {
+        for (HActivityRecovery recovery : _instance.getActivityRecoveries()) {
+            if (recovery.getChannel().equals(channel)) {
+                getSession().delete(recovery);
+                _instance.setActivityFailureCount(_instance.getActivityFailureCount() - 1);
+                getSession().update(_instance);
+                return;
+            }
+        }
+    }
+
+    public BpelDAOConnection getConnection() {
+        return new BpelDAOConnectionImpl(_sm);
+    }
+
+    public int getExecutionStateCounter() {
+        return _instance.getExecutionStateCounter();
+    }
+
+    public void setExecutionStateCounter(int stateCounter) {
+        _instance.setExecutionStateCounter(stateCounter);
+
+    }
 
 }

Modified: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessage.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessage.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessage.java (original)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessage.java Wed Sep  5 22:46:42 2007
@@ -27,20 +27,9 @@
  */
 public class HMessage extends HObject {
 
-  private HMessageExchange _mex;
   private String _type;
   private HLargeData _data;
   
-  
-  public void setMessageExchange(HMessageExchange mex) {
-    _mex = mex;
-  }
-  
-  /** @hibernate.many-to-one column="MEX" */
-  public HMessageExchange getMessageExchange() {
-    return _mex;
-  }
-
   public void setType(String type) {
     _type = type;
   }

Modified: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java (original)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java Wed Sep  5 22:46:42 2007
@@ -18,11 +18,12 @@
  */
 package org.apache.ode.daohib.bpel.hobj;
 
-
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
+
 /**
  * Hibernate-managed table for keeping track of message exchanges.
  * 
@@ -47,7 +48,7 @@
     private HMessage _request;
 
     private HMessage _response;
-    
+
     private HPartnerLink _partnerLink;
 
     private String _clientKey;
@@ -70,10 +71,22 @@
 
     private String _callee;
 
-    private String _pipedMessageExchangeId;
+    private String _p2pPeer;
 
     private Map<String, String> _properties = new HashMap<String, String>();
 
+    private long _timeout;
+
+    private String _istyle;
+
+    private String _failureType;
+
+    private String _mexId;
+
+    private String _ackType;
+
+    private String _pipedPid;
+
     /**
      * 
      */
@@ -82,6 +95,20 @@
     }
 
     /**
+     * 
+     * @hibernate.property
+     * @hibernate.column name="MEXID" not-null="true" unique="true"
+     */
+
+    public String getMexId() {
+        return _mexId;
+    }
+
+    public void setMexId(String mexId) {
+        _mexId = mexId;
+    }
+
+    /**
      * @hibernate.property column="PORT_TYPE"
      */
     public String getPortType() {
@@ -299,8 +326,7 @@
     }
 
     /**
-     * @hibernate.map name="properties" table="BPEL_MEX_PROPS" lazy="false"
-     *                cascade="delete"
+     * @hibernate.map name="properties" table="BPEL_MEX_PROPS" lazy="false" cascade="delete"
      * @hibernate.collection-key column="MEX"
      * @hibernate.collection-index column="NAME" type="string"
      * @hibernate.collection-element column="VALUE" type="string" length="8000"
@@ -318,20 +344,80 @@
     }
 
     /**
-     * @hibernate.many-to-one column="PARTNERLINK" 
+     * @hibernate.many-to-one column="PARTNERLINK"
      */
     public HPartnerLink getPartnerLink() {
         return _partnerLink;
     }
 
     /**
-     * @hibernate.property column="PIPED_ID"
+     * @hibernate.property column="TIMEOUT"
+     * 
+     */
+    public long getTimeout() {
+        return _timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        _timeout = timeout;
+    }
+
+    /**
+     * @hibernate.property column="ISTYLE"
+     */
+    public String getInvocationStyle() {
+        return _istyle;
+    }
+
+    /**
+     * @hibernate.property column="P2P_PEER"
+     * @return
+     */
+    public String getPipedMessageExchange() {
+        return _p2pPeer;
+    }
+
+    public void setPipedMesageExchange(String p2ppeer) {
+        _p2pPeer = p2ppeer;
+    }
+
+    public void setFailureType(String failureType) {
+        _failureType = failureType;
+    }
+
+    /**
+     * @hibernate.property column="FAILURE_TYPE"
+     * @return
+     */
+    public String getFailureType() {
+        return _failureType;
+    }
+
+    public void setInvocationStyle(String invocationStyle) {
+        _istyle = invocationStyle;
+    }
+
+    /**
+     * @hibernate.property column="ACK_TYPE"
+     * @return
+     */
+    public String getAckType() {
+        return _ackType;
+    }
+
+    public void setAckType(String ackType) {
+        _ackType = ackType;
+    }
+
+    /**
+     * @hibernate.property column="PIPED_PID"
+     * @return
      */
-    public String getPipedMessageExchangeId() {
-        return _pipedMessageExchangeId;
+    public String getPipedPID() {
+        return _pipedPid;
     }
 
-    public void setPipedMessageExchangeId(String pipedMessageExchangeId) {
-        _pipedMessageExchangeId = pipedMessageExchangeId;
+    public void setPipedPID(String ppid) {
+        _pipedPid = ppid;
     }
 }

Modified: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java (original)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java Wed Sep  5 22:46:42 2007
@@ -66,6 +66,8 @@
 
     private long _seq;
 
+    private int _execStateCounter;
+
     /**
      *
      */
@@ -263,6 +265,19 @@
 
     public void setActivityFailureDateTime(Date dateTime) {
         _activityFailureDateTime = dateTime;
+    }
+
+    /**
+     * @hibernate.property column="EXEC_STATE_COUNT"
+     * @return
+     */
+    public int getExecutionStateCounter() {
+        return _execStateCounter;
+    }
+
+    public void setExecutionStateCounter(int stateCounter) {
+        _execStateCounter = stateCounter;
+        
     }
 
 }

Modified: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ql/HibernateInstancesQueryCompiler.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ql/HibernateInstancesQueryCompiler.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ql/HibernateInstancesQueryCompiler.java (original)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ql/HibernateInstancesQueryCompiler.java Wed Sep  5 22:46:42 2007
@@ -334,7 +334,6 @@
         try {
           value.setValue(ISO8601DateParser.parse((String) value.getValue()));
         } catch (ParseException ex) {
-          // TODO
           throw new RuntimeException(ex);
         }
       }
@@ -521,7 +520,6 @@
               objValues.add(ISO8601DateParser.parse((String) value.getValue()));
             }
           } catch (ParseException ex) {
-            // TODO
             throw new RuntimeException(ex);
           }
         } else {

Modified: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java (original)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java Wed Sep  5 22:46:42 2007
@@ -69,8 +69,8 @@
         _em = null;
     }
 
-    public MessageExchangeDAO createMessageExchange(char dir) {
-        MessageExchangeDAOImpl ret = new MessageExchangeDAOImpl(dir);
+    public MessageExchangeDAO createMessageExchange(String mexId, char dir) {
+        MessageExchangeDAOImpl ret = new MessageExchangeDAOImpl(mexId, dir);
         _em.persist(ret);
         return ret;
     }
@@ -152,7 +152,11 @@
 	}
 	
     public MessageExchangeDAO getMessageExchange(String mexid) {
-        return _em.find(MessageExchangeDAOImpl.class, mexid);
+        List l = _em.createQuery("select x from MessageExchangeDAOImpl x where x._id = ?1")
+        .setParameter(1, mexid).getResultList();
+        if (l.size() == 0) return null;
+        MessageExchangeDAOImpl m = (MessageExchangeDAOImpl) l.get(0);
+        return m;
     }
 
     public EntityManager getEntityManager() {

Modified: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageDAOImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageDAOImpl.java (original)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageDAOImpl.java Wed Sep  5 22:46:42 2007
@@ -20,25 +20,21 @@
 package org.apache.ode.dao.jpa;
 
 
-import org.apache.ode.bpel.dao.MessageDAO;
-import org.apache.ode.bpel.dao.MessageExchangeDAO;
-import org.apache.ode.utils.DOMUtils;
-import org.w3c.dom.Element;
-
 import javax.persistence.Basic;
-import javax.persistence.CascadeType;
 import javax.persistence.Column;
 import javax.persistence.Entity;
-import javax.persistence.FetchType;
 import javax.persistence.GeneratedValue;
 import javax.persistence.GenerationType;
 import javax.persistence.Id;
 import javax.persistence.Lob;
-import javax.persistence.ManyToOne;
 import javax.persistence.Table;
 import javax.persistence.Transient;
 import javax.xml.namespace.QName;
 
+import org.apache.ode.bpel.dao.MessageDAO;
+import org.apache.ode.utils.DOMUtils;
+import org.w3c.dom.Element;
+
 
 @Entity
 @Table(name="ODE_MESSAGE")
@@ -53,15 +49,12 @@
     private String _data;
 	@Transient
     private Element _element;
-	@ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL}) @Column(name="MESSAGE_EXCHANGE_ID")
-	private MessageExchangeDAOImpl _messageExchange;
 
 	public MessageDAOImpl() {
 		
 	}
 	public MessageDAOImpl(QName type, MessageExchangeDAOImpl me) {
 		_type = type.toString();
-		_messageExchange = me;
 	}
 	
 	public Element getData() {
@@ -74,10 +67,6 @@
 		}
 		
 		return _element;
-	}
-
-	public MessageExchangeDAO getMessageExchange() {
-		return _messageExchange;
 	}
 
 	public QName getType() {

Modified: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java (original)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java Wed Sep  5 22:46:42 2007
@@ -25,6 +25,10 @@
 import org.apache.ode.bpel.dao.PartnerLinkDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.utils.DOMUtils;
 import org.apache.ode.utils.uuid.UUID;
 import org.w3c.dom.Element;
@@ -91,6 +95,9 @@
     private String _correlationKeys;
     @Basic @Column(name="PIPED_ID")
     private String _pipedMessageExchangeId;
+    
+    @Basic @Column(name="ACK_TYPE")
+    private String _ackType;
 
     @OneToMany(targetEntity=MexProperty.class,mappedBy="_mex",fetch=FetchType.EAGER,cascade={CascadeType.ALL})
     private Collection<MexProperty> _props = new ArrayList<MexProperty>();
@@ -100,19 +107,31 @@
 	private PartnerLinkDAOImpl _partnerLink;
 	@ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="PROCESS_ID")
 	private ProcessDAOImpl _process;
-	@OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL}) @Column(name="REQUEST_MESSAGE_ID")
+	@OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="REQUEST_MESSAGE_ID")
 	private MessageDAOImpl _request;
-	@OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL}) @Column(name="RESPONSE_MESSAGE_ID")
+    @OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="RESPONSE_MESSAGE_ID")
 	private MessageDAOImpl _response;
 
     @ManyToOne(fetch= FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="CORR_ID")
     private CorrelatorDAOImpl _correlator;
+    
+    @Basic @Column(name="ISTYLE")
+    private String _istyle;
+
+    @Basic @Column(name="TIMEOUT")
+    private long _timeout;
+    
+    @Basic @Column(name="FAILURE_TYPE")
+    private String _failureType;
+    
+    @Basic @Column(name="PIPED_PID")
+    private String _pipedPid;
 
     public MessageExchangeDAOImpl() {}
     
-	public MessageExchangeDAOImpl(char direction){
+	public MessageExchangeDAOImpl(String mexId, char direction){
 		_direction = direction;
-		_id = new UUID().toString();
+		_id = mexId;
 	}
 	
 	public MessageDAO createMessage(QName type) {
@@ -128,7 +147,7 @@
 		return _channel;
 	}
 
-	public String getCorrelationId() {
+	public String getPartnersKey() {
 		return _correlationId;
 	}
 
@@ -223,8 +242,8 @@
 		return _response;
 	}
 
-	public String getStatus() {
-		return _status;
+	public Status getStatus() {
+		return _status == null ? null : Status.valueOf(_status);
 	}
 
 	public void setCallee(QName callee) {
@@ -235,7 +254,7 @@
 		_channel = channel;
 	}
 
-	public void setCorrelationId(String correlationId) {
+	public void setPartnersKey(String correlationId) {
 		_correlationId = correlationId;
 	}
 
@@ -296,8 +315,8 @@
 		_response = (MessageDAOImpl)msg;
 	}
 
-	public void setStatus(String status) {
-		_status = status;
+	public void setStatus(Status status) {
+		_status = status == null ?  null : status.toString();
 	}
 
     public String getPipedMessageExchangeId() {
@@ -338,5 +357,46 @@
 
     public void setCorrelator(CorrelatorDAOImpl correlator) {
         _correlator = correlator;
+    }
+
+    public InvocationStyle getInvocationStyle() {
+        return _istyle == null ? null : InvocationStyle.valueOf(_istyle);
+    }
+
+
+    public long getTimeout() {
+        return _timeout;
+    }
+
+    public void setFailureType(FailureType failureType) {
+        _failureType = failureType == null ? null :failureType.toString();
+    }
+    
+    public FailureType getFailureType() {
+        return _failureType == null ? null : FailureType.valueOf(_failureType);
+    }
+
+    public void setInvocationStyle(InvocationStyle invocationStyle) {
+        _istyle = invocationStyle == null ? null : invocationStyle.toString();
+    }
+
+    public void setTimeout(long timeout) {
+        _timeout = timeout;
+    }
+
+    public AckType getAckType() {
+        return _ackType == null ? null : AckType.valueOf(_ackType);
+    }
+
+    public void setAckType(AckType ackType) {
+        _ackType = ackType == null ? null :ackType.toString();
+    }
+
+    public QName getPipedPID() {
+        return _pipedPid == null ? null : QName.valueOf(_pipedPid);
+    }
+
+    public void setPipedPID(QName pipedPid) {
+        _pipedPid = pipedPid == null ? null : pipedPid.toString();
     }
 }

Modified: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java (original)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java Wed Sep  5 22:46:42 2007
@@ -77,10 +77,13 @@
     private short _previousState;
 	@Lob @Column(name="EXECUTION_STATE")
     private byte[] _executionState;
-	@Basic @Column(name="SEQUENCE")
+    @Basic @Column(name="SEQUENCE")
     private long _sequence;
 	@Basic @Column(name="DATE_CREATED")
     private Date _dateCreated = new Date();
+    
+    @Basic @Column(name="EXEC_STATE_COUNTER")
+    private int _execStateCounter;
 	
 	@OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL}) @Column(name="ROOT_SCOPE_ID")
 	private ScopeDAOImpl _rootScope;
@@ -295,5 +298,11 @@
 
     public BpelDAOConnection getConnection() {
         return new BPELDAOConnectionImpl(getEM());
+    }
+    public int getExecutionStateCounter() {
+        return _execStateCounter;
+    }
+    public void setExecutionStateCounter(int stateCounter) {
+        _execStateCounter = stateCounter;
     }
 }

Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobMessages.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobMessages.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobMessages.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobMessages.java Wed Sep  5 22:46:42 2007
@@ -44,24 +44,7 @@
         methodName, className);
   }
 
-  // TODO
-  public String msgContDeHydrationErr(String channel, String name) {
-    throw new UnsupportedOperationException();
-  }
 
-  /**
-   * Error indicating that a re-hydration of a saved _continuation object could
-   * not be completed.
-   * 
-   * @param channel
-   *          channel with the dangling _continuation
-   * @param mlClassName
-   *          name of de-hydrated {@link org.apache.ode.jacob.ChannelListener} object
-   * 
-   */
-  public String msgContHydrationErr(String channel, String mlClassName) {
-    throw new UnsupportedOperationException();
-  }
 
   /**
    * Internal error indicating that a required client method was not accessible

Modified: ode/trunk/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java (original)
+++ ode/trunk/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java Wed Sep  5 22:46:42 2007
@@ -19,44 +19,73 @@
 
 package org.apache.ode.jbi;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.MessageExchangeContext;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 
+
 /**
- * Implementation of the ODE {@link org.apache.ode.bpel.iapi.MessageExchangeContext}
- * interface. This class is used by the ODE engine to make invocation on JBI
- * services provided by other engines (i.e. the BPEL engine is acting as
- * client/consumer of services). 
+ * Implementation of the ODE {@link org.apache.ode.bpel.iapi.MessageExchangeContext} interface. This class is used by the ODE engine
+ * to make invocation on JBI services provided by other engines (i.e. the BPEL engine is acting as client/consumer of services).
  */
 public class MessageExchangeContextImpl implements MessageExchangeContext {
 
-  private static final Log __log = LogFactory
-      .getLog(MessageExchangeContextImpl.class);
+    private static final Log __log = LogFactory.getLog(MessageExchangeContextImpl.class);
+
+    private OdeContext _ode;
+
+    /** Supported invocation styles. For now this is fixed. */
+    private static final Set<InvocationStyle> __supported;
+    static {
+        HashSet<InvocationStyle> supported = new HashSet<InvocationStyle>();
+        supported.add(InvocationStyle.UNRELIABLE);
+        __supported = Collections.unmodifiableSet(supported);
+    }
+
+    public MessageExchangeContextImpl(OdeContext ode) {
+        _ode = ode;
+    }
+
+
+    public void invokePartnerUnreliable(PartnerRoleMessageExchange mex) throws ContextException {
+        _ode._consumer.invokePartner(mex);
+    }
+
+    public void invokePartnerReliable(PartnerRoleMessageExchange mex) throws ContextException {
+        throw new ContextException("Unsupported.");
 
-  private OdeContext _ode;
+    }
+
+    public void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException {
+        throw new ContextException("Unsupported.");
+
+    }
+
+
+    public void cancel(PartnerRoleMessageExchange mex) throws ContextException {
+        // What can we do in JBI to cancel? --- not much. 
+        
+    }
+
+    public Set<InvocationStyle> getSupportedInvocationStyle(PartnerRoleChannel prc, EndpointReference partnerEpr) {
+        return __supported ;
+    }
+
+
+    public void onMyRoleMessageExchangeStateChanged(MyRoleMessageExchange myRoleMex) throws BpelEngineException {
+    }
+    
 
-  public MessageExchangeContextImpl(OdeContext ode) {
-    _ode = ode;
-  }
-
-  public void onAsyncReply(MyRoleMessageExchange myrolemex)
-      throws BpelEngineException {
-    OdeService ode = _ode.getService(myrolemex.getServiceName());
-    if (ode !=  null)
-      ode.onResponse(myrolemex);
-    else {
-      __log.error("No active service for message exchange: "  + myrolemex);
-    }
-  }
-
-  public void invokePartner(PartnerRoleMessageExchange mex) throws ContextException {
-    _ode._consumer.invokePartner(mex);
-  }
+    
 }

Modified: ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
URL: http://svn.apache.org/viewvc/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java (original)
+++ ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java Wed Sep  5 22:46:42 2007
@@ -18,40 +18,37 @@
  */
 package org.apache.ode.jbi;
 
-import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
-import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
-import org.apache.ode.jbi.msgmap.Mapper;
-import org.apache.ode.jbi.msgmap.MessageTranslationException;
-
 import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 
-import javax.jbi.messaging.*;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchangeFactory;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.xml.namespace.QName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
+import org.apache.ode.jbi.msgmap.Mapper;
+import org.apache.ode.jbi.msgmap.MessageTranslationException;
 
 /**
  * Bridge between ODE (consumers) and JBI (providers). An single object of this type handles all communications initiated by ODE
- * that is destined for other JBI providers. 
+ * that is destined for other JBI providers.
  */
-abstract class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor {
+class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor {
     private static final Log __log = LogFactory.getLog(OdeConsumer.class);
-    private static final long DEFAULT_RESPONSE_TIMEOUT = Long.getLong("org.apache.ode.jbi.timeout", 2 * 60 * 1000L);
 
     protected OdeContext _ode;
 
-    protected long _responseTimeout = DEFAULT_RESPONSE_TIMEOUT;
-
-
-    protected Map<String, PartnerRoleMessageExchange> _outstandingExchanges = new ConcurrentHashMap<String, PartnerRoleMessageExchange>();
-
     OdeConsumer(OdeContext ode) {
         _ode = ode;
     }
@@ -100,35 +97,15 @@
                 NormalizedMessage nmsg = inonly.createMessage();
                 mapper.toNMS(nmsg, odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(), null);
                 inonly.setInMessage(nmsg);
-                _ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
-                    public void afterCompletion(boolean success) {
-                        if (success) {
-                            doSendOneWay(odeMex, inonly);
-                        }
-                    }
-                    public void beforeCompletion() {
-                    }
-
-                });
+                doSendJBI(odeMex, inonly);
                 odeMex.replyOneWayOk();
             } else {
                 final InOut inout = (InOut) jbiMex;
                 NormalizedMessage nmsg = inout.createMessage();
                 mapper.toNMS(nmsg, odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(), null);
                 inout.setInMessage(nmsg);
-                _ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
-                    public void afterCompletion(boolean success) {
-                        if (success) {
-                            doSendTwoWay(odeMex, inout);
-                        }
-                    }
-
-                    public void beforeCompletion() {
-                    }
-
-                });
-
-                odeMex.replyAsync();
+                doSendJBI(odeMex, inout);
+                odeMex.replyAsync(inout.getExchangeId());
             }
         } catch (MessagingException me) {
             String errmsg = "JBI messaging error for ODE MEX " + odeMex;
@@ -142,83 +119,58 @@
 
     }
 
-    protected abstract void doSendOneWay(PartnerRoleMessageExchange odeMex, InOnly inonly);
-
-    protected abstract void doSendTwoWay(PartnerRoleMessageExchange odeMex, InOut inout);
-
-    protected abstract void inOutDone(InOut inout);
 
     public void onJbiMessageExchange(MessageExchange jbiMex) throws MessagingException {
-        if (!jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY) &&
-            !jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
+        if (!jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY)
+                && !jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
             __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern " + jbiMex.getPattern());
             return;
         }
         if (jbiMex.getStatus() == ExchangeStatus.ACTIVE) {
             if (jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
-                inOutDone((InOut) jbiMex);
                 outResponse((InOut) jbiMex);
             }
             jbiMex.setStatus(ExchangeStatus.DONE);
             _ode.getChannel().send(jbiMex);
         } else if (jbiMex.getStatus() == ExchangeStatus.ERROR) {
-            inOutDone((InOut) jbiMex);
             outFailure((InOut) jbiMex);
         } else if (jbiMex.getStatus() == ExchangeStatus.DONE) {
-            _outstandingExchanges.remove(jbiMex.getExchangeId());
+            ; // anything todo here? 
         } else {
             __log.error("Unexpected status " + jbiMex.getStatus() + " for JBI message exchange: " + jbiMex.getExchangeId());
         }
     }
 
     private void outFailure(final InOut jbiMex) {
-        final PartnerRoleMessageExchange pmex = _outstandingExchanges.remove(jbiMex.getExchangeId());
+        PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getMessageExchangeByForeignKey(jbiMex.getExchangeId());
         if (pmex == null) {
-            __log.warn("Received a response for unknown JBI message exchange " + jbiMex.getExchangeId());
+            __log.warn("Received a response for unknown partner role message exchange; JBI exchange id: " + jbiMex.getExchangeId());
             return;
         }
-
-        try {
-            _ode._scheduler.execTransaction(new Callable<Boolean>() {
-                public Boolean call() throws Exception {
-                    pmex.replyWithFailure(FailureType.OTHER, "Error: " + jbiMex.getError(), null);
-                    return null;
-                }
-            });
-        } catch (Exception ex) {
-            __log.error("error delivering failure: ", ex);
-        }
-
+        // Report the JBI-level error to the ODE exchange as a generic (OTHER) failure.
+        pmex.replyWithFailure(FailureType.OTHER, "Error: " + jbiMex.getError(), null);
     }
 
     private void outResponse(final InOut jbiMex) {
-        final PartnerRoleMessageExchange outstanding = _outstandingExchanges.remove(jbiMex.getExchangeId());
-        if (outstanding == null) {
-            __log.warn("Received a response for unknown JBI message exchange " + jbiMex.getExchangeId());
+        // Look up the ODE partner-role exchange using the JBI exchange id as its foreign key.
+        PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getMessageExchangeByForeignKey(jbiMex.getExchangeId());
+        if (pmex == null) {
+            __log.warn("Received a response for unknown partner role message exchange; JBI exchange id: " + jbiMex.getExchangeId());
             return;
         }
-
-        try {
-            _ode._scheduler.execTransaction(new Callable<Boolean>() {
-                @SuppressWarnings("unchecked")
-                public Boolean call() throws Exception {
-                    // need to reload mex since we're in a different transaction
-                    PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getEngine().getMessageExchange(outstanding.getMessageExchangeId());
-                    if (pmex == null) {
-                        __log.warn("Received a response for unknown partner role message exchange " + pmex.getMessageExchangeId());
-                        return Boolean.FALSE;
-                    }
-                    String mapperName = pmex.getProperty(Mapper.class.getName());
-                    Mapper mapper = mapperName == null ? _ode.getDefaultMapper() : _ode.getMapper(mapperName);
-                    if (mapper == null) {
-                        String errmsg = "Mapper not found.";
-                        __log.error(errmsg);
-                        pmex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null);
-                    } else {
-                        try {
-                            Fault jbiFlt = jbiMex.getFault();
-                            if (jbiFlt != null) {
-                                javax.wsdl.Fault wsdlFlt = mapper.toFaultType(jbiFlt, (Collection<javax.wsdl.Fault>) pmex.getOperation().getFaults().values());
+     
+        String mapperName = pmex.getProperty(Mapper.class.getName());
+        Mapper mapper = mapperName == null ? _ode.getDefaultMapper() : _ode.getMapper(mapperName);
+        if (mapper == null) {
+            String errmsg = "Mapper not found.";
+            __log.error(errmsg);
+            pmex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null);
+        } else {
+            try {
+                Fault jbiFlt = jbiMex.getFault();
+                if (jbiFlt != null) {
+                    javax.wsdl.Fault wsdlFlt = mapper.toFaultType(jbiFlt, (Collection<javax.wsdl.Fault>) pmex
+                            .getOperation().getFaults().values());
                                 if (wsdlFlt == null) {
                                     pmex.replyWithFailure(FailureType.FORMAT_ERROR, "Unrecognized fault message.", null);
                                 } else {
@@ -234,30 +186,25 @@
                                                 + wsdlFlt.getName(), null);
                                     }
                                 }
-                            } else {
-                                Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
-                                mapper.toODE(response, jbiMex.getOutMessage(), pmex.getOperation().getOutput().getMessage());
-                                pmex.reply(response);
-                            }
-                        } catch (MessageTranslationException mte) {
-                            __log.error("Error translating message.", mte);
-                            pmex.replyWithFailure(FailureType.FORMAT_ERROR, mte.getMessage(), null);
-                        }
-                    }
-                    return null;
+                } else {
+                    Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
+                    mapper.toODE(response, jbiMex.getOutMessage(), pmex.getOperation().getOutput().getMessage());
+                    pmex.reply(response);
                 }
-            });
-        } catch (Exception ex) {
-            __log.error("error delivering RESPONSE: ", ex);
-
+            } catch (MessageTranslationException mte) {
+                __log.error("Error translating message.", mte);
+                pmex.replyWithFailure(FailureType.FORMAT_ERROR, mte.getMessage(), null);
+            }
         }
     }
 
-    public void setResponseTimeout(long timeout) {
-    	_responseTimeout = timeout;
+    protected void doSendJBI(final PartnerRoleMessageExchange odeMex, final MessageExchange jbiMex) {
+        try {
+            _ode.getChannel().send(jbiMex);
+        } catch (MessagingException e) {
+            String errmsg = "Error sending request-only message to JBI for ODE mex " + odeMex;
+            __log.error(errmsg, e);
+        }
     }
 
-    public long getResponseTimeout() {
-    	return _responseTimeout;
-    }
 }

Modified: ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java
URL: http://svn.apache.org/viewvc/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java (original)
+++ ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java Wed Sep  5 22:46:42 2007
@@ -40,12 +40,12 @@
 import org.apache.ode.bpel.engine.BpelServerImpl;
 import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.bpel.o.OPartnerLink;
 import org.apache.ode.bpel.o.OProcess;
 import org.apache.ode.bpel.o.Serializer;
 import org.apache.ode.jbi.msgmap.Mapper;
 import org.apache.ode.jbi.util.WSDLFlattener;
-import org.apache.ode.scheduler.simple.SimpleScheduler;
 import org.apache.ode.store.ProcessStoreImpl;
 import org.w3c.dom.Document;
 
@@ -81,7 +81,7 @@
 
     MessageExchangeContextImpl _mexContext;
 
-    SimpleScheduler _scheduler;
+    Scheduler _scheduler;
 
     ExecutorService _executorService;
 
@@ -95,6 +95,7 @@
 
     /** Mapping of Endpoint to OdeService */
     private Map<Endpoint, OdeService> _activeOdeServices = new ConcurrentHashMap<Endpoint, OdeService>();
+
 
     /**
      * Gets the delivery channel.