You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/09/06 07:46:57 UTC
svn commit: r573153 [8/9] - in /ode/trunk: ./
axis2/src/main/java/org/apache/ode/axis2/ bpel-api/src/
bpel-api/src/main/java/org/apache/ode/bpel/explang/
bpel-api/src/main/java/org/apache/ode/bpel/iapi/
bpel-api/src/main/java/org/apache/ode/bpel/pmapi/...
Modified: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java (original)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java Wed Sep 5 22:46:42 2007
@@ -41,359 +41,366 @@
* Hibernate-based {@link ProcessInstanceDAO} implementation.
*/
class ProcessInstanceDaoImpl extends HibernateDao implements ProcessInstanceDAO {
- /** Query for removing selectors. */
- private static final String QRY_DELSELECTORS = "delete from " +
- HCorrelatorSelector.class.getName() + " where instance = ?";
-
- private static final String QRY_VARIABLES = "from " + HXmlData.class.getName()
- + " as x where x.name = ? and x.scope.scopeModelId = ? and x.scope.instance.id = ?";
-
- private static final String QRY_RECOVERIES = "from " + HActivityRecovery.class.getName() +
- " AS x WHERE x.instance.id = ?";
-
- private HProcessInstance _instance;
-
- private ScopeDAO _root;
-
- public ProcessInstanceDaoImpl(SessionManager sm, HProcessInstance instance) {
- super(sm, instance);
- _instance = instance;
- }
-
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getCreateTime()
- */
- public Date getCreateTime() {
- return _instance.getCreated();
- }
-
- public void setFault(FaultDAO fault) {
- _instance.setFault(((FaultDAOImpl)fault)._self);
- getSession().update(_instance);
-
- }
-
-
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setFault(javax.xml.namespace.QName, String, int, int, org.w3c.dom.Element)
- */
- public void setFault(QName name, String explanation, int lineNo, int activityId, Element faultData) {
- if (_instance.getFault() != null)
- getSession().delete(_instance.getFault());
-
- HFaultData fault = new HFaultData();
- fault.setName(QNameUtils.fromQName(name));
- fault.setExplanation(explanation);
- fault.setLineNo(lineNo);
- fault.setActivityId(activityId);
- if (faultData != null) {
- HLargeData ld = new HLargeData(DOMUtils.domToString(faultData));
- fault.setData(ld);
- getSession().save(ld);
- }
-
- _instance.setFault(fault);
- getSession().save(fault);
- getSession().update(_instance);
- }
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getFault()
- */
- public FaultDAO getFault() {
- if (_instance.getFault() == null) return null;
- else return new FaultDAOImpl(_sm, _instance.getFault());
- }
-
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getExecutionState()
- */
- public byte[] getExecutionState() {
- if (_instance.getJacobState() == null) return null;
- return _instance.getJacobState().getBinary();
- }
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setExecutionState(byte[])
- */
- public void setExecutionState(byte[] bytes) {
- if (_instance.getJacobState() != null)
- getSession().delete(_instance.getJacobState());
- if (bytes.length > 0) {
- HLargeData ld = new HLargeData(bytes);
- _instance.setJacobState(ld);
- getSession().save(ld);
- }
- getSession().update(_instance);
- }
-
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getProcess()
- */
- public ProcessDAO getProcess() {
- return new ProcessDaoImpl(_sm, _instance.getProcess());
- }
-
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getRootScope()
- */
- public ScopeDAO getRootScope() {
- if (_root != null)
- return _root;
- Query rootQry = getSession().createFilter(_instance.getScopes(),
- "where this.parentScope is null");
- HScope hroot = (HScope)rootQry.uniqueResult();
- if (hroot == null)
- return null;
- return _root = new ScopeDaoImpl(_sm, hroot);
- }
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setState(short)
- */
- public void setState(short state) {
+ /** Query for removing selectors. */
+ private static final String QRY_DELSELECTORS = "delete from " + HCorrelatorSelector.class.getName() + " where instance = ?";
+
+ private static final String QRY_VARIABLES = "from " + HXmlData.class.getName()
+ + " as x where x.name = ? and x.scope.scopeModelId = ? and x.scope.instance.id = ?";
+
+ private static final String QRY_RECOVERIES = "from " + HActivityRecovery.class.getName() + " AS x WHERE x.instance.id = ?";
+
+ private HProcessInstance _instance;
+
+ private ScopeDAO _root;
+
+ public ProcessInstanceDaoImpl(SessionManager sm, HProcessInstance instance) {
+ super(sm, instance);
+ _instance = instance;
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getCreateTime()
+ */
+ public Date getCreateTime() {
+ return _instance.getCreated();
+ }
+
+ public void setFault(FaultDAO fault) {
+ _instance.setFault(((FaultDAOImpl) fault)._self);
+ getSession().update(_instance);
+
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setFault(javax.xml.namespace.QName, String, int, int, org.w3c.dom.Element)
+ */
+ public void setFault(QName name, String explanation, int lineNo, int activityId, Element faultData) {
+ if (_instance.getFault() != null)
+ getSession().delete(_instance.getFault());
+
+ HFaultData fault = new HFaultData();
+ fault.setName(QNameUtils.fromQName(name));
+ fault.setExplanation(explanation);
+ fault.setLineNo(lineNo);
+ fault.setActivityId(activityId);
+ if (faultData != null) {
+ HLargeData ld = new HLargeData(DOMUtils.domToString(faultData));
+ fault.setData(ld);
+ getSession().save(ld);
+ }
+
+ _instance.setFault(fault);
+ getSession().save(fault);
+ getSession().update(_instance);
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getFault()
+ */
+ public FaultDAO getFault() {
+ if (_instance.getFault() == null)
+ return null;
+ else
+ return new FaultDAOImpl(_sm, _instance.getFault());
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getExecutionState()
+ */
+ public byte[] getExecutionState() {
+ if (_instance.getJacobState() == null)
+ return null;
+ return _instance.getJacobState().getBinary();
+ }
+
+ /**
+  * Replaces the persisted execution state of this instance.
+  * <p>
+  * Any previously stored state object is detached from the instance and deleted. A new
+  * {@link HLargeData} is saved only when <code>bytes</code> is non-empty; a <code>null</code>
+  * or empty array simply clears the stored state.
+  *
+  * @param bytes serialized execution state, or <code>null</code>/empty to clear it
+  * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setExecutionState(byte[])
+  */
+ public void setExecutionState(byte[] bytes) {
+     HLargeData previous = _instance.getJacobState();
+     if (previous != null) {
+         // Drop the reference first so the instance never points at a deleted object
+         // when no replacement state is supplied.
+         _instance.setJacobState(null);
+         getSession().delete(previous);
+     }
+     if (bytes != null && bytes.length > 0) {
+         HLargeData ld = new HLargeData(bytes);
+         _instance.setJacobState(ld);
+         getSession().save(ld);
+     }
+     getSession().update(_instance);
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getProcess()
+ */
+ public ProcessDAO getProcess() {
+ return new ProcessDaoImpl(_sm, _instance.getProcess());
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getRootScope()
+ */
+ public ScopeDAO getRootScope() {
+ if (_root != null)
+ return _root;
+ Query rootQry = getSession().createFilter(_instance.getScopes(), "where this.parentScope is null");
+ HScope hroot = (HScope) rootQry.uniqueResult();
+ if (hroot == null)
+ return null;
+ return _root = new ScopeDaoImpl(_sm, hroot);
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setState(short)
+ */
+ public void setState(short state) {
_instance.setPreviousState(_instance.getState());
- _instance.setState(state);
- if(state==ProcessState.STATE_TERMINATED) {
- clearSelectors();
- }
- getSession().update(_instance);
- }
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getState()
- */
- public short getState() {
- return _instance.getState();
- }
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getPreviousState()
- */
- public short getPreviousState() {
- return _instance.getPreviousState();
- }
-
-
- public ScopeDAO createScope(ScopeDAO parentScope, String name, int scopeModelId) {
- HScope scope = new HScope();
- scope.setParentScope(parentScope != null
- ? (HScope)((ScopeDaoImpl)parentScope).getHibernateObj()
- : null);
- scope.setName(name);
- scope.setScopeModelId(scopeModelId);
- scope.setState(ScopeStateEnum.ACTIVE.toString());
- scope.setInstance(_instance);
- scope.setCreated(new Date());
- _instance.getScopes().add(scope);
- getSession().save(scope);
-
- return new ScopeDaoImpl(_sm, scope);
- }
-
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getInstanceId()
- */
- public Long getInstanceId() {
- return _instance.getId();
- }
-
- public ScopeDAO getScope(Long scopeInstanceId) {
- Long id = Long.valueOf(scopeInstanceId);
- HScope scope = (HScope)getSession().get(HScope.class, id);
- return scope != null
- ? new ScopeDaoImpl(_sm, scope)
- : null;
- }
-
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getScopes(java.lang.String)
- */
- @SuppressWarnings("unchecked")
- public Collection<ScopeDAO> getScopes(String scopeName) {
- Collection<HScope> hscopes;
- if (scopeName != null) {
- Query filter = _sm.getSession().createFilter(_instance.getScopes(),
- "where this.name=?");
- filter.setString(0,scopeName);
- hscopes = filter.list();
- } else
- hscopes = _instance.getScopes();
- ArrayList<ScopeDAO> ret = new ArrayList<ScopeDAO>();
- CollectionsX.transform(ret, hscopes, new UnaryFunction<HScope,ScopeDAO> () {
- public ScopeDAO apply(HScope x) {
- return new ScopeDaoImpl(_sm, x);
- }
- });
- return ret;
- }
-
- public Collection<ScopeDAO> getScopes() {
- return getScopes(null);
- }
-
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getInstantiatingCorrelator()
- */
- public CorrelatorDAO getInstantiatingCorrelator() {
- return new CorrelatorDaoImpl(_sm, _instance.getInstantiatingCorrelator());
- }
-
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getLastActiveTime()
- */
- public Date getLastActiveTime() {
- return _instance.getLastActiveTime();
- }
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setLastActiveTime(java.util.Date)
- */
- public void setLastActiveTime(Date dt) {
- _instance.setLastActiveTime(dt);
- }
-
- public Set<CorrelationSetDAO> getCorrelationSets() {
- Set<CorrelationSetDAO> results = new HashSet<CorrelationSetDAO>();
-
- for (HCorrelationSet hCorrelationSet : _instance.getCorrelationSets()) {
- results.add(new CorrelationSetDaoImpl(_sm, hCorrelationSet));
- }
-
- return results;
- }
-
- public CorrelationSetDAO getCorrelationSet(String name) {
- for (HCorrelationSet hCorrelationSet : _instance.getCorrelationSets()) {
- if (hCorrelationSet.getName().equals(name))
- return new CorrelationSetDaoImpl(_sm, hCorrelationSet);
- }
- return null;
- }
-
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getVariables(java.lang.String, int)
- */
- public XmlDataDAO[] getVariables(String variableName, int scopeModelId) {
- List<XmlDataDAO> results = new ArrayList<XmlDataDAO>();
-
- Iterator iter;
- Query qry = getSession().createQuery(QRY_VARIABLES);
- qry.setString(0, variableName);
- qry.setInteger(1, scopeModelId);
- qry.setLong(2, _instance.getId());
- iter = qry.iterate();
-
- while(iter.hasNext()) {
- results.add(new XmlDataDaoImpl(_sm, (HXmlData)iter.next()));
- }
- Hibernate.close(iter);
-
- return results.toArray(new XmlDataDAO[results.size()]);
- }
-
- /**
- * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#finishCompletion()
- */
- public void finishCompletion() {
- // make sure we have completed.
- assert (ProcessState.isFinished(this.getState()));
- // let our process know that we've done our work.
- this.getProcess().instanceCompleted(this);
- }
-
- public void delete() {
- _sm.getSession().delete(_instance);
- }
-
- public void insertBpelEvent(ProcessInstanceEvent event) {
- // Defer to the BpelDAOConnectionImpl
- BpelDAOConnectionImpl._insertBpelEvent(_sm.getSession(),event, this.getProcess(), this);
- }
-
- public EventsFirstLastCountTuple getEventsFirstLastCount() {
-
- // Using a criteria, find the min,max, and count of event tstamps.
- Criteria c = _sm.getSession().createCriteria(HBpelEvent.class);
- c.add(Restrictions.eq("instance",_instance));
- c.setProjection(Projections.projectionList().add(Projections.min("tstamp"))
- .add(Projections.max("tstamp"))
- .add(Projections.count("tstamp")));
-
- Object[] ret = (Object[]) c.uniqueResult();
- EventsFirstLastCountTuple flc = new EventsFirstLastCountTuple();
- flc.first = (Date) ret[0];
- flc.last = (Date) ret[1];
- flc.count = (Integer)ret[2];
- return flc;
- }
-
- public long genMonotonic() {
- long seq = _instance.getSequence()+1;
- _instance.setSequence(seq);
- return seq;
- }
-
- protected void clearSelectors() {
- Query q = getSession().createQuery(QRY_DELSELECTORS);
- q.setEntity(0, _instance);
- q.executeUpdate();
- }
-
- public int getActivityFailureCount() {
- return _instance.getActivityFailureCount();
- }
-
- public Date getActivityFailureDateTime() {
- return _instance.getActivityFailureDateTime();
- }
-
- public Collection<ActivityRecoveryDAO> getActivityRecoveries() {
- List<ActivityRecoveryDAO> results = new ArrayList<ActivityRecoveryDAO>();
- Query qry = getSession().createQuery(QRY_RECOVERIES);
- qry.setLong(0, _instance.getId());
- Iterator iter = qry.iterate();
- while (iter.hasNext())
- results.add(new ActivityRecoveryDaoImpl(_sm, (HActivityRecovery) iter.next()));
- Hibernate.close(iter);
- return results;
- }
-
- public void createActivityRecovery(String channel, long activityId, String reason,
- Date dateTime, Element data, String[] actions, int retries) {
- HActivityRecovery recovery = new HActivityRecovery();
- recovery.setInstance(_instance);
- recovery.setChannel(channel);
- recovery.setActivityId(activityId);
- recovery.setReason(reason);
- recovery.setDateTime(dateTime);
- recovery.setRetries(retries);
- if (data != null) {
- HLargeData ld = new HLargeData(DOMUtils.domToString(data));
- recovery.setDetails(ld);
- getSession().save(ld);
- }
- String list = actions[0];
- for (int i = 1; i < actions.length; ++i)
- list += " " + actions[i];
- recovery.setActions(list);
- _instance.getActivityRecoveries().add(recovery);
- getSession().save(recovery);
- _instance.setActivityFailureDateTime(dateTime);
- _instance.setActivityFailureCount(_instance.getActivityFailureCount() + 1);
- getSession().update(_instance);
- }
-
- /**
- * Delete previously registered activity recovery.
- */
- public void deleteActivityRecovery(String channel) {
- for (HActivityRecovery recovery : _instance.getActivityRecoveries()) {
- if (recovery.getChannel().equals(channel)) {
- getSession().delete(recovery);
- _instance.setActivityFailureCount(_instance.getActivityFailureCount() - 1);
+ _instance.setState(state);
+ if (state == ProcessState.STATE_TERMINATED) {
+ clearSelectors();
+ }
getSession().update(_instance);
- return;
- }
}
- }
-
- public BpelDAOConnection getConnection() {
- return new BpelDAOConnectionImpl(_sm);
- }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getState()
+ */
+ public short getState() {
+ return _instance.getState();
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getPreviousState()
+ */
+ public short getPreviousState() {
+ return _instance.getPreviousState();
+ }
+
+ public ScopeDAO createScope(ScopeDAO parentScope, String name, int scopeModelId) {
+ HScope scope = new HScope();
+ scope.setParentScope(parentScope != null ? (HScope) ((ScopeDaoImpl) parentScope).getHibernateObj() : null);
+ scope.setName(name);
+ scope.setScopeModelId(scopeModelId);
+ scope.setState(ScopeStateEnum.ACTIVE.toString());
+ scope.setInstance(_instance);
+ scope.setCreated(new Date());
+ _instance.getScopes().add(scope);
+ getSession().save(scope);
+
+ return new ScopeDaoImpl(_sm, scope);
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getInstanceId()
+ */
+ public Long getInstanceId() {
+ return _instance.getId();
+ }
+
+ public ScopeDAO getScope(Long scopeInstanceId) {
+ Long id = Long.valueOf(scopeInstanceId);
+ HScope scope = (HScope) getSession().get(HScope.class, id);
+ return scope != null ? new ScopeDaoImpl(_sm, scope) : null;
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getScopes(java.lang.String)
+ */
+ @SuppressWarnings("unchecked")
+ public Collection<ScopeDAO> getScopes(String scopeName) {
+ Collection<HScope> hscopes;
+ if (scopeName != null) {
+ Query filter = _sm.getSession().createFilter(_instance.getScopes(), "where this.name=?");
+ filter.setString(0, scopeName);
+ hscopes = filter.list();
+ } else
+ hscopes = _instance.getScopes();
+ ArrayList<ScopeDAO> ret = new ArrayList<ScopeDAO>();
+ CollectionsX.transform(ret, hscopes, new UnaryFunction<HScope, ScopeDAO>() {
+ public ScopeDAO apply(HScope x) {
+ return new ScopeDaoImpl(_sm, x);
+ }
+ });
+ return ret;
+ }
+
+ public Collection<ScopeDAO> getScopes() {
+ return getScopes(null);
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getInstantiatingCorrelator()
+ */
+ public CorrelatorDAO getInstantiatingCorrelator() {
+ return new CorrelatorDaoImpl(_sm, _instance.getInstantiatingCorrelator());
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getLastActiveTime()
+ */
+ public Date getLastActiveTime() {
+ return _instance.getLastActiveTime();
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setLastActiveTime(java.util.Date)
+ */
+ public void setLastActiveTime(Date dt) {
+ _instance.setLastActiveTime(dt);
+ }
+
+ public Set<CorrelationSetDAO> getCorrelationSets() {
+ Set<CorrelationSetDAO> results = new HashSet<CorrelationSetDAO>();
+
+ for (HCorrelationSet hCorrelationSet : _instance.getCorrelationSets()) {
+ results.add(new CorrelationSetDaoImpl(_sm, hCorrelationSet));
+ }
+
+ return results;
+ }
+
+ public CorrelationSetDAO getCorrelationSet(String name) {
+ for (HCorrelationSet hCorrelationSet : _instance.getCorrelationSets()) {
+ if (hCorrelationSet.getName().equals(name))
+ return new CorrelationSetDaoImpl(_sm, hCorrelationSet);
+ }
+ return null;
+ }
+
+ /**
+  * Finds the variables of this instance that have the given name and belong to a scope
+  * with the given scope model id (see <code>QRY_VARIABLES</code>).
+  *
+  * @param variableName name of the variable to look up
+  * @param scopeModelId model id of the declaring scope
+  * @return the matching variables wrapped as DAOs; an empty array when none match
+  * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getVariables(java.lang.String, int)
+  */
+ public XmlDataDAO[] getVariables(String variableName, int scopeModelId) {
+     List<XmlDataDAO> results = new ArrayList<XmlDataDAO>();
+
+     Query qry = getSession().createQuery(QRY_VARIABLES);
+     qry.setString(0, variableName);
+     qry.setInteger(1, scopeModelId);
+     qry.setLong(2, _instance.getId());
+     Iterator iter = qry.iterate();
+     try {
+         while (iter.hasNext()) {
+             results.add(new XmlDataDaoImpl(_sm, (HXmlData) iter.next()));
+         }
+     } finally {
+         // Close the query iterator even if iterating or wrapping a row fails.
+         Hibernate.close(iter);
+     }
+
+     return results.toArray(new XmlDataDAO[results.size()]);
+ }
+
+ /**
+ * @see org.apache.ode.bpel.dao.ProcessInstanceDAO#finishCompletion()
+ */
+ public void finishCompletion() {
+ // make sure we have completed.
+ assert (ProcessState.isFinished(this.getState()));
+ // let our process know that we've done our work.
+ this.getProcess().instanceCompleted(this);
+ }
+
+ public void delete() {
+ _sm.getSession().delete(_instance);
+ }
+
+ public void insertBpelEvent(ProcessInstanceEvent event) {
+ // Defer to the BpelDAOConnectionImpl
+ BpelDAOConnectionImpl._insertBpelEvent(_sm.getSession(), event, this.getProcess(), this);
+ }
+
+ /**
+  * Computes, with a single criteria query over {@link HBpelEvent}, the earliest timestamp,
+  * the latest timestamp and the number of events recorded for this instance.
+  *
+  * @return a tuple holding the first timestamp, last timestamp and event count
+  */
+ public EventsFirstLastCountTuple getEventsFirstLastCount() {
+
+     // Using a criteria, find the min, max, and count of event tstamps.
+     Criteria c = _sm.getSession().createCriteria(HBpelEvent.class);
+     c.add(Restrictions.eq("instance", _instance));
+     c.setProjection(Projections.projectionList()
+             .add(Projections.min("tstamp"))
+             .add(Projections.max("tstamp"))
+             .add(Projections.count("tstamp")));
+
+     Object[] ret = (Object[]) c.uniqueResult();
+     EventsFirstLastCountTuple flc = new EventsFirstLastCountTuple();
+     flc.first = (Date) ret[0];
+     flc.last = (Date) ret[1];
+     // Read the count through Number so the code does not depend on whether the
+     // projection yields an Integer or a Long.
+     flc.count = ((Number) ret[2]).intValue();
+     return flc;
+ }
+
+ public long genMonotonic() {
+ long seq = _instance.getSequence() + 1;
+ _instance.setSequence(seq);
+ return seq;
+ }
+
+ protected void clearSelectors() {
+ Query q = getSession().createQuery(QRY_DELSELECTORS);
+ q.setEntity(0, _instance);
+ q.executeUpdate();
+ }
+
+ public int getActivityFailureCount() {
+ return _instance.getActivityFailureCount();
+ }
+
+ public Date getActivityFailureDateTime() {
+ return _instance.getActivityFailureDateTime();
+ }
+
+ /**
+  * Loads the activity recoveries registered for this instance (see
+  * <code>QRY_RECOVERIES</code>).
+  *
+  * @return the recoveries wrapped as DAOs; an empty collection when there are none
+  */
+ public Collection<ActivityRecoveryDAO> getActivityRecoveries() {
+     List<ActivityRecoveryDAO> results = new ArrayList<ActivityRecoveryDAO>();
+     Query qry = getSession().createQuery(QRY_RECOVERIES);
+     qry.setLong(0, _instance.getId());
+     Iterator iter = qry.iterate();
+     try {
+         while (iter.hasNext()) {
+             results.add(new ActivityRecoveryDaoImpl(_sm, (HActivityRecovery) iter.next()));
+         }
+     } finally {
+         // Close the query iterator even if iterating or wrapping a row fails.
+         Hibernate.close(iter);
+     }
+     return results;
+ }
+
+ /**
+  * Registers an activity recovery for this instance, then records the failure time and
+  * increments the instance's activity failure count.
+  *
+  * @param channel    recovery channel; also the key used by {@link #deleteActivityRecovery(String)}
+  * @param activityId id of the failed activity
+  * @param reason     failure reason
+  * @param dateTime   time of the failure
+  * @param data       optional failure details, stored as serialized XML; may be <code>null</code>
+  * @param actions    available recovery actions, stored as a space-separated list; a
+  *                   <code>null</code> or empty array is stored as an empty string
+  * @param retries    retry count to store on the recovery
+  */
+ public void createActivityRecovery(String channel, long activityId, String reason, Date dateTime, Element data,
+         String[] actions, int retries) {
+     HActivityRecovery recovery = new HActivityRecovery();
+     recovery.setInstance(_instance);
+     recovery.setChannel(channel);
+     recovery.setActivityId(activityId);
+     recovery.setReason(reason);
+     recovery.setDateTime(dateTime);
+     recovery.setRetries(retries);
+     if (data != null) {
+         HLargeData ld = new HLargeData(DOMUtils.domToString(data));
+         recovery.setDetails(ld);
+         getSession().save(ld);
+     }
+     // Join with single spaces; guard against a missing or empty array instead of
+     // failing on actions[0].
+     StringBuilder list = new StringBuilder();
+     if (actions != null) {
+         for (int i = 0; i < actions.length; ++i) {
+             if (i > 0) {
+                 list.append(' ');
+             }
+             list.append(actions[i]);
+         }
+     }
+     recovery.setActions(list.toString());
+     _instance.getActivityRecoveries().add(recovery);
+     getSession().save(recovery);
+     _instance.setActivityFailureDateTime(dateTime);
+     _instance.setActivityFailureCount(_instance.getActivityFailureCount() + 1);
+     getSession().update(_instance);
+ }
+
+ /**
+  * Delete a previously registered activity recovery.
+  * <p>
+  * Only the first recovery whose channel equals <code>channel</code> is removed; the
+  * instance's activity failure count is decremented for it. Nothing happens when no
+  * recovery matches.
+  *
+  * @param channel channel of the recovery to delete
+  */
+ public void deleteActivityRecovery(String channel) {
+     Iterator<HActivityRecovery> it = _instance.getActivityRecoveries().iterator();
+     while (it.hasNext()) {
+         HActivityRecovery recovery = it.next();
+         if (recovery.getChannel().equals(channel)) {
+             // Mirror createActivityRecovery(): take the entity out of the instance's
+             // collection as well, so the in-memory association holds no deleted object.
+             it.remove();
+             getSession().delete(recovery);
+             _instance.setActivityFailureCount(_instance.getActivityFailureCount() - 1);
+             getSession().update(_instance);
+             return;
+         }
+     }
+ }
+
+ public BpelDAOConnection getConnection() {
+ return new BpelDAOConnectionImpl(_sm);
+ }
+
+ public int getExecutionStateCounter() {
+ return _instance.getExecutionStateCounter();
+ }
+
+ public void setExecutionStateCounter(int stateCounter) {
+ _instance.setExecutionStateCounter(stateCounter);
+
+ }
}
Modified: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessage.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessage.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessage.java (original)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessage.java Wed Sep 5 22:46:42 2007
@@ -27,20 +27,9 @@
*/
public class HMessage extends HObject {
- private HMessageExchange _mex;
private String _type;
private HLargeData _data;
-
- public void setMessageExchange(HMessageExchange mex) {
- _mex = mex;
- }
-
- /** @hibernate.many-to-one column="MEX" */
- public HMessageExchange getMessageExchange() {
- return _mex;
- }
-
public void setType(String type) {
_type = type;
}
Modified: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java (original)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java Wed Sep 5 22:46:42 2007
@@ -18,11 +18,12 @@
*/
package org.apache.ode.daohib.bpel.hobj;
-
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
+
/**
* Hibernate-managed table for keeping track of message exchanges.
*
@@ -47,7 +48,7 @@
private HMessage _request;
private HMessage _response;
-
+
private HPartnerLink _partnerLink;
private String _clientKey;
@@ -70,10 +71,22 @@
private String _callee;
- private String _pipedMessageExchangeId;
+ private String _p2pPeer;
private Map<String, String> _properties = new HashMap<String, String>();
+ private long _timeout;
+
+ private String _istyle;
+
+ private String _failureType;
+
+ private String _mexId;
+
+ private String _ackType;
+
+ private String _pipedPid;
+
/**
*
*/
@@ -82,6 +95,20 @@
}
/**
+ *
+ * @hibernate.property
+ * @hibernate.column name="MEXID" not-null="true" unique="true"
+ */
+
+ public String getMexId() {
+ return _mexId;
+ }
+
+ public void setMexId(String mexId) {
+ _mexId = mexId;
+ }
+
+ /**
* @hibernate.property column="PORT_TYPE"
*/
public String getPortType() {
@@ -299,8 +326,7 @@
}
/**
- * @hibernate.map name="properties" table="BPEL_MEX_PROPS" lazy="false"
- * cascade="delete"
+ * @hibernate.map name="properties" table="BPEL_MEX_PROPS" lazy="false" cascade="delete"
* @hibernate.collection-key column="MEX"
* @hibernate.collection-index column="NAME" type="string"
* @hibernate.collection-element column="VALUE" type="string" length="8000"
@@ -318,20 +344,80 @@
}
/**
- * @hibernate.many-to-one column="PARTNERLINK"
+ * @hibernate.many-to-one column="PARTNERLINK"
*/
public HPartnerLink getPartnerLink() {
return _partnerLink;
}
/**
- * @hibernate.property column="PIPED_ID"
+ * @hibernate.property column="TIMEOUT"
+ *
+ */
+ public long getTimeout() {
+ return _timeout;
+ }
+
+ public void setTimeout(long timeout) {
+ _timeout = timeout;
+ }
+
+ /**
+ * @hibernate.property column="ISTYLE"
+ */
+ public String getInvocationStyle() {
+ return _istyle;
+ }
+
+ /**
+ * @hibernate.property column="P2P_PEER"
+ * @return
+ */
+ public String getPipedMessageExchange() {
+ return _p2pPeer;
+ }
+
+ /**
+  * Sets the value returned by {@link #getPipedMessageExchange()}.
+  * The name pairs with that getter, as JavaBean-style property access requires.
+  *
+  * @param p2ppeer value to store for the piped message exchange
+  */
+ public void setPipedMessageExchange(String p2ppeer) {
+     _p2pPeer = p2ppeer;
+ }
+
+ /**
+  * Misspelled variant kept so existing callers keep compiling.
+  *
+  * @param p2ppeer value to store for the piped message exchange
+  * @deprecated use {@link #setPipedMessageExchange(String)}
+  */
+ public void setPipedMesageExchange(String p2ppeer) {
+     setPipedMessageExchange(p2ppeer);
+ }
+
+ public void setFailureType(String failureType) {
+ _failureType = failureType;
+ }
+
+ /**
+ * @hibernate.property column="FAILURE_TYPE"
+ * @return
+ */
+ public String getFailureType() {
+ return _failureType;
+ }
+
+ public void setInvocationStyle(String invocationStyle) {
+ _istyle = invocationStyle;
+ }
+
+ /**
+ * @hibernate.property column="ACK_TYPE"
+ * @return
+ */
+ public String getAckType() {
+ return _ackType;
+ }
+
+ public void setAckType(String ackType) {
+ _ackType = ackType;
+ }
+
+ /**
+ * @hibernate.property column="PIPED_PID"
+ * @return
*/
- public String getPipedMessageExchangeId() {
- return _pipedMessageExchangeId;
+ public String getPipedPID() {
+ return _pipedPid;
}
- public void setPipedMessageExchangeId(String pipedMessageExchangeId) {
- _pipedMessageExchangeId = pipedMessageExchangeId;
+ public void setPipedPID(String ppid) {
+ _pipedPid = ppid;
}
}
Modified: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java (original)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java Wed Sep 5 22:46:42 2007
@@ -66,6 +66,8 @@
private long _seq;
+ private int _execStateCounter;
+
/**
*
*/
@@ -263,6 +265,19 @@
public void setActivityFailureDateTime(Date dateTime) {
_activityFailureDateTime = dateTime;
+ }
+
+ /**
+ * @hibernate.property column="EXEC_STATE_COUNT"
+ * @return
+ */
+ public int getExecutionStateCounter() {
+ return _execStateCounter;
+ }
+
+ public void setExecutionStateCounter(int stateCounter) {
+ _execStateCounter = stateCounter;
+
}
}
Modified: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ql/HibernateInstancesQueryCompiler.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ql/HibernateInstancesQueryCompiler.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ql/HibernateInstancesQueryCompiler.java (original)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ql/HibernateInstancesQueryCompiler.java Wed Sep 5 22:46:42 2007
@@ -334,7 +334,6 @@
try {
value.setValue(ISO8601DateParser.parse((String) value.getValue()));
} catch (ParseException ex) {
- // TODO
throw new RuntimeException(ex);
}
}
@@ -521,7 +520,6 @@
objValues.add(ISO8601DateParser.parse((String) value.getValue()));
}
} catch (ParseException ex) {
- // TODO
throw new RuntimeException(ex);
}
} else {
Modified: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java (original)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java Wed Sep 5 22:46:42 2007
@@ -69,8 +69,8 @@
_em = null;
}
- public MessageExchangeDAO createMessageExchange(char dir) {
- MessageExchangeDAOImpl ret = new MessageExchangeDAOImpl(dir);
+ public MessageExchangeDAO createMessageExchange(String mexId, char dir) {
+ MessageExchangeDAOImpl ret = new MessageExchangeDAOImpl(mexId, dir);
_em.persist(ret);
return ret;
}
@@ -152,7 +152,11 @@
}
public MessageExchangeDAO getMessageExchange(String mexid) {
- return _em.find(MessageExchangeDAOImpl.class, mexid);
+ List l = _em.createQuery("select x from MessageExchangeDAOImpl x where x._id = ?1")
+ .setParameter(1, mexid).getResultList();
+ if (l.size() == 0) return null;
+ MessageExchangeDAOImpl m = (MessageExchangeDAOImpl) l.get(0);
+ return m;
}
public EntityManager getEntityManager() {
Modified: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageDAOImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageDAOImpl.java (original)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageDAOImpl.java Wed Sep 5 22:46:42 2007
@@ -20,25 +20,21 @@
package org.apache.ode.dao.jpa;
-import org.apache.ode.bpel.dao.MessageDAO;
-import org.apache.ode.bpel.dao.MessageExchangeDAO;
-import org.apache.ode.utils.DOMUtils;
-import org.w3c.dom.Element;
-
import javax.persistence.Basic;
-import javax.persistence.CascadeType;
import javax.persistence.Column;
import javax.persistence.Entity;
-import javax.persistence.FetchType;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Lob;
-import javax.persistence.ManyToOne;
import javax.persistence.Table;
import javax.persistence.Transient;
import javax.xml.namespace.QName;
+import org.apache.ode.bpel.dao.MessageDAO;
+import org.apache.ode.utils.DOMUtils;
+import org.w3c.dom.Element;
+
@Entity
@Table(name="ODE_MESSAGE")
@@ -53,15 +49,12 @@
private String _data;
@Transient
private Element _element;
- @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL}) @Column(name="MESSAGE_EXCHANGE_ID")
- private MessageExchangeDAOImpl _messageExchange;
public MessageDAOImpl() {
}
public MessageDAOImpl(QName type, MessageExchangeDAOImpl me) {
_type = type.toString();
- _messageExchange = me;
}
public Element getData() {
@@ -74,10 +67,6 @@
}
return _element;
- }
-
- public MessageExchangeDAO getMessageExchange() {
- return _messageExchange;
}
public QName getType() {
Modified: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java (original)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java Wed Sep 5 22:46:42 2007
@@ -25,6 +25,10 @@
import org.apache.ode.bpel.dao.PartnerLinkDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.utils.DOMUtils;
import org.apache.ode.utils.uuid.UUID;
import org.w3c.dom.Element;
@@ -91,6 +95,9 @@
private String _correlationKeys;
@Basic @Column(name="PIPED_ID")
private String _pipedMessageExchangeId;
+
+ @Basic @Column(name="ACK_TYPE")
+ private String _ackType;
@OneToMany(targetEntity=MexProperty.class,mappedBy="_mex",fetch=FetchType.EAGER,cascade={CascadeType.ALL})
private Collection<MexProperty> _props = new ArrayList<MexProperty>();
@@ -100,19 +107,31 @@
private PartnerLinkDAOImpl _partnerLink;
@ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="PROCESS_ID")
private ProcessDAOImpl _process;
- @OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL}) @Column(name="REQUEST_MESSAGE_ID")
+ @OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="REQUEST_MESSAGE_ID")
private MessageDAOImpl _request;
- @OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL}) @Column(name="RESPONSE_MESSAGE_ID")
+ @OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="RESPONSE_MESSAGE_ID")
private MessageDAOImpl _response;
@ManyToOne(fetch= FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="CORR_ID")
private CorrelatorDAOImpl _correlator;
+
+ @Basic @Column(name="ISTYLE")
+ private String _istyle;
+
+ @Basic @Column(name="TIMEOUT")
+ private long _timeout;
+
+ @Basic @Column(name="FAILURE_TYPE")
+ private String _failureType;
+
+ @Basic @Column(name="PIPED_PID")
+ private String _pipedPid;
public MessageExchangeDAOImpl() {}
- public MessageExchangeDAOImpl(char direction){
+ public MessageExchangeDAOImpl(String mexId, char direction){
_direction = direction;
- _id = new UUID().toString();
+ _id = mexId;
}
public MessageDAO createMessage(QName type) {
@@ -128,7 +147,7 @@
return _channel;
}
- public String getCorrelationId() {
+ public String getPartnersKey() {
return _correlationId;
}
@@ -223,8 +242,8 @@
return _response;
}
- public String getStatus() {
- return _status;
+ public Status getStatus() {
+ return _status == null ? null : Status.valueOf(_status);
}
public void setCallee(QName callee) {
@@ -235,7 +254,7 @@
_channel = channel;
}
- public void setCorrelationId(String correlationId) {
+ public void setPartnersKey(String correlationId) {
_correlationId = correlationId;
}
@@ -296,8 +315,8 @@
_response = (MessageDAOImpl)msg;
}
- public void setStatus(String status) {
- _status = status;
+ public void setStatus(Status status) {
+ _status = status == null ? null : status.toString();
}
public String getPipedMessageExchangeId() {
@@ -338,5 +357,46 @@
public void setCorrelator(CorrelatorDAOImpl correlator) {
_correlator = correlator;
+ }
+
+ public InvocationStyle getInvocationStyle() {
+ return _istyle == null ? null : InvocationStyle.valueOf(_istyle);
+ }
+
+
+ public long getTimeout() {
+ return _timeout;
+ }
+
+ public void setFailureType(FailureType failureType) {
+ _failureType = failureType == null ? null :failureType.toString();
+ }
+
+ public FailureType getFailureType() {
+ return _failureType == null ? null : FailureType.valueOf(_failureType);
+ }
+
+ public void setInvocationStyle(InvocationStyle invocationStyle) {
+ _istyle = invocationStyle == null ? null : invocationStyle.toString();
+ }
+
+ public void setTimeout(long timeout) {
+ _timeout = timeout;
+ }
+
+ public AckType getAckType() {
+ return _ackType == null ? null : AckType.valueOf(_ackType);
+ }
+
+ public void setAckType(AckType ackType) {
+ _ackType = ackType == null ? null :ackType.toString();
+ }
+
+ public QName getPipedPID() {
+ return _pipedPid == null ? null : QName.valueOf(_pipedPid);
+ }
+
+ public void setPipedPID(QName pipedPid) {
+ _pipedPid = pipedPid == null ? null : pipedPid.toString();
}
}
Modified: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java (original)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessInstanceDAOImpl.java Wed Sep 5 22:46:42 2007
@@ -77,10 +77,13 @@
private short _previousState;
@Lob @Column(name="EXECUTION_STATE")
private byte[] _executionState;
- @Basic @Column(name="SEQUENCE")
+ @Basic @Column(name="SEQUENCE")
private long _sequence;
@Basic @Column(name="DATE_CREATED")
private Date _dateCreated = new Date();
+
+ @Basic @Column(name="EXEC_STATE_COUNTER")
+ private int _execStateCounter;
@OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL}) @Column(name="ROOT_SCOPE_ID")
private ScopeDAOImpl _rootScope;
@@ -295,5 +298,11 @@
public BpelDAOConnection getConnection() {
return new BPELDAOConnectionImpl(getEM());
+ }
+ public int getExecutionStateCounter() {
+ return _execStateCounter;
+ }
+ public void setExecutionStateCounter(int stateCounter) {
+ _execStateCounter = stateCounter;
}
}
Modified: ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobMessages.java
URL: http://svn.apache.org/viewvc/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobMessages.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobMessages.java (original)
+++ ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobMessages.java Wed Sep 5 22:46:42 2007
@@ -44,24 +44,7 @@
methodName, className);
}
- // TODO
- public String msgContDeHydrationErr(String channel, String name) {
- throw new UnsupportedOperationException();
- }
- /**
- * Error indicating that a re-hydration of a saved _continuation object could
- * not be completed.
- *
- * @param channel
- * channel with the dangling _continuation
- * @param mlClassName
- * name of de-hydrated {@link org.apache.ode.jacob.ChannelListener} object
- *
- */
- public String msgContHydrationErr(String channel, String mlClassName) {
- throw new UnsupportedOperationException();
- }
/**
* Internal error indicating that a required client method was not accessible
Modified: ode/trunk/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java (original)
+++ ode/trunk/jbi/src/main/java/org/apache/ode/jbi/MessageExchangeContextImpl.java Wed Sep 5 22:46:42 2007
@@ -19,44 +19,73 @@
package org.apache.ode.jbi;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.MessageExchangeContext;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+
/**
- * Implementation of the ODE {@link org.apache.ode.bpel.iapi.MessageExchangeContext}
- * interface. This class is used by the ODE engine to make invocation on JBI
- * services provided by other engines (i.e. the BPEL engine is acting as
- * client/consumer of services).
+ * Implementation of the ODE {@link org.apache.ode.bpel.iapi.MessageExchangeContext} interface. This class is used by the ODE engine
+ * to make invocation on JBI services provided by other engines (i.e. the BPEL engine is acting as client/consumer of services).
*/
public class MessageExchangeContextImpl implements MessageExchangeContext {
- private static final Log __log = LogFactory
- .getLog(MessageExchangeContextImpl.class);
+ private static final Log __log = LogFactory.getLog(MessageExchangeContextImpl.class);
+
+ private OdeContext _ode;
+
+ /** Supported invocation styles. For now this is fixed. */
+ private static final Set<InvocationStyle> __supported;
+ static {
+ HashSet<InvocationStyle> supported = new HashSet<InvocationStyle>();
+ supported.add(InvocationStyle.UNRELIABLE);
+ __supported = Collections.unmodifiableSet(supported);
+ }
+
+ public MessageExchangeContextImpl(OdeContext ode) {
+ _ode = ode;
+ }
+
+
+ public void invokePartnerUnreliable(PartnerRoleMessageExchange mex) throws ContextException {
+ _ode._consumer.invokePartner(mex);
+ }
+
+ public void invokePartnerReliable(PartnerRoleMessageExchange mex) throws ContextException {
+ throw new ContextException("Unsupported.");
- private OdeContext _ode;
+ }
+
+ public void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException {
+ throw new ContextException("Unsupported.");
+
+ }
+
+
+ public void cancel(PartnerRoleMessageExchange mex) throws ContextException {
+ // What can we do in JBI to cancel? --- not much.
+
+ }
+
+ public Set<InvocationStyle> getSupportedInvocationStyle(PartnerRoleChannel prc, EndpointReference partnerEpr) {
+ return __supported ;
+ }
+
+
+ public void onMyRoleMessageExchangeStateChanged(MyRoleMessageExchange myRoleMex) throws BpelEngineException {
+ }
+
- public MessageExchangeContextImpl(OdeContext ode) {
- _ode = ode;
- }
-
- public void onAsyncReply(MyRoleMessageExchange myrolemex)
- throws BpelEngineException {
- OdeService ode = _ode.getService(myrolemex.getServiceName());
- if (ode != null)
- ode.onResponse(myrolemex);
- else {
- __log.error("No active service for message exchange: " + myrolemex);
- }
- }
-
- public void invokePartner(PartnerRoleMessageExchange mex) throws ContextException {
- _ode._consumer.invokePartner(mex);
- }
+
}
Modified: ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
URL: http://svn.apache.org/viewvc/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java (original)
+++ ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java Wed Sep 5 22:46:42 2007
@@ -18,40 +18,37 @@
*/
package org.apache.ode.jbi;
-import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
-import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
-import org.apache.ode.jbi.msgmap.Mapper;
-import org.apache.ode.jbi.msgmap.MessageTranslationException;
-
import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import javax.jbi.messaging.*;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.Fault;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessageExchangeFactory;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
+import org.apache.ode.jbi.msgmap.Mapper;
+import org.apache.ode.jbi.msgmap.MessageTranslationException;
/**
* Bridge between ODE (consumers) and JBI (providers). An single object of this type handles all communications initiated by ODE
- * that is destined for other JBI providers.
+ * that is destined for other JBI providers.
*/
-abstract class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor {
+class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor {
private static final Log __log = LogFactory.getLog(OdeConsumer.class);
- private static final long DEFAULT_RESPONSE_TIMEOUT = Long.getLong("org.apache.ode.jbi.timeout", 2 * 60 * 1000L);
protected OdeContext _ode;
- protected long _responseTimeout = DEFAULT_RESPONSE_TIMEOUT;
-
-
- protected Map<String, PartnerRoleMessageExchange> _outstandingExchanges = new ConcurrentHashMap<String, PartnerRoleMessageExchange>();
-
OdeConsumer(OdeContext ode) {
_ode = ode;
}
@@ -100,35 +97,15 @@
NormalizedMessage nmsg = inonly.createMessage();
mapper.toNMS(nmsg, odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(), null);
inonly.setInMessage(nmsg);
- _ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
- public void afterCompletion(boolean success) {
- if (success) {
- doSendOneWay(odeMex, inonly);
- }
- }
- public void beforeCompletion() {
- }
-
- });
+ doSendJBI(odeMex, inonly);
odeMex.replyOneWayOk();
} else {
final InOut inout = (InOut) jbiMex;
NormalizedMessage nmsg = inout.createMessage();
mapper.toNMS(nmsg, odeMex.getRequest(), odeMex.getOperation().getInput().getMessage(), null);
inout.setInMessage(nmsg);
- _ode._scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
- public void afterCompletion(boolean success) {
- if (success) {
- doSendTwoWay(odeMex, inout);
- }
- }
-
- public void beforeCompletion() {
- }
-
- });
-
- odeMex.replyAsync();
+ doSendJBI(odeMex, inout);
+ odeMex.replyAsync(inout.getExchangeId());
}
} catch (MessagingException me) {
String errmsg = "JBI messaging error for ODE MEX " + odeMex;
@@ -142,83 +119,58 @@
}
- protected abstract void doSendOneWay(PartnerRoleMessageExchange odeMex, InOnly inonly);
-
- protected abstract void doSendTwoWay(PartnerRoleMessageExchange odeMex, InOut inout);
-
- protected abstract void inOutDone(InOut inout);
public void onJbiMessageExchange(MessageExchange jbiMex) throws MessagingException {
- if (!jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY) &&
- !jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
+ if (!jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY)
+ && !jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
__log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern " + jbiMex.getPattern());
return;
}
if (jbiMex.getStatus() == ExchangeStatus.ACTIVE) {
if (jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
- inOutDone((InOut) jbiMex);
outResponse((InOut) jbiMex);
}
jbiMex.setStatus(ExchangeStatus.DONE);
_ode.getChannel().send(jbiMex);
} else if (jbiMex.getStatus() == ExchangeStatus.ERROR) {
- inOutDone((InOut) jbiMex);
outFailure((InOut) jbiMex);
} else if (jbiMex.getStatus() == ExchangeStatus.DONE) {
- _outstandingExchanges.remove(jbiMex.getExchangeId());
+ ; // anything todo here?
} else {
__log.error("Unexpected status " + jbiMex.getStatus() + " for JBI message exchange: " + jbiMex.getExchangeId());
}
}
private void outFailure(final InOut jbiMex) {
- final PartnerRoleMessageExchange pmex = _outstandingExchanges.remove(jbiMex.getExchangeId());
+ PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getMessageExchangeByForeignKey(jbiMex.getExchangeId());
if (pmex == null) {
- __log.warn("Received a response for unknown JBI message exchange " + jbiMex.getExchangeId());
+ __log.warn("Received a response for unknown partner role message exchange (JBI exchange " + jbiMex.getExchangeId() + ")"); // pmex is null here: report the JBI id used for the lookup
return;
}
-
- try {
- _ode._scheduler.execTransaction(new Callable<Boolean>() {
- public Boolean call() throws Exception {
- pmex.replyWithFailure(FailureType.OTHER, "Error: " + jbiMex.getError(), null);
- return null;
- }
- });
- } catch (Exception ex) {
- __log.error("error delivering failure: ", ex);
- }
-
+
+ pmex.replyWithFailure(FailureType.OTHER, "Error: " + jbiMex.getError(), null);
}
private void outResponse(final InOut jbiMex) {
- final PartnerRoleMessageExchange outstanding = _outstandingExchanges.remove(jbiMex.getExchangeId());
- if (outstanding == null) {
- __log.warn("Received a response for unknown JBI message exchange " + jbiMex.getExchangeId());
+
+ PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getMessageExchangeByForeignKey(jbiMex.getExchangeId());
+ if (pmex == null) {
+ __log.warn("Received a response for unknown partner role message exchange (JBI exchange " + jbiMex.getExchangeId() + ")"); // pmex is null here: report the JBI id used for the lookup
return;
}
-
- try {
- _ode._scheduler.execTransaction(new Callable<Boolean>() {
- @SuppressWarnings("unchecked")
- public Boolean call() throws Exception {
- // need to reload mex since we're in a different transaction
- PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _ode._server.getEngine().getMessageExchange(outstanding.getMessageExchangeId());
- if (pmex == null) {
- __log.warn("Received a response for unknown partner role message exchange " + pmex.getMessageExchangeId());
- return Boolean.FALSE;
- }
- String mapperName = pmex.getProperty(Mapper.class.getName());
- Mapper mapper = mapperName == null ? _ode.getDefaultMapper() : _ode.getMapper(mapperName);
- if (mapper == null) {
- String errmsg = "Mapper not found.";
- __log.error(errmsg);
- pmex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null);
- } else {
- try {
- Fault jbiFlt = jbiMex.getFault();
- if (jbiFlt != null) {
- javax.wsdl.Fault wsdlFlt = mapper.toFaultType(jbiFlt, (Collection<javax.wsdl.Fault>) pmex.getOperation().getFaults().values());
+
+ String mapperName = pmex.getProperty(Mapper.class.getName());
+ Mapper mapper = mapperName == null ? _ode.getDefaultMapper() : _ode.getMapper(mapperName);
+ if (mapper == null) {
+ String errmsg = "Mapper not found.";
+ __log.error(errmsg);
+ pmex.replyWithFailure(FailureType.FORMAT_ERROR, errmsg, null);
+ } else {
+ try {
+ Fault jbiFlt = jbiMex.getFault();
+ if (jbiFlt != null) {
+ javax.wsdl.Fault wsdlFlt = mapper.toFaultType(jbiFlt, (Collection<javax.wsdl.Fault>) pmex
+ .getOperation().getFaults().values());
if (wsdlFlt == null) {
pmex.replyWithFailure(FailureType.FORMAT_ERROR, "Unrecognized fault message.", null);
} else {
@@ -234,30 +186,25 @@
+ wsdlFlt.getName(), null);
}
}
- } else {
- Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
- mapper.toODE(response, jbiMex.getOutMessage(), pmex.getOperation().getOutput().getMessage());
- pmex.reply(response);
- }
- } catch (MessageTranslationException mte) {
- __log.error("Error translating message.", mte);
- pmex.replyWithFailure(FailureType.FORMAT_ERROR, mte.getMessage(), null);
- }
- }
- return null;
+ } else {
+ Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
+ mapper.toODE(response, jbiMex.getOutMessage(), pmex.getOperation().getOutput().getMessage());
+ pmex.reply(response);
}
- });
- } catch (Exception ex) {
- __log.error("error delivering RESPONSE: ", ex);
-
+ } catch (MessageTranslationException mte) {
+ __log.error("Error translating message.", mte);
+ pmex.replyWithFailure(FailureType.FORMAT_ERROR, mte.getMessage(), null);
+ }
}
}
- public void setResponseTimeout(long timeout) {
- _responseTimeout = timeout;
+ protected void doSendJBI(final PartnerRoleMessageExchange odeMex, final MessageExchange jbiMex) {
+ try {
+ _ode.getChannel().send(jbiMex);
+ } catch (MessagingException e) {
+ String errmsg = "Error sending request-only message to JBI for ODE mex " + odeMex;
+ __log.error(errmsg, e);
+ }
}
- public long getResponseTimeout() {
- return _responseTimeout;
- }
}
Modified: ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java
URL: http://svn.apache.org/viewvc/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java (original)
+++ ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeContext.java Wed Sep 5 22:46:42 2007
@@ -40,12 +40,12 @@
import org.apache.ode.bpel.engine.BpelServerImpl;
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.o.OPartnerLink;
import org.apache.ode.bpel.o.OProcess;
import org.apache.ode.bpel.o.Serializer;
import org.apache.ode.jbi.msgmap.Mapper;
import org.apache.ode.jbi.util.WSDLFlattener;
-import org.apache.ode.scheduler.simple.SimpleScheduler;
import org.apache.ode.store.ProcessStoreImpl;
import org.w3c.dom.Document;
@@ -81,7 +81,7 @@
MessageExchangeContextImpl _mexContext;
- SimpleScheduler _scheduler;
+ Scheduler _scheduler;
ExecutorService _executorService;
@@ -95,6 +95,7 @@
/** Mapping of Endpoint to OdeService */
private Map<Endpoint, OdeService> _activeOdeServices = new ConcurrentHashMap<Endpoint, OdeService>();
+
/**
* Gets the delivery channel.