You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by je...@apache.org on 2010/05/21 17:41:07 UTC

svn commit: r947046 [11/12] - in /ode/trunk: ./ axis2/src/main/java/org/apache/ode/axis2/ bpel-dao/ bpel-dao/src/main/java/org/apache/ode/dao/ bpel-dao/src/main/java/org/apache/ode/dao/bpel/ bpel-dao/src/main/java/org/apache/ode/dao/store/ bpel-epr/ bp...

Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/BpelDAOConnectionImpl.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/BpelDAOConnectionImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/BpelDAOConnectionImpl.java Fri May 21 15:40:59 2010
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import java.io.Serializable;
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.BpelEventFilter;
+import org.apache.ode.bpel.common.Filter;
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.bpel.common.ProcessState;
+import org.apache.ode.bpel.evt.BpelEvent;
+import org.apache.ode.bpel.evt.ScopeEvent;
+import org.apache.ode.dao.bpel.BpelDAOConnection;
+import org.apache.ode.dao.bpel.CorrelationSetDAO;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.dao.bpel.ProcessDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+import org.apache.ode.dao.bpel.ProcessManagementDAO;
+import org.apache.ode.dao.bpel.ScopeDAO;
+import org.apache.ode.dao.jpa.JpaConnection;
+import org.apache.ode.dao.jpa.JpaOperator;
+import org.apache.ode.utils.ISO8601DateParser;
+
+/**
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+public class BpelDAOConnectionImpl extends JpaConnection implements BpelDAOConnection {
+    static final Log __log = LogFactory.getLog(BpelDAOConnectionImpl.class);
+    
+    static final ThreadLocal<BpelDAOConnectionImpl> _connections = new ThreadLocal<BpelDAOConnectionImpl>();
+
+    public BpelDAOConnectionImpl(EntityManager mgr, TransactionManager txMgr, JpaOperator operator) {
+        super(mgr, txMgr, operator);
+    }
+
+    public static ThreadLocal<BpelDAOConnectionImpl> getThreadLocal() {
+        return _connections;
+    }
+    
+    public List<BpelEvent> bpelEventQuery(InstanceFilter ifilter,
+                                          BpelEventFilter efilter) {
+        // TODO
+        throw new UnsupportedOperationException();
+    }
+
+    public List<Date> bpelEventTimelineQuery(InstanceFilter ifilter,
+                                             BpelEventFilter efilter) {
+        // TODO
+        throw new UnsupportedOperationException();
+    }
+
+    public ProcessInstanceDAO getInstance(Long iid) {
+    	_txCtx.begin();
+        ProcessInstanceDAOImpl instance = _em.find(ProcessInstanceDAOImpl.class, iid);
+        _txCtx.commit();
+        return instance;
+    }
+
+    public MessageExchangeDAO createMessageExchange(char dir) {
+    	_txCtx.begin();
+        MessageExchangeDAOImpl ret = new MessageExchangeDAOImpl(dir);
+        _em.persist(ret);
+        _txCtx.commit();
+        return ret;
+    }
+
+    public ProcessDAO createProcess(QName pid, QName type, String guid, long version) {
+    	_txCtx.begin();
+        ProcessDAOImpl ret = new ProcessDAOImpl(pid,type,guid,version);
+        _em.persist(ret);
+        _txCtx.commit();
+        return ret;
+    }
+    
+    public ProcessDAO createTransientProcess(Serializable id) {
+    	_txCtx.begin();
+        ProcessDAOImpl ret = new ProcessDAOImpl(null, null, null, 0);
+        ret.setId((Long)id);
+        _txCtx.commit();
+        return ret;
+    }
+
+    @SuppressWarnings("unchecked")
+    public ProcessDAO getProcess(QName processId) {
+    	_txCtx.begin();
+        List l = _em.createQuery("select x from ProcessDAOImpl x where x._processId = ?1")
+                .setParameter(1, processId.toString()).getResultList();
+        _txCtx.commit();
+        if (l.size() == 0) return null;
+        ProcessDAOImpl p = (ProcessDAOImpl) l.get(0);
+        return p;
+    }
+
+    public int getNumInstances(QName processId) {
+        ProcessDAO process = getProcess(processId);
+        if (process != null)
+            return process.getNumInstances();
+        else return -1;
+    }    
+
+    public ScopeDAO getScope(Long siidl) {
+    	_txCtx.begin();
+        ScopeDAO dao = _em.find(ScopeDAOImpl.class, siidl);
+        _txCtx.commit();
+        return dao;
+    }
+
+    public void insertBpelEvent(BpelEvent event, ProcessDAO process, ProcessInstanceDAO instance) {
+    	_txCtx.begin();
+        EventDAOImpl eventDao = new EventDAOImpl();
+        eventDao.setTstamp(new Timestamp(System.currentTimeMillis()));
+        eventDao.setType(BpelEvent.eventName(event));
+        String evtStr = event.toString();
+        eventDao.setDetail(evtStr.substring(0, Math.min(254, evtStr.length())));
+        if (process != null)
+            eventDao.setProcess((ProcessDAOImpl) process);
+        if (instance != null)
+            eventDao.setInstance((ProcessInstanceDAOImpl) instance);
+        if (event instanceof ScopeEvent)
+            eventDao.setScopeId(((ScopeEvent) event).getScopeId());
+        eventDao.setEvent(event);
+        _em.persist(eventDao);
+        _txCtx.commit();
+    }
+
+    private static String dateFilter(String filter) {
+        String date = Filter.getDateWithoutOp(filter);
+        String op = filter.substring(0,filter.indexOf(date));
+        Date dt = null;
+        try {
+            dt = ISO8601DateParser.parse(date);
+        } catch (ParseException e) {
+            e.printStackTrace();
+        }
+        Timestamp ts = new Timestamp(dt.getTime());
+        return op + " '" + ts.toString() + "'";
+    }
+
+    @SuppressWarnings("unchecked")
+    public Collection<ProcessInstanceDAO> instanceQuery(InstanceFilter criteria) {
+    	_txCtx.begin();
+        StringBuffer query = new StringBuffer();
+        query.append("select pi from ProcessInstanceDAOImpl as pi left join fetch pi._fault ");
+
+        if (criteria != null) {
+            // Building each clause
+            ArrayList<String> clauses = new ArrayList<String>();
+
+            // iid filter
+            if ( criteria.getIidFilter() != null ) {
+                StringBuffer filters = new StringBuffer();
+                List<String> iids = criteria.getIidFilter();
+                for (int m = 0; m < iids.size(); m++) {
+                    filters.append(" pi._instanceId = ").append(iids.get(m));
+                    if (m < iids.size() - 1) filters.append(" or");
+                }
+                clauses.add(" (" + filters + ")");
+            }
+
+            // pid filter
+            if (criteria.getPidFilter() != null) {
+                StringBuffer filters = new StringBuffer();
+                List<String> pids = criteria.getPidFilter();
+                for (int m = 0; m < pids.size(); m++) {
+                    filters.append(" pi._process._processId = '").append(pids.get(m)).append("'");
+                    if (m < pids.size() - 1) filters.append(" or");
+                }
+                clauses.add(" (" + filters + ")");
+            }
+
+            // name filter
+            if (criteria.getNameFilter() != null) {
+                String val = criteria.getNameFilter();
+                if (val.endsWith("*")) {
+                    val = val.substring(0, val.length()-1) + "%";
+                }
+                //process type string begins with name space
+                //this could possibly match more than you want
+                //because the name space and name are stored together 
+                clauses.add(" pi._process._processType like '%" + val + "'");
+            }
+
+            // name space filter
+            if (criteria.getNamespaceFilter() != null) {
+                //process type string begins with name space
+                //this could possibly match more than you want
+                //because the name space and name are stored together
+                clauses.add(" pi._process._processType like '{" +
+                        criteria.getNamespaceFilter() + "%'");
+            }
+
+            // started filter
+            if (criteria.getStartedDateFilter() != null) {
+                for ( String ds : criteria.getStartedDateFilter() ) {
+                    clauses.add(" pi._dateCreated " + dateFilter(ds));
+                }
+            }
+
+            // last-active filter
+            if (criteria.getLastActiveDateFilter() != null) {
+                for ( String ds : criteria.getLastActiveDateFilter() ) {
+                    clauses.add(" pi._lastActive " + dateFilter(ds));
+                }
+            }
+
+            // status filter
+            if (criteria.getStatusFilter() != null) {
+                StringBuffer filters = new StringBuffer();
+                List<Short> states = criteria.convertFilterState();
+                for (int m = 0; m < states.size(); m++) {
+                    filters.append(" pi._state = ").append(states.get(m));
+                    if (m < states.size() - 1) filters.append(" or");
+                }
+                clauses.add(" (" + filters.toString() + ")");
+            }
+
+            // $property filter
+            if (criteria.getPropertyValuesFilter() != null) {
+                Map<String,String> props = criteria.getPropertyValuesFilter();
+                // join to correlation sets
+                query.append(" inner join pi._rootScope._correlationSets as cs");
+                int i = 0;
+                for (String propKey : props.keySet()) {
+                    i++;
+                    // join to props for each prop
+                    query.append(" inner join cs._props as csp"+i);
+                    // add clause for prop key and value
+                    clauses.add(" csp"+i+".propertyKey = '"+propKey+
+                            "' and csp"+i+".propertyValue = '"+
+                            // spaces have to be escaped, might be better handled in InstanceFilter
+                            props.get(propKey).replaceAll("&#32;", " ")+"'");
+                }
+            }
+
+            // order by
+            StringBuffer orderby = new StringBuffer("");
+            if (criteria.getOrders() != null) {
+                orderby.append(" order by");
+                List<String> orders = criteria.getOrders();
+                for (int m = 0; m < orders.size(); m++) {
+                    String field = orders.get(m);
+                    String ord = " asc";
+                    if (field.startsWith("-")) {
+                        ord = " desc";
+                    }
+                    String fieldName = " pi._instanceId";
+                    if ( field.endsWith("name") || field.endsWith("namespace")) {
+                        fieldName = " pi._process._processType";
+                    }
+                    if ( field.endsWith("version")) {
+                        fieldName = " pi._process._version";
+                    }
+                    if ( field.endsWith("status")) {
+                        fieldName = " pi._state";
+                    }
+                    if ( field.endsWith("started")) {
+                        fieldName = " pi._dateCreated";
+                    }
+                    if ( field.endsWith("last-active")) {
+                        fieldName = " pi._lastActive";
+                    }
+                    orderby.append(fieldName + ord);
+                    if (m < orders.size() - 1) orderby.append(", ");
+                }
+
+            }
+
+            // Preparing the statement
+            if (clauses.size() > 0) {
+                query.append(" where");
+                for (int m = 0; m < clauses.size(); m++) {
+                    query.append(clauses.get(m));
+                    if (m < clauses.size() - 1) query.append(" and");
+                }
+            }
+
+            query.append(orderby);
+        }
+
+        if (__log.isDebugEnabled()) {
+            __log.debug(query.toString());
+        }
+
+        // criteria limit
+        Query pq = _em.createQuery(query.toString());
+        getJPADaoOperator().setBatchSize(pq, criteria.getLimit());
+        List<ProcessInstanceDAO> ql = pq.getResultList();
+
+        Collection<ProcessInstanceDAO> list = new ArrayList<ProcessInstanceDAO>();
+        int num = 0;
+        for (Iterator iterator = ql.iterator(); iterator.hasNext();) {
+            if(num++ > criteria.getLimit()) break;
+            ProcessInstanceDAO processInstanceDAO = (ProcessInstanceDAO) iterator.next();
+            list.add(processInstanceDAO);
+        }
+        _txCtx.commit();
+        return list;
+    }
+
+
+    public Collection<ProcessInstanceDAO> instanceQuery(String expression) {
+        return instanceQuery(new InstanceFilter(expression));
+    }
+
+    public MessageExchangeDAO getMessageExchange(String mexid) {
+    	_txCtx.begin();
+        MessageExchangeDAO dao = _em.find(MessageExchangeDAOImpl.class, mexid);
+        _txCtx.commit();
+        return dao;
+    }
+
+    public void deleteMessageExchange(MessageExchangeDAO mexDao) {
+    	_txCtx.begin();
+        _em.remove(mexDao);
+        _txCtx.commit();
+    }
+
+    public EntityManager getEntityManager() {
+        return _em;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map<Long, Collection<CorrelationSetDAO>> getCorrelationSets(Collection<ProcessInstanceDAO> instances) {
+        if (instances.size() == 0) {
+            return new HashMap<Long, Collection<CorrelationSetDAO>>();
+        }
+        _txCtx.begin();
+        ArrayList<Long> iids = new ArrayList<Long>(instances.size());
+        for (ProcessInstanceDAO dao: instances) {
+            iids.add(dao.getInstanceId());
+        }
+        Collection<CorrelationSetDAOImpl> csets = _em.createNamedQuery(CorrelationSetDAOImpl.SELECT_CORRELATION_SETS_BY_INSTANCES).setParameter("instances", iids).getResultList();
+        Map<Long, Collection<CorrelationSetDAO>> map = new HashMap<Long, Collection<CorrelationSetDAO>>();
+        for (CorrelationSetDAOImpl cset: csets) {
+            Long id = cset.getScope().getProcessInstance().getInstanceId();
+            Collection<CorrelationSetDAO> existing = map.get(id);
+            if (existing == null) {
+                existing = new ArrayList<CorrelationSetDAO>();
+                map.put(id, existing);
+            }
+            existing.add(cset);
+        }
+        _txCtx.commit();
+        return map;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Collection<CorrelationSetDAO> getActiveCorrelationSets() {
+    	_txCtx.begin();
+    	Collection<CorrelationSetDAO> dao = _em.createNamedQuery(CorrelationSetDAOImpl.SELECT_ACTIVE_SETS).setParameter("state", ProcessState.STATE_ACTIVE).getResultList();
+    	_txCtx.commit();
+    	return dao;
+    }
+
+    public ProcessManagementDAO getProcessManagement() {
+        return new ProcessManagementDAOImpl(_em);
+    }
+}

Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrSetProperty.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrSetProperty.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrSetProperty.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrSetProperty.java Fri May 21 15:40:59 2010
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.jpa.bpel;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+
+/**
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+@Entity
+@Table(name="ODE_CORSET_PROP")
+@NamedQueries({
+	@NamedQuery(name=CorrSetProperty.DELETE_CORSET_PROPERTIES_BY_PROPERTY_IDS, query="delete from CorrSetProperty as p where p.corrSetId in(:corrSetIds)")
+})
+public class CorrSetProperty {
+	public final static String DELETE_CORSET_PROPERTIES_BY_PROPERTY_IDS = "DELETE_CORSET_PROPERTIES_BY_PROPERTY_IDS";
+	
+    @Id @Column(name="ID")
+    @GeneratedValue(strategy=GenerationType.AUTO)
+    @SuppressWarnings("unused")
+    private Long _id;
+    @Basic @Column(name="PROP_KEY")
+    private String propertyKey;
+    @Basic @Column(name="PROP_VALUE")
+    private String propertyValue;
+
+    @SuppressWarnings("unused")
+    @Basic @Column(name="CORRSET_ID", insertable=false, updatable=false, nullable=true)
+    private Long corrSetId;
+    
+    @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="CORRSET_ID")
+    private CorrelationSetDAOImpl _corrSet;
+
+    public CorrSetProperty() {
+    }
+    public CorrSetProperty(String propertyKey, String propertyValue) {
+        this.propertyKey = propertyKey;
+        this.propertyValue = propertyValue;
+    }
+
+    public String getPropertyKey() {
+        return propertyKey;
+    }
+
+    public void setPropertyKey(String propertyKey) {
+        this.propertyKey = propertyKey;
+    }
+
+    public String getPropertyValue() {
+        return propertyValue;
+    }
+
+    public void setPropertyValue(String propertyValue) {
+        this.propertyValue = propertyValue;
+    }
+
+    public CorrelationSetDAOImpl getCorrSet() {
+        return _corrSet;
+    }
+
+    public void setCorrSet(CorrelationSetDAOImpl corrSet) {
+        _corrSet = corrSet;
+    }
+}

Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrelationSetDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrelationSetDAOImpl.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrelationSetDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrelationSetDAOImpl.java Fri May 21 15:40:59 2010
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.dao.bpel.CorrelationSetDAO;
+import org.apache.ode.dao.bpel.ProcessDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+import org.apache.ode.dao.bpel.ScopeDAO;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.OneToMany;
+import javax.persistence.Table;
+import javax.xml.namespace.QName;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+@Entity
+@Table(name="ODE_CORRELATION_SET")
+@NamedQueries({
+    @NamedQuery(name=CorrelationSetDAOImpl.DELETE_CORRELATION_SETS_BY_IDS, query="delete from CorrelationSetDAOImpl as c where c._correlationSetId in (:ids)"),
+    @NamedQuery(name=CorrelationSetDAOImpl.SELECT_CORRELATION_SETS_BY_INSTANCES, query="select c from CorrelationSetDAOImpl as c left join fetch c._scope left join fetch c._props where c._scope._processInstance._instanceId in (:instances)"),
+    @NamedQuery(name=CorrelationSetDAOImpl.SELECT_CORRELATION_SET_IDS_BY_PROCESS, query="select c._correlationSetId from CorrelationSetDAOImpl as c where c._scope._processInstance._process = :process"),
+    @NamedQuery(name=CorrelationSetDAOImpl.SELECT_CORRELATION_SET_IDS_BY_INSTANCE, query="select c._correlationSetId from CorrelationSetDAOImpl as c where c._scope._processInstance = :instance"),
+    @NamedQuery(name=CorrelationSetDAOImpl.SELECT_ACTIVE_SETS, query="select c from CorrelationSetDAOImpl as c left join fetch c._scope where c._scope._processInstance._state = (:state)")
+})
+public class CorrelationSetDAOImpl implements CorrelationSetDAO {
+	public final static String DELETE_CORRELATION_SETS_BY_IDS = "DELETE_CORRELATION_SETS_BY_IDS";
+    public final static String SELECT_CORRELATION_SETS_BY_INSTANCES = "SELECT_CORRELATION_SETS_BY_INSTANCES";
+    public final static String SELECT_CORRELATION_SET_IDS_BY_PROCESS = "SELECT_CORRELATION_SET_IDS_BY_PROCESS";
+    public final static String SELECT_CORRELATION_SET_IDS_BY_INSTANCE = "SELECT_CORRELATION_SET_IDS_BY_INSTANCE";
+    public final static String SELECT_ACTIVE_SETS = "SELECT_ACTIVE_SETS";
+	
+	@Id @Column(name="CORRELATION_SET_ID") 
+	@GeneratedValue(strategy=GenerationType.AUTO)
+	private Long _correlationSetId;
+	@Basic @Column(name="NAME")
+    private String _name;
+	@Basic @Column(name="CORRELATION_KEY")
+    private String _correlationKey;
+
+    @OneToMany(targetEntity=CorrSetProperty.class,mappedBy="_corrSet",fetch=FetchType.LAZY,cascade={CascadeType.MERGE, CascadeType.PERSIST, CascadeType.REFRESH})
+    private Collection<CorrSetProperty> _props = new ArrayList<CorrSetProperty>();
+    @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="SCOPE_ID")
+    private ScopeDAOImpl _scope;
+
+    public CorrelationSetDAOImpl() {}
+	public CorrelationSetDAOImpl(ScopeDAOImpl scope, String name) {
+		_name = name;
+		_scope = scope;
+	}
+	
+	public Long getCorrelationSetId() {
+		return _correlationSetId;
+	}
+
+	public String getName() {
+		return _name;
+	}
+
+	public Map<QName, String> getProperties() {
+        HashMap<QName, String> map = new HashMap<QName, String>();
+        for (CorrSetProperty prop : _props) {
+            map.put(QName.valueOf(prop.getPropertyKey()), prop.getPropertyValue());
+        }
+        return map;
+	}
+
+	public ScopeDAO getScope() {
+		return _scope;
+	}
+
+	public CorrelationKey getValue() {
+        if (_correlationKey == null) return null;
+        return new CorrelationKey(_correlationKey);
+	}
+
+	public void setValue(QName[] names, CorrelationKey values) {
+		_correlationKey = values.toCanonicalString();
+        if (names != null)
+            for (int m = 0; m < names.length; m++) {
+                CorrSetProperty prop = new CorrSetProperty(names[m].toString(), values.getValues()[m]);
+                _props.add(prop);
+                prop.setCorrSet(this);
+            }
+	}
+
+    public ProcessDAO getProcess() {
+        return _scope.getProcessInstance().getProcess();
+    }
+    public ProcessInstanceDAO getInstance() {
+        return _scope.getProcessInstance();
+    }
+}

Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrelatorDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrelatorDAOImpl.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrelatorDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrelatorDAOImpl.java Fri May 21 15:40:59 2010
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.OneToMany;
+import javax.persistence.Query;
+import javax.persistence.Table;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.dao.bpel.CorrelatorDAO;
+import org.apache.ode.dao.bpel.CorrelatorMessageDAO;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.dao.bpel.MessageRouteDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+
+@Entity
+@Table(name = "ODE_CORRELATOR")
+@NamedQueries( { @NamedQuery(name = CorrelatorDAOImpl.DELETE_CORRELATORS_BY_PROCESS, query = "delete from CorrelatorDAOImpl as c where c._process = :process") })
+public class CorrelatorDAOImpl extends BpelDAO implements CorrelatorDAO {
+    private static Log __log = LogFactory.getLog(CorrelatorDAOImpl.class);
+    public final static String DELETE_CORRELATORS_BY_PROCESS = "DELETE_CORRELATORS_BY_PROCESS";
+    private final static String ROUTE_BY_CKEY_HEADER = "select route from MessageRouteDAOImpl as route where route._correlator._process._processType = :ptype and route._correlator._correlatorKey = :corrkey";
+
+    @Id
+    @Column(name = "CORRELATOR_ID")
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    @SuppressWarnings("unused")
+    private Long _correlatorId;
+    @Basic
+    @Column(name = "CORRELATOR_KEY")
+    private String _correlatorKey;
+    @OneToMany(targetEntity = MessageRouteDAOImpl.class, mappedBy = "_correlator", fetch = FetchType.EAGER, cascade = { CascadeType.MERGE, CascadeType.PERSIST, CascadeType.REFRESH })
+    private Collection<MessageRouteDAOImpl> _routes = new ArrayList<MessageRouteDAOImpl>();
+    @OneToMany(targetEntity = MessageExchangeDAOImpl.class, mappedBy = "_correlator", fetch = FetchType.LAZY, cascade = { CascadeType.MERGE, CascadeType.PERSIST, CascadeType.REFRESH })
+    private Collection<MessageExchangeDAOImpl> _exchanges = new ArrayList<MessageExchangeDAOImpl>();
+    @ManyToOne(fetch = FetchType.LAZY, cascade = { CascadeType.PERSIST })
+    @JoinColumn(name = "PROC_ID")
+    private ProcessDAOImpl _process;
+
+    public CorrelatorDAOImpl() {
+    }
+
+    public CorrelatorDAOImpl(String correlatorKey, ProcessDAOImpl process) {
+        _correlatorKey = correlatorKey;
+        _process = process;
+    }
+
+    public void addRoute(String routeGroupId, ProcessInstanceDAO target, int index, CorrelationKeySet correlationKeySet, String routePolicy) {
+        if (__log.isDebugEnabled()) {
+            __log.debug("addRoute " + routeGroupId + " " + target + " " + index + " " + correlationKeySet + " " + routePolicy);
+        }
+        MessageRouteDAOImpl mr = new MessageRouteDAOImpl(correlationKeySet, routeGroupId, index, (ProcessInstanceDAOImpl) target, this, routePolicy);
+        _routes.add(mr);
+        getEM().flush();
+    }
+
+    public MessageExchangeDAO dequeueMessage(CorrelationKeySet correlationKeySet) {
+        // TODO: this thing does not seem to be scalable: loading up based on a correlator???
+        for (Iterator<MessageExchangeDAOImpl> itr = _exchanges.iterator(); itr.hasNext();) {
+            MessageExchangeDAOImpl mex = itr.next();
+            if (mex.getCorrelationKeySet().isRoutableTo(correlationKeySet, false)) {
+                itr.remove();
+                return mex;
+            }
+        }
+        return null;
+    }
+
+    public void enqueueMessage(MessageExchangeDAO mex, CorrelationKeySet correlationKeySet) {
+        MessageExchangeDAOImpl mexImpl = (MessageExchangeDAOImpl) mex;
+        mexImpl.setCorrelationKeySet(correlationKeySet);
+        _exchanges.add(mexImpl);
+        mexImpl.setCorrelator(this);
+    }
+
+    public Collection<CorrelatorMessageDAO> getAllMessages() {
+        return new ArrayList<CorrelatorMessageDAO>(_exchanges);
+    }
+
+    /**
+     * Queries the routes matching any sub-set of the given correlation keys.
+     * <p>
+     * The query is built from <code>ROUTE_BY_CKEY_HEADER</code> and parameterised with the
+     * process type, this correlator's key and the canonical form of every key sub-set.
+     * Every candidate whose route policy is <code>"all"</code> is kept; of the remaining
+     * candidates only the first one in query order is kept.
+     *
+     * @param correlationKeySet keys of the message to route
+     * @return the selected routes, or <code>null</code> (not an empty list) when the query
+     *         returns no candidate
+     */
+    @SuppressWarnings("unchecked")
+    public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet) {
+        if (__log.isDebugEnabled()) {
+            __log.debug("findRoute " + correlationKeySet);
+        }
+        List<CorrelationKeySet> keySubSets = correlationKeySet.findSubSets();
+        Query routeQuery = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, keySubSets));
+        routeQuery.setParameter("ptype", _process.getType().toString());
+        routeQuery.setParameter("corrkey", _correlatorKey);
+        int position = 0;
+        for (CorrelationKeySet subSet : keySubSets) {
+            routeQuery.setParameter("s" + position, subSet.toCanonicalString());
+            position++;
+        }
+
+        List<MessageRouteDAO> candidates = (List<MessageRouteDAO>) routeQuery.getResultList();
+        if (candidates.isEmpty()) {
+            if (__log.isDebugEnabled()) {
+                __log.debug("findRoute found nothing");
+            }
+            return null;
+        }
+
+        List<MessageRouteDAO> matchingRoutes = new ArrayList<MessageRouteDAO>();
+        boolean singleRouteTaken = false;
+        for (MessageRouteDAO candidate : candidates) {
+            if ("all".equals(candidate.getRoute())) {
+                matchingRoutes.add(candidate);
+                continue;
+            }
+            // only the first route that is not an "all" route is delivered to
+            if (!singleRouteTaken) {
+                matchingRoutes.add(candidate);
+                singleRouteTaken = true;
+            }
+        }
+        if (__log.isDebugEnabled()) {
+            __log.debug("findRoute found " + matchingRoutes);
+        }
+        return matchingRoutes;
+    }
+
+    /**
+     * Appends a correlation-key filter to a query header.
+     * <p>
+     * One sub-set yields an equality test against <code>:s0</code>; several yield an
+     * <code>in(:s0, :s1, ...)</code> clause; an empty list returns the header unchanged.
+     *
+     * @param header  leading part of the query, ending in a where clause that can take an
+     *                additional <code>and</code> condition
+     * @param subSets key sub-sets; only the size is used here, the values are bound by the caller
+     * @return the complete query string
+     */
+    private String generateSelectorQuery(String header, List<CorrelationKeySet> subSets) {
+        // the buffer never leaves this method, so the unsynchronized builder is sufficient
+        StringBuilder filterQuery = new StringBuilder(header);
+
+        if (subSets.size() == 1) {
+            filterQuery.append(" and route._correlationKey = :s0");
+        } else if (subSets.size() > 1) {
+            filterQuery.append(" and route._correlationKey in(");
+            for (int i = 0; i < subSets.size(); i++) {
+                if (i > 0) {
+                    filterQuery.append(", ");
+                }
+                filterQuery.append(":s").append(i);
+            }
+            filterQuery.append(")");
+        }
+
+        return filterQuery.toString();
+    }
+
+    /** @return the key identifying this correlator */
+    public String getCorrelatorId() {
+        return this._correlatorKey;
+    }
+
+    /** @param newId the new key identifying this correlator */
+    public void setCorrelatorId(String newId) {
+        this._correlatorKey = newId;
+    }
+
+    public void removeRoutes(String routeGroupId, ProcessInstanceDAO target) {
+        // remove route across all correlators of the process
+        ((ProcessInstanceDAOImpl) target).removeRoutes(routeGroupId);
+    }
+
+    /**
+     * Removes from this correlator every route that belongs to the given group and targets
+     * the given instance, deleting each through the entity manager. The entity manager is
+     * flushed once at the end, and only if at least one route was removed.
+     *
+     * @param routeGroupId group identifier of the routes to remove
+     * @param target       instance the routes must point at
+     */
+    void removeLocalRoutes(String routeGroupId, ProcessInstanceDAO target) {
+        if (__log.isDebugEnabled()) {
+            __log.debug("removeLocalRoutes " + routeGroupId);
+        }
+        boolean removedAny = false;
+        Iterator<MessageRouteDAOImpl> cursor = _routes.iterator();
+        while (cursor.hasNext()) {
+            MessageRouteDAOImpl route = cursor.next();
+            boolean sameGroup = route.getGroupId().equals(routeGroupId);
+            if (!sameGroup || !route.getTargetInstance().equals(target)) {
+                continue;
+            }
+            if (__log.isDebugEnabled()) {
+                __log.debug("removing " + route.getCorrelationKey() + " " + route.getIndex() + " " + route.getRoute());
+            }
+            cursor.remove();
+            getEM().remove(route);
+            removedAny = true;
+        }
+        if (removedAny) {
+            getEM().flush();
+        }
+    }
+
+    /**
+     * @return a new list holding the routes currently attached to this correlator;
+     *         changes to the returned list do not affect the correlator
+     */
+    public Collection<MessageRouteDAO> getAllRoutes() {
+        Collection<MessageRouteDAO> snapshot = new ArrayList<MessageRouteDAO>(_routes);
+        return snapshot;
+    }
+
+    /**
+     * Stub implementation: performs no check and always reports success.
+     *
+     * @param correlationKeySet ignored
+     * @return always <code>true</code>
+     */
+    public boolean checkRoute(CorrelationKeySet correlationKeySet) {
+        // TODO Auto-generated method stub
+        return true;
+    }
+}

Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/EventDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/EventDAOImpl.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/EventDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/EventDAOImpl.java Fri May 21 15:40:59 2010
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import org.apache.ode.bpel.evt.BpelEvent;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.Lob;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import java.sql.Timestamp;
+
+/**
+ * JPA entity mapped to the <code>ODE_EVENT</code> table. It is a plain property holder:
+ * every method is a direct accessor for one persistent field.
+ *
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+@Entity
+@Table(name="ODE_EVENT")
+@NamedQueries({
+    @NamedQuery(name=EventDAOImpl.SELECT_EVENT_IDS_BY_PROCESS, query="select e._id from EventDAOImpl as e where e._instance._process = :process"),
+    @NamedQuery(name=EventDAOImpl.DELETE_EVENTS_BY_IDS, query="delete from EventDAOImpl as e where e._id in (:ids)"),
+    @NamedQuery(name=EventDAOImpl.DELETE_EVENTS_BY_INSTANCE, query="delete from EventDAOImpl as e where e._instance = :instance")
+})
+public class EventDAOImpl extends BpelDAO {
+    /** Name of the query selecting the ids of all events of a process. */
+    public static final String SELECT_EVENT_IDS_BY_PROCESS = "SELECT_EVENT_IDS_BY_PROCESS";
+    /** Name of the query deleting the events whose ids are in a given collection. */
+    public static final String DELETE_EVENTS_BY_IDS = "DELETE_EVENTS_BY_IDS";
+    /** Name of the query deleting all events of a process instance. */
+    public static final String DELETE_EVENTS_BY_INSTANCE = "DELETE_EVENTS_BY_INSTANCE";
+
+    @Id @Column(name="EVENT_ID")
+    @GeneratedValue(strategy= GenerationType.AUTO)
+    private Long _id;
+    @Basic @Column(name="TSTAMP")
+    private Timestamp _tstamp;
+    @Basic @Column(name="TYPE")
+    private String _type;
+    @Basic @Column(name="DETAIL")
+    private String _detail;
+
+    /** Scope identifier, possibly null. */
+    @Basic @Column(name="SCOPE_ID")
+    private Long _scopeId;
+
+    @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="PROCESS_ID")
+    private ProcessDAOImpl _process;
+    @ManyToOne(fetch= FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="INSTANCE_ID")
+    private ProcessInstanceDAOImpl _instance;
+    @Lob  @Column(name="DATA")
+    private BpelEvent _event;
+
+    /** @return the generated primary key, or <code>null</code> if none has been assigned */
+    public Long getId() {
+        return this._id;
+    }
+
+    /** @param id the primary key */
+    public void setId(Long id) {
+        this._id = id;
+    }
+
+    /** @return the event timestamp */
+    public Timestamp getTstamp() {
+        return this._tstamp;
+    }
+
+    /** @param tstamp the event timestamp */
+    public void setTstamp(Timestamp tstamp) {
+        this._tstamp = tstamp;
+    }
+
+    /** @return the event type string */
+    public String getType() {
+        return this._type;
+    }
+
+    /** @param type the event type string */
+    public void setType(String type) {
+        this._type = type;
+    }
+
+    /** @return the event detail text */
+    public String getDetail() {
+        return this._detail;
+    }
+
+    /** @param detail the event detail text */
+    public void setDetail(String detail) {
+        this._detail = detail;
+    }
+
+    /** @return the scope identifier, possibly <code>null</code> */
+    public Long getScopeId() {
+        return this._scopeId;
+    }
+
+    /** @param scopeId the scope identifier, possibly <code>null</code> */
+    public void setScopeId(Long scopeId) {
+        this._scopeId = scopeId;
+    }
+
+    /** @return the process this event is attached to */
+    public ProcessDAOImpl getProcess() {
+        return this._process;
+    }
+
+    /** @param process the process this event is attached to */
+    public void setProcess(ProcessDAOImpl process) {
+        this._process = process;
+    }
+
+    /** @return the process instance this event is attached to */
+    public ProcessInstanceDAOImpl getInstance() {
+        return this._instance;
+    }
+
+    /** @param instance the process instance this event is attached to */
+    public void setInstance(ProcessInstanceDAOImpl instance) {
+        this._instance = instance;
+    }
+
+    /** @return the event object stored in the <code>DATA</code> column */
+    public BpelEvent getEvent() {
+        return this._event;
+    }
+
+    /** @param event the event object to store in the <code>DATA</code> column */
+    public void setEvent(BpelEvent event) {
+        this._event = event;
+    }
+}

Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/FaultDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/FaultDAOImpl.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/FaultDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/FaultDAOImpl.java Fri May 21 15:40:59 2010
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import org.apache.ode.dao.bpel.FaultDAO;
+import org.apache.ode.utils.DOMUtils;
+import org.w3c.dom.Element;
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Lob;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.xml.namespace.QName;
+
+
+/**
+ * JPA entity mapped to the <code>ODE_FAULT</code> table. The fault name is stored as the
+ * string form of its {@link QName} and the fault message as serialized XML text.
+ * Instances are populated through the constructor only; there are no setters.
+ */
+@Entity
+@Table(name="ODE_FAULT")
+@NamedQueries({
+    @NamedQuery(name=FaultDAOImpl.DELETE_FAULTS_BY_IDS, query="delete from FaultDAOImpl as f where f._id in(:ids)")
+})
+public class FaultDAOImpl implements FaultDAO {
+    /** Name of the query deleting the faults whose ids are in a given collection. */
+    public static final String DELETE_FAULTS_BY_IDS = "DELETE_FAULTS_BY_IDS";
+
+    @Id @Column(name="FAULT_ID")
+    @GeneratedValue(strategy=GenerationType.AUTO)
+    @SuppressWarnings("unused")
+    private Long _id;
+    @Basic @Column(name="NAME")
+    private String _name;
+    @Basic @Column(name="MESSAGE", length=4000)
+    private String _explanation;
+    @Lob @Column(name="DATA")
+    private String _data;
+    @Basic @Column(name="LINE_NUMBER")
+    private int _lineNo;
+    @Basic @Column(name="ACTIVITY_ID")
+    private int _activityId;
+
+    /** No-argument constructor; leaves every field at its default value. */
+    public FaultDAOImpl() {}
+
+    /**
+     * Creates a fully populated fault record.
+     *
+     * @param faultName    qualified name of the fault; must not be <code>null</code>
+     * @param explanation  textual explanation of the fault
+     * @param faultLineNo  line number associated with the fault
+     * @param activityId   identifier of the activity associated with the fault
+     * @param faultMessage fault data, serialized to a string for storage; may be <code>null</code>
+     */
+    public FaultDAOImpl(QName faultName, String explanation, int faultLineNo,
+            int activityId, Element faultMessage) {
+        this._name = faultName.toString();
+        this._explanation = explanation;
+        this._lineNo = faultLineNo;
+        this._activityId = activityId;
+        if (faultMessage == null) {
+            this._data = null;
+        } else {
+            this._data = DOMUtils.domToString(faultMessage);
+        }
+    }
+
+    /** @return identifier of the activity associated with the fault */
+    public int getActivityId() {
+        return this._activityId;
+    }
+
+    /**
+     * Parses the stored fault data back into a DOM element.
+     *
+     * @return the fault data, or <code>null</code> when none was stored
+     * @throws RuntimeException wrapping any exception raised while parsing the stored text
+     */
+    public Element getData() {
+        if (this._data == null) {
+            return null;
+        }
+        try {
+            return DOMUtils.stringToDOM(this._data);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** @return textual explanation of the fault */
+    public String getExplanation() {
+        return this._explanation;
+    }
+
+    /** @return line number associated with the fault */
+    public int getLineNo() {
+        return this._lineNo;
+    }
+
+    /** @return qualified name of the fault, or <code>null</code> when no name is stored */
+    public QName getName() {
+        if (this._name == null) {
+            return null;
+        }
+        return QName.valueOf(this._name);
+    }
+
+}

Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/JpaTxMgrProvider.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/JpaTxMgrProvider.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/JpaTxMgrProvider.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/JpaTxMgrProvider.java Fri May 21 15:40:59 2010
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import javax.transaction.NotSupportedException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+import org.apache.openjpa.ee.ManagedRuntime;
+import org.apache.openjpa.util.GeneralException;
+
+/**
+ * OpenJPA {@link ManagedRuntime} backed by a JTA {@link TransactionManager} that is
+ * handed in at construction time.
+ */
+public class JpaTxMgrProvider implements ManagedRuntime {
+    private final TransactionManager _txMgr;
+
+    /**
+     * @param txMgr transaction manager every operation of this runtime delegates to
+     */
+    public JpaTxMgrProvider(TransactionManager txMgr) {
+        _txMgr = txMgr;
+    }
+
+    /** @return the transaction manager supplied to the constructor */
+    public TransactionManager getTransactionManager() throws Exception {
+        return _txMgr;
+    }
+
+    /**
+     * Marks the current transaction as rollback-only.
+     *
+     * @param cause ignored
+     */
+    public void setRollbackOnly(Throwable cause) throws Exception {
+        // there is no generic support for setting the rollback cause
+        getTransactionManager().getTransaction().setRollbackOnly();
+    }
+
+    /** @return always <code>null</code> */
+    public Throwable getRollbackCause() throws Exception {
+        // there is no generic support for retrieving the rollback cause
+        return null;
+    }
+
+    /** @return the transaction the transaction manager reports as current */
+    public Object getTransactionKey() throws Exception, SystemException {
+        return _txMgr.getTransaction();
+    }
+
+    /**
+     * Runs the given work with the current transaction suspended and resumes the
+     * transaction afterwards, whether the work returns normally or throws.
+     *
+     * @param runnable work to execute outside of the transaction
+     * @throws NotSupportedException if the transaction cannot be suspended or resumed; when
+     *                               resuming fails the transaction is also marked rollback-only
+     */
+    public void doNonTransactionalWork(java.lang.Runnable runnable) throws NotSupportedException {
+        TransactionManager tm;
+        Transaction transaction;
+        try {
+            tm = getTransactionManager();
+            transaction = tm.suspend();
+        } catch (Exception e) {
+            throw toNotSupported(e);
+        }
+
+        try {
+            runnable.run();
+        } finally {
+            // suspend() returns null when no transaction was active, so there is nothing to
+            // resume and no transaction object to mark rollback-only
+            if (transaction != null) {
+                resumeOrMarkRollbackOnly(tm, transaction);
+            }
+        }
+    }
+
+    /** Resumes a suspended transaction; on failure marks it rollback-only and reports the failure. */
+    private static void resumeOrMarkRollbackOnly(TransactionManager tm, Transaction transaction)
+            throws NotSupportedException {
+        try {
+            tm.resume(transaction);
+        } catch (Exception e) {
+            try {
+                transaction.setRollbackOnly();
+            }
+            catch(SystemException se2) {
+                throw new GeneralException(se2);
+            }
+            throw toNotSupported(e);
+        }
+    }
+
+    /** Wraps an exception in a NotSupportedException carrying the same message and the original as cause. */
+    private static NotSupportedException toNotSupported(Exception e) {
+        NotSupportedException nse = new NotSupportedException(e.getMessage());
+        nse.initCause(e);
+        return nse;
+    }
+}
\ No newline at end of file

Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageDAOImpl.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageDAOImpl.java Fri May 21 15:40:59 2010
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.Lob;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.dao.bpel.MessageDAO;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.utils.DOMUtils;
+import org.w3c.dom.Element;
+
+@Entity
+@Table(name = "ODE_MESSAGE")
+@NamedQueries( { @NamedQuery(name = MessageDAOImpl.DELETE_MESSAGES_BY_PROCESS, query = "delete from MessageDAOImpl as m where m._messageExchange._process = :process") })
+public class MessageDAOImpl implements MessageDAO {
+    private static Log __log = LogFactory.getLog(MessageDAOImpl.class);
+    public final static String DELETE_MESSAGES_BY_PROCESS = "DELETE_MESSAGES_BY_PROCESS";
+
+    @Id
+    @Column(name = "MESSAGE_ID")
+    @GeneratedValue(strategy = GenerationType.AUTO)
+    @SuppressWarnings("unused")
+    private Long _id;
+    @Basic
+    @Column(name = "TYPE")
+    private String _type;
+    @Lob
+    @Column(name = "DATA")
+    private String _data;
+    @Lob
+    @Column(name = "HEADER")
+    private String _header;
+    @ManyToOne(fetch = FetchType.LAZY, cascade = { CascadeType.ALL })
+    @JoinColumn(name = "MESSAGE_EXCHANGE_ID")
+    private MessageExchangeDAOImpl _messageExchange;
+
+    public MessageDAOImpl() {
+    }
+
+    public MessageDAOImpl(QName type, MessageExchangeDAOImpl me) {
+        _type = type.toString();
+        _messageExchange = me;
+    }
+
+    public Element getData() {
+        if (__log.isDebugEnabled()) {
+            __log.debug("getData " + _id + " " + _data);
+        }
+        try {
+            return _data == null ? null : DOMUtils.stringToDOM(_data);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void setData(Element value) {
+        if (value == null) {
+            if (__log.isDebugEnabled()) {
+                __log.debug("setData " + _id + " null");
+            }
+            return;
+        }
+        _data = DOMUtils.domToString(value);
+        
+        if (__log.isDebugEnabled()) {
+            __log.debug("setData " + _id + " " + _data);
+        }
+    }
+
+    public Element getHeader() {
+        try {
+            return _header == null ? null : DOMUtils.stringToDOM(_header);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void setHeader(Element value) {
+        if (value == null)
+            return;
+        _header = DOMUtils.domToString(value);
+    }
+
+    public MessageExchangeDAO getMessageExchange() {
+        return _messageExchange;
+    }
+
+    public QName getType() {
+        return _type == null ? null : QName.valueOf(_type);
+    }
+
+    public void setType(QName type) {
+        _type = type.toString();
+    }
+
+}

Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageExchangeDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageExchangeDAOImpl.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageExchangeDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageExchangeDAOImpl.java Fri May 21 15:40:59 2010
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.dao.bpel.CorrelatorMessageDAO;
+import org.apache.ode.dao.bpel.MessageDAO;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.dao.bpel.PartnerLinkDAO;
+import org.apache.ode.dao.bpel.ProcessDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+import org.apache.ode.utils.DOMUtils;
+import org.apache.ode.utils.uuid.UUID;
+import org.w3c.dom.Element;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.Lob;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.OneToMany;
+import javax.persistence.OneToOne;
+import javax.persistence.Table;
+import javax.persistence.Transient;
+import javax.xml.namespace.QName;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
+@Entity
+@Table(name="ODE_MESSAGE_EXCHANGE")
+@NamedQueries({
+    @NamedQuery(name=MessageExchangeDAOImpl.DELETE_MEXS_BY_PROCESS, query="delete from MessageExchangeDAOImpl as m where m._process = :process"),
+    @NamedQuery(name=MessageExchangeDAOImpl.SELECT_MEX_IDS_BY_PROCESS, query="select m._id from MessageExchangeDAOImpl as m where m._process = :process")
+})
+public class MessageExchangeDAOImpl extends BpelDAO implements MessageExchangeDAO, CorrelatorMessageDAO {
+    private static final Log __log = LogFactory.getLog(MessageExchangeDAOImpl.class);
+    
+    public final static String DELETE_MEXS_BY_PROCESS = "DELETE_MEXS_BY_PROCESS";
+    public final static String SELECT_MEX_IDS_BY_PROCESS = "SELECT_MEX_IDS_BY_PROCESS";
+    
+    @Id @Column(name="MESSAGE_EXCHANGE_ID") 
+    private String _id;
+    @Basic @Column(name="CALLEE")
+    private String _callee;
+    @Basic @Column(name="CHANNEL")
+    private String _channel;
+    @Basic @Column(name="CORRELATION_ID")
+    private String _correlationId;
+    @Basic @Column(name="CORRELATION_STATUS")
+    private String _correlationStatus;
+    @Basic @Column(name="CREATE_TIME")
+    private Date _createTime;
+    @Basic @Column(name="DIRECTION")
+    private char _direction;
+    @Lob   @Column(name="EPR")
+    private String _epr;
+    @Transient private
+    Element _eprElement;
+    @Basic @Column(name="FAULT")
+    private String _fault;
+    @Basic @Column(name="FAULT_EXPLANATION")
+    private String _faultExplanation;
+    @Basic @Column(name="OPERATION")
+    private String _operation;
+    @Basic @Column(name="PARTNER_LINK_MODEL_ID")
+    private int _partnerLinkModelId;
+    @Basic @Column(name="PATTERN")
+    private String _pattern;
+    @Basic @Column(name="PORT_TYPE")
+    private String _portType;
+    @Basic @Column(name="PROPAGATE_TRANS")
+    private boolean _propagateTransactionFlag;
+    @Basic @Column(name="STATUS")
+    private String _status;
+    @Basic @Column(name="CORRELATION_KEYS")
+    private String _correlationKeys;
+    @Basic @Column(name="PIPED_ID")
+    private String _pipedMessageExchangeId;
+    @Basic @Column(name="SUBSCRIBER_COUNT")
+    private int _subscriberCount;
+
+    @OneToMany(targetEntity=MexProperty.class,mappedBy="_mex",fetch=FetchType.EAGER,cascade={CascadeType.ALL})
+    private Collection<MexProperty> _props = new ArrayList<MexProperty>();
+    @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="PROCESS_INSTANCE_ID")
+    private ProcessInstanceDAOImpl _processInst;
+    @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="PARTNER_LINK_ID")
+    private PartnerLinkDAOImpl _partnerLink;
+    @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="PROCESS_ID")
+    private ProcessDAOImpl _process;
+    @OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL}) @JoinColumn(name="REQUEST_MESSAGE_ID")
+    private MessageDAOImpl _request;
+    @OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL}) @JoinColumn(name="RESPONSE_MESSAGE_ID")
+    private MessageDAOImpl _response;
+
+    @ManyToOne(fetch= FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="CORR_ID")
+    private CorrelatorDAOImpl _correlator;
+
+    public MessageExchangeDAOImpl() {
+    }
+    
+    public MessageExchangeDAOImpl(char direction){
+        _direction = direction;
+        _id = new UUID().toString();
+    }
+    
+    public MessageDAO createMessage(QName type) {
+        MessageDAOImpl ret = new MessageDAOImpl(type,this);
+        return ret ;
+    }
+
+    public QName getCallee() {
+        return _callee == null ? null : QName.valueOf(_callee);
+    }
+
+    public String getChannel() {
+        return _channel;
+    }
+
+    public String getCorrelationId() {
+        return _correlationId;
+    }
+
+    public String getCorrelationStatus() {
+        return _correlationStatus;
+    }
+
+    public Date getCreateTime() {
+        return _createTime;
+    }
+
+    public char getDirection() {
+        return _direction;
+    }
+
+    public Element getEPR() {
+        if ( _eprElement == null && _epr != null && !"".equals(_epr)) {
+            try {
+                _eprElement = DOMUtils.stringToDOM(_epr);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return _eprElement;
+    }
+
+    public QName getFault() {
+        return _fault == null ? null : QName.valueOf(_fault);
+    }
+
+    public String getFaultExplanation() {
+        return _faultExplanation;
+    }
+
+    public ProcessInstanceDAO getInstance() {
+        return _processInst;
+    }
+
+    /**
+     * @return the identifier (primary key) of this message exchange, or <code>null</code>
+     *         if none has been assigned
+     */
+    public String getMessageExchangeId() {
+        // _id is already a String; calling toString() on it only turned an unassigned id into an NPE
+        return _id;
+    }
+
+    public String getOperation() {
+        return _operation;
+    }
+
+    public PartnerLinkDAO getPartnerLink() {
+        return _partnerLink;
+    }
+
+    public int getPartnerLinkModelId() {
+        return _partnerLinkModelId;
+    }
+
+    public String getPattern() {
+        return _pattern;
+    }
+
+    public QName getPortType() {
+        return _portType == null ? null : QName.valueOf(_portType);
+    }
+
+    public ProcessDAO getProcess() {
+        return _process;
+    }
+
+    public boolean getPropagateTransactionFlag() {
+        return _propagateTransactionFlag;
+    }
+
+    public String getProperty(String key) {
+        for (MexProperty prop : _props) {
+            if (prop.getPropertyKey().equals(key)) return prop.getPropertyValue();
+        }
+        return null;
+    }
+
+    public Set<String> getPropertyNames() {
+        HashSet<String> propNames = new HashSet<String>();
+        for (MexProperty prop : _props) {
+            propNames.add(prop.getPropertyKey());
+        }
+        return propNames;
+    }
+
+    public MessageDAO getRequest() {
+        return _request;
+    }
+
+    public MessageDAO getResponse() {
+        return _response;
+    }
+
+    public String getStatus() {
+        return _status;
+    }
+
+    /**
+     * @param callee qualified name of the callee; <code>null</code> clears the stored value,
+     *               matching the <code>null</code> that {@link #getCallee()} can return
+     */
+    public void setCallee(QName callee) {
+        _callee = callee == null ? null : callee.toString();
+    }
+
+    public void setChannel(String channel) {
+        _channel = channel;
+    }
+
+    public void setCorrelationId(String correlationId) {
+        _correlationId = correlationId;
+    }
+
+    public void setCorrelationStatus(String cstatus) {
+        _correlationStatus = cstatus;
+    }
+
+    /**
+     * Stores the endpoint reference, both as the cached element and in serialized form.
+     *
+     * @param epr endpoint reference element; <code>null</code> clears both the cached
+     *            element and the stored text
+     */
+    public void setEPR(Element epr) {
+        _eprElement = epr;
+        // only serialize an actual element
+        _epr = epr == null ? null : DOMUtils.domToString(epr);
+    }
+
+    public void setFault(QName faultType) {
+        _fault = faultType == null ? null : faultType.toString();
+    }
+
+    /**
+     * Stores the fault explanation, truncated to at most 255 characters. The
+     * <code>FAULT_EXPLANATION</code> column declares no explicit length, so it gets the JPA
+     * default of 255.
+     *
+     * @param explanation explanation text; may be <code>null</code>
+     */
+    public void setFaultExplanation(String explanation) {
+        if (explanation != null && explanation.length() > 255) {
+            // keep the first 255 characters, the same limit the length check above uses
+            explanation = explanation.substring(0, 255);
+        }
+        _faultExplanation = explanation;
+    }
+
+    public void setInstance(ProcessInstanceDAO dao) {
+        _processInst = (ProcessInstanceDAOImpl)dao;
+    }
+
+    public void setOperation(String opname) {
+        _operation = opname;
+    }
+
+    public void setPartnerLink(PartnerLinkDAO plinkDAO) {
+        _partnerLink = (PartnerLinkDAOImpl)plinkDAO;
+    }
+
+    public void setPartnerLinkModelId(int modelId) {
+        _partnerLinkModelId = modelId;
+    }
+
+    public void setPattern(String pattern) {
+        _pattern = pattern;
+    }
+
+    /**
+     * @param porttype qualified name of the port type; <code>null</code> clears the stored
+     *                 value, matching the <code>null</code> that {@link #getPortType()} can return
+     */
+    public void setPortType(QName porttype) {
+        _portType = porttype == null ? null : porttype.toString();
+    }
+
+    public void setProcess(ProcessDAO process) {
+        _process = (ProcessDAOImpl)process;
+    }
+
+    /**
+     * Attaches a new key/value property to this message exchange.
+     * <p>
+     * NOTE(review): an existing property with the same key is not replaced; a second entry is
+     * appended, while getProperty(String) returns the first match it iterates over. Setting a
+     * key twice therefore looks like it keeps reporting the old value - confirm whether
+     * callers ever overwrite a property.
+     *
+     * @param key   property name
+     * @param value property value
+     */
+    public void setProperty(String key, String value) {
+        _props.add(new MexProperty(key, value, this));
+    }
+
+    public void setRequest(MessageDAO msg) {
+        _request = (MessageDAOImpl)msg;
+    }
+
+    public void setResponse(MessageDAO msg) {
+        _response = (MessageDAOImpl)msg;
+    }
+
+    public void setStatus(String status) {
+        _status = status;
+    }
+
+    public String getPipedMessageExchangeId() {
+        return _pipedMessageExchangeId;
+    }
+
+    public void setPipedMessageExchangeId(String pipedMessageExchangeId) {
+        _pipedMessageExchangeId = pipedMessageExchangeId;
+    }
+
+    void setCorrelationKeySet(CorrelationKeySet correlationKeySet) {
+        _correlationKeys = correlationKeySet.toCanonicalString();
+    }
+
+    CorrelationKeySet getCorrelationKeySet() {
+        return new CorrelationKeySet(_correlationKeys);
+    }
+
+    /**
+     * Returns the first correlation key of the stored key set.
+     *
+     * @return the first key, or <code>null</code> when no keys are stored or the stored
+     *         key set contains no key
+     */
+    public CorrelationKey getCorrelationKey() {
+        if (_correlationKeys == null) return null;
+        // a stored key set without keys must not surface as a NoSuchElementException
+        java.util.Iterator<? extends CorrelationKey> keys = getCorrelationKeySet().iterator();
+        return keys.hasNext() ? keys.next() : null;
+    }
+
+    public void setCorrelationKey(CorrelationKey ckey) {
+        _correlationKeys = ckey.toCanonicalString();
+    }
+
+
+    public void release(boolean doClean) {
+        if( doClean ) {
+            deleteMessages();
+        }
+    }
+
+    public void releasePremieMessages() {
+        // do nothing; early messages are deleted during CorrelatorDaoImpl().dequeueMessage()
+    }
+
+    public void deleteMessages() {
+        if( __log.isDebugEnabled() ) __log.debug("Deleting message on MEX release.");
+        
+        getEM().remove(this); // This deletes MexProperty, REQUEST MessageDAO, RESPONSE MessageDAO
+    }
+
+    public CorrelatorDAOImpl getCorrelator() {
+        return _correlator;
+    }
+
+    public void setCorrelator(CorrelatorDAOImpl correlator) {
+        _correlator = correlator;
+    }
+
+    public int getSubscriberCount() {
+        return _subscriberCount;
+    }
+    
+    public void setSubscriberCount(int subscriberCount) {
+        this._subscriberCount = subscriberCount;
+    }
+
+    public void incrementSubscriberCount() {
+        ++_subscriberCount;
+    }
+    
+    public void release() {
+        // no-op for now, could be used to do some cleanup
+    }
+
+    public void setCreateTime(Date createTime) {
+        _createTime = createTime;
+    }
+}

Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageRouteDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageRouteDAOImpl.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageRouteDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageRouteDAOImpl.java Fri May 21 15:40:59 2010
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.dao.bpel.MessageRouteDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+
+/**
+ * JPA entity for a message route, stored in table ODE_MESSAGE_ROUTE.
+ * It keeps the route's group id, index, correlation key text and route policy,
+ * together with references to a process instance and a correlator.
+ */
+@Entity
+@Table(name="ODE_MESSAGE_ROUTE")
+@NamedQueries ({
+    @NamedQuery(name=MessageRouteDAOImpl.DELETE_MESSAGE_ROUTES_BY_INSTANCE_IDS, query="delete from MessageRouteDAOImpl as r where r._instanceId in(:instanceIds)"),
+    @NamedQuery(name=MessageRouteDAOImpl.DELETE_MESSAGE_ROUTES_BY_INSTANCE, query="delete from MessageRouteDAOImpl as r where r._processInst = :instance")
+})
+public class MessageRouteDAOImpl implements MessageRouteDAO {
+    /** Name of the bulk delete query that takes a single instance (parameter "instance"). */
+    public final static String DELETE_MESSAGE_ROUTES_BY_INSTANCE = "DELETE_MESSAGE_ROUTES_BY_INSTANCE";
+    /** Name of the bulk delete query that takes a list of instance ids (parameter "instanceIds"). */
+    public final static String DELETE_MESSAGE_ROUTES_BY_INSTANCE_IDS = "DELETE_MESSAGE_ROUTES_BY_INSTANCE_IDS";
+
+    @Id @Column(name="MESSAGE_ROUTE_ID")
+    @GeneratedValue(strategy=GenerationType.AUTO)
+    private Long _id;
+    @Basic @Column(name="GROUP_ID")
+    private String _groupId;
+    @Basic @Column(name="ROUTE_INDEX")
+    private int _index;
+    @Basic @Column(name="CORRELATION_KEY")
+    private String _correlationKey;
+    @Basic @Column(name="ROUTE_POLICY", length=16)
+    private String _routePolicy;
+
+    // Read-only view of the PROCESS_INSTANCE_ID join column; only referenced from JPQL.
+    @SuppressWarnings("unused")
+    @Basic @Column(name="PROCESS_INSTANCE_ID", insertable=false, updatable=false, nullable=true)
+    private int _instanceId;
+    @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="PROCESS_INSTANCE_ID")
+    private ProcessInstanceDAOImpl _processInst;
+
+    @ManyToOne(fetch= FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="CORR_ID")
+    @SuppressWarnings("unused")
+    private CorrelatorDAOImpl _correlator;
+
+    /** No-argument constructor. */
+    public MessageRouteDAOImpl() {
+    }
+
+    /**
+     * Creates a route from its individual attributes.
+     *
+     * @param keySet      key set whose canonical string is stored as the correlation key text
+     * @param groupId     value returned by {@link #getGroupId()}
+     * @param index       value returned by {@link #getIndex()}
+     * @param processInst value returned by {@link #getTargetInstance()}
+     * @param correlator  correlator this route refers to
+     * @param routePolicy value returned by {@link #getRoute()}
+     */
+    public MessageRouteDAOImpl(CorrelationKeySet keySet, String groupId, int index,
+                               ProcessInstanceDAOImpl processInst, CorrelatorDAOImpl correlator, String routePolicy) {
+        this._correlationKey = keySet.toCanonicalString();
+        this._groupId = groupId;
+        this._index = index;
+        this._processInst = processInst;
+        this._correlator = correlator;
+        this._routePolicy = routePolicy;
+    }
+
+    /**
+     * @return the generated identifier of this route
+     */
+    public Long getId() {
+        return this._id;
+    }
+
+    /**
+     * Builds a {@link CorrelationKey} from the stored correlation key text.
+     *
+     * @return a new key created from the stored text
+     */
+    public CorrelationKey getCorrelationKey() {
+        // NOTE(review): the same field is also written from a CorrelationKeySet
+        // canonical string; confirm CorrelationKey accepts that format.
+        final String canonical = this._correlationKey;
+        return new CorrelationKey(canonical);
+    }
+
+    /**
+     * @param key key whose canonical string replaces the stored correlation key text
+     */
+    public void setCorrelationKey(CorrelationKey key) {
+        this._correlationKey = key.toCanonicalString();
+    }
+
+    /**
+     * @return the group id of this route
+     */
+    public String getGroupId() {
+        return this._groupId;
+    }
+
+    /**
+     * @return the index of this route
+     */
+    public int getIndex() {
+        return this._index;
+    }
+
+    /**
+     * @return the process instance this route refers to
+     */
+    public ProcessInstanceDAO getTargetInstance() {
+        return this._processInst;
+    }
+
+    /**
+     * @return the stored route policy
+     */
+    public String getRoute() {
+        return this._routePolicy;
+    }
+
+    /**
+     * Builds a {@link CorrelationKeySet} from the stored correlation key text.
+     *
+     * @return a new key set created from the stored text
+     */
+    public CorrelationKeySet getCorrelationKeySet() {
+        final String canonical = this._correlationKey;
+        return new CorrelationKeySet(canonical);
+    }
+
+    /**
+     * @param keySet key set whose canonical string replaces the stored correlation key text
+     */
+    public void setCorrelationKeySet(CorrelationKeySet keySet) {
+        this._correlationKey = keySet.toCanonicalString();
+    }
+}

Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MexProperty.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MexProperty.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MexProperty.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MexProperty.java Fri May 21 15:40:59 2010
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+
+/**
+ * JPA entity for a single key/value property attached to a message exchange,
+ * stored in table ODE_MEX_PROP.
+ *
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+@Entity
+@Table(name="ODE_MEX_PROP")
+@NamedQueries({
+    @NamedQuery(name=MexProperty.DELETE_MEX_PROPERTIES_BY_MEX_IDS, query="delete from MexProperty as p where p._mexId in (:mexIds)")
+})
+public class MexProperty {
+    /** Name of the bulk delete query that takes a list of message exchange ids (parameter "mexIds"). */
+    public final static String DELETE_MEX_PROPERTIES_BY_MEX_IDS = "DELETE_MEX_PROPERTIES_BY_MEX_IDS";
+
+    @Id @Column(name="ID")
+    @GeneratedValue(strategy=GenerationType.AUTO)
+    @SuppressWarnings("unused")
+    private Long _id;
+    @Basic @Column(name="PROP_KEY")
+    private String propertyKey;
+    @Basic @Column(name="PROP_VALUE", length=2000)
+    private String propertyValue;
+
+    // Read-only view of the MEX_ID join column; only referenced from JPQL.
+    @SuppressWarnings("unused")
+    @Basic @Column(name="MEX_ID", insertable=false, updatable=false, nullable=true)
+    private String _mexId;
+    @ManyToOne(fetch= FetchType.LAZY,cascade={CascadeType.PERSIST})
+    @JoinColumn(name="MEX_ID")
+    @SuppressWarnings("unused")
+    private MessageExchangeDAOImpl _mex;
+
+    /** No-argument constructor. */
+    public MexProperty() {
+    }
+
+    /**
+     * Creates a property bound to a message exchange.
+     *
+     * @param propertyKey   key of the property
+     * @param propertyValue value of the property
+     * @param mex           message exchange the property is attached to
+     */
+    public MexProperty(String propertyKey, String propertyValue, MessageExchangeDAOImpl mex) {
+        this._mex = mex;
+        this.propertyKey = propertyKey;
+        this.propertyValue = propertyValue;
+    }
+
+    /**
+     * @return the key of this property
+     */
+    public String getPropertyKey() {
+        return this.propertyKey;
+    }
+
+    /**
+     * @param propertyKey new key of this property
+     */
+    public void setPropertyKey(String propertyKey) {
+        this.propertyKey = propertyKey;
+    }
+
+    /**
+     * @return the value of this property
+     */
+    public String getPropertyValue() {
+        return this.propertyValue;
+    }
+
+    /**
+     * @param propertyValue new value of this property
+     */
+    public void setPropertyValue(String propertyValue) {
+        this.propertyValue = propertyValue;
+    }
+}

Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/PartnerLinkDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/PartnerLinkDAOImpl.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/PartnerLinkDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/PartnerLinkDAOImpl.java Fri May 21 15:40:59 2010
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+
+import org.apache.ode.dao.bpel.PartnerLinkDAO;
+import org.apache.ode.utils.DOMUtils;
+import org.w3c.dom.Element;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.Lob;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.Transient;
+import javax.xml.namespace.QName;
+
+@Entity
+@Table(name="ODE_PARTNER_LINK")
+@NamedQueries({
+    @NamedQuery(name=PartnerLinkDAOImpl.DELETE_PARTNER_LINKS_BY_SCOPE_IDS, query="delete from PartnerLinkDAOImpl as l where l._scopeId in (:scopeIds)")
+})
+public class PartnerLinkDAOImpl implements PartnerLinkDAO {
+	public final static String DELETE_PARTNER_LINKS_BY_SCOPE_IDS = "DELETE_PARTNER_LINKS_BY_SCOPE_IDS";
+	
+	@Id @Column(name="PARTNER_LINK_ID") 
+	@GeneratedValue(strategy=GenerationType.AUTO)
+	@SuppressWarnings("unused")
+	private Long _id;
+	@Lob @Column(name="MY_EPR")
+    private String _myEPR;
+	@Transient
+    private Element _myEPRElement;
+	@Basic @Column(name="MY_ROLE_NAME")
+    private String _myRoleName;
+	@Basic @Column(name="MY_ROLE_SERVICE_NAME")
+    private String _myRoleServiceName;
+	@Basic @Column(name="MY_SESSION_ID")
+    private String _mySessionId;
+	@Lob @Column(name="PARTNER_EPR")
+    private String _partnerEPR;
+	@Transient
+    private Element _partnerEPRElement;
+	@Basic @Column(name="PARTNER_LINK_MODEL_ID")
+    private int _partnerLinkModelId;
+	@Basic @Column(name="PARTNER_LINK_NAME")
+    private String _partnerLinkName;
+	@Basic @Column(name="PARTNER_ROLE_NAME")
+    private String _partnerRoleName;
+	@Basic @Column(name="PARTNER_SESSION_ID")
+    private String _partnerSessionId;
+
+	@SuppressWarnings("unused")
+	@Basic @Column(name="SCOPE_ID", nullable=true, insertable=false, updatable=false)
+    private Long _scopeId;
+    @ManyToOne(fetch= FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="SCOPE_ID")
+    @SuppressWarnings("unused")
+    private ScopeDAOImpl _scope;
+
+    public PartnerLinkDAOImpl() {}
+	public PartnerLinkDAOImpl(int modelId, String name, String myRole, String partnerRole) {
+		_partnerLinkModelId = modelId;
+		_partnerLinkName = name;
+		_myRoleName = myRole;
+		_partnerRoleName = partnerRole;
+	}
+
+	public Element getMyEPR() {
+		if (_myEPRElement == null && _myEPR != null && !"".equals(_myEPR)) {
+			try {
+				_myEPRElement = DOMUtils.stringToDOM(_myEPR);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}	
+		}
+		
+		return _myEPRElement;
+	}
+
+	public String getMyRoleName() {
+		return _myRoleName;
+	}
+
+	public QName getMyRoleServiceName() {
+		return _myRoleServiceName == null ? null : QName.valueOf(_myRoleServiceName);
+	}
+
+	public String getMySessionId() {
+		return _mySessionId;
+	}
+
+	public Element getPartnerEPR() {
+		if ( _partnerEPRElement == null && _partnerEPR != null && !"".equals(_partnerEPR)) {
+			try {
+				_partnerEPRElement = DOMUtils.stringToDOM(_partnerEPR);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}	
+		}
+		return _partnerEPRElement;
+	}
+
+	public int getPartnerLinkModelId() {
+		return _partnerLinkModelId;
+	}
+
+	public String getPartnerLinkName() {
+		return _partnerLinkName;
+	}
+
+	public String getPartnerRoleName() {
+		return _partnerRoleName;
+	}
+
+	public String getPartnerSessionId() {
+		return _partnerSessionId;
+	}
+
+	public void setMyEPR(Element val) {
+		_myEPRElement = val;
+		_myEPR = DOMUtils.domToString(val);
+
+	}
+
+	public void setMyRoleServiceName(QName svcName) {
+		_myRoleServiceName = svcName.toString();
+
+	}
+
+	public void setMySessionId(String sessionId) {
+		_mySessionId = sessionId;
+
+	}
+
+	public void setPartnerEPR(Element val) {
+		_partnerEPRElement = val;
+		_partnerEPR = DOMUtils.domToString(val);
+
+	}
+
+	public void setPartnerSessionId(String session) {
+		_partnerSessionId = session;
+
+	}
+
+    public void setScope(ScopeDAOImpl scope) {
+        _scope = scope;
+    }
+}

Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessDAOImpl.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessDAOImpl.java Fri May 21 15:40:59 2010
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.bpel.common.ProcessState;
+import org.apache.ode.dao.bpel.CorrelatorDAO;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.dao.bpel.ProcessDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+
+import javax.persistence.*;
+import javax.xml.namespace.QName;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * JPA entity for a deployed process, stored in table ODE_PROCESS.
+ * Besides its own attributes it gives access to correlators and process
+ * instances and can bulk-delete the data stored for the process.
+ *
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+@Entity
+@Table(name="ODE_PROCESS")
+@NamedQueries({
+    @NamedQuery(name="ActiveInstances", query="select i from ProcessInstanceDAOImpl as i where i._process = :process and i._state = :state"),
+    @NamedQuery(name="InstanceByCKey", query="select cs._scope._processInstance from CorrelationSetDAOImpl as cs where cs._correlationKey = :ckey"),
+    @NamedQuery(name="CorrelatorByKey", query="select c from CorrelatorDAOImpl as c where c._correlatorKey = :ckey and c._process = :process")
+})
+public class ProcessDAOImpl extends BpelDAO implements ProcessDAO {
+    private static final Log __log = LogFactory.getLog(ProcessDAOImpl.class);
+
+    @Id @Column(name="ID")
+    @GeneratedValue(strategy= GenerationType.AUTO)
+    private Long _id;
+
+    @Basic @Column(name="PROCESS_ID")
+    private String _processId;
+    @Basic @Column(name="PROCESS_TYPE")
+    private String _processType;
+    @Basic @Column(name="GUID")
+    private String _guid;
+    @Basic @Column(name="VERSION")
+    private long _version;
+
+    @OneToMany(targetEntity=CorrelatorDAOImpl.class,mappedBy="_process",fetch=FetchType.LAZY,cascade={CascadeType.ALL})
+    private Collection<CorrelatorDAOImpl> _correlators = new ArrayList<CorrelatorDAOImpl>();
+
+    /** No-argument constructor. */
+    public ProcessDAOImpl() {
+    }
+
+    /**
+     * Creates a process entity.
+     *
+     * @param pid     process id, stored in its string form
+     * @param type    process type, stored in its string form
+     * @param guid    value returned by {@link #getGuid()}
+     * @param version value returned by {@link #getVersion()}
+     */
+    public ProcessDAOImpl(QName pid, QName type, String guid, long version) {
+        this._processId = pid.toString();
+        this._processType = type.toString();
+        this._guid = guid;
+        this._version = version;
+    }
+
+    /**
+     * @return the generated identifier of this entity
+     */
+    public Serializable getId() {
+        return this._id;
+    }
+
+    /**
+     * @param id the identifier to assign to this entity
+     */
+    public void setId(Long id) {
+        this._id = id;
+    }
+
+    /**
+     * Creates a correlator for this process and adds it to the correlator collection.
+     *
+     * @param correlator key passed to the new correlator
+     * @return the newly created correlator
+     */
+    public CorrelatorDAO addCorrelator(String correlator) {
+        final CorrelatorDAOImpl created = new CorrelatorDAOImpl(correlator, this);
+        _correlators.add(created);
+        return created;
+    }
+
+    /**
+     * Looks up a correlator of this process through the "CorrelatorByKey" query.
+     *
+     * @param correlatorId value bound to the query's "ckey" parameter
+     * @return the first matching correlator, or {@code null} when the query returns nothing
+     */
+    @SuppressWarnings("unchecked")
+    public CorrelatorDAO getCorrelator(String correlatorId) {
+        List matches = getEM().createNamedQuery("CorrelatorByKey")
+                .setParameter("ckey", correlatorId)
+                .setParameter("process", this)
+                .getResultList();
+        return matches.isEmpty() ? null : (CorrelatorDAO) matches.get(0);
+    }
+
+    /**
+     * Creates and persists a new instance of this process.
+     *
+     * @param instantiatingCorrelator correlator handed to the new instance
+     * @return the persisted instance
+     */
+    public ProcessInstanceDAO createInstance(CorrelatorDAO instantiatingCorrelator) {
+        final CorrelatorDAOImpl correlator = (CorrelatorDAOImpl) instantiatingCorrelator;
+        final ProcessInstanceDAOImpl created = new ProcessInstanceDAOImpl(correlator, this);
+        getEM().persist(created);
+        return created;
+    }
+
+    /**
+     * Creates and persists a new instance of this process.
+     *
+     * @param instantiatingCorrelator correlator handed to the new instance
+     * @param mex                     not used by this implementation
+     * @return the persisted instance
+     */
+    public ProcessInstanceDAO createInstance(CorrelatorDAO instantiatingCorrelator, MessageExchangeDAO mex) {
+        return createInstance(instantiatingCorrelator);
+    }
+
+    /**
+     * Runs the "InstanceByCKey" query for the given correlation key.
+     *
+     * @param ckey key whose canonical string is bound to the query's "ckey" parameter
+     * @return the query result
+     */
+    @SuppressWarnings("unchecked")
+    public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey) {
+        return getEM().createNamedQuery("InstanceByCKey")
+                .setParameter("ckey", ckey.toCanonicalString())
+                .getResultList();
+    }
+
+    /**
+     * @param iid primary key of the instance
+     * @return the instance found by the entity manager for that key
+     */
+    public ProcessInstanceDAO getInstance(Long iid) {
+        return getEM().find(ProcessInstanceDAOImpl.class, iid);
+    }
+
+    /**
+     * @return the process id parsed from its stored string form
+     */
+    public QName getProcessId() {
+        return QName.valueOf(this._processId);
+    }
+
+    /**
+     * @return the process type parsed from its stored string form
+     */
+    public QName getType() {
+        return QName.valueOf(this._processType);
+    }
+
+    /**
+     * Deletes the message routes and correlators of this process, then the
+     * remaining instance data, and finally removes this entity and flushes.
+     */
+    @SuppressWarnings("unchecked")
+    public void deleteProcessAndRoutes() {
+        // Routes first, then the correlators of this process.
+        Collection instanceIds = selectIdsOfThisProcess(ProcessInstanceDAOImpl.SELECT_INSTANCE_IDS_BY_PROCESS);
+        runBatchedDelete(instanceIds, MessageRouteDAOImpl.DELETE_MESSAGE_ROUTES_BY_INSTANCE_IDS, "instanceIds");
+        bulkDeleteForThisProcess(CorrelatorDAOImpl.DELETE_CORRELATORS_BY_PROCESS);
+
+        deleteInstances(Integer.MAX_VALUE);
+
+        // Finally the process entity itself.
+        getEM().remove(this); // This deletes CorrelatorDAO
+        getEM().flush();
+    }
+
+    /**
+     * Deletes events, correlations, messages, variables and instances, in that order.
+     *
+     * @param transactionSize currently ignored
+     * @return always 0
+     */
+    private int deleteInstances(int transactionSize) {
+        if (__log.isDebugEnabled()) {
+            __log.debug("Cleaning up process data.");
+        }
+
+        deleteEvents();
+        deleteCorrelations();
+        deleteMessages();
+        deleteVariables();
+        deleteProcessInstances();
+
+        return 0;
+    }
+
+    /** Deletes faults, activity recoveries and finally the instances of this process. */
+    @SuppressWarnings("unchecked")
+    private void deleteProcessInstances() {
+        Collection faultIds = selectIdsOfThisProcess(ProcessInstanceDAOImpl.SELECT_FAULT_IDS_BY_PROCESS);
+        runBatchedDelete(faultIds, FaultDAOImpl.DELETE_FAULTS_BY_IDS, "ids");
+
+        Collection instanceIds = selectIdsOfThisProcess(ProcessInstanceDAOImpl.SELECT_INSTANCE_IDS_BY_PROCESS);
+        runBatchedDelete(instanceIds, ActivityRecoveryDAOImpl.DELETE_ACTIVITY_RECOVERIES_BY_IDS, "ids");
+
+        bulkDeleteForThisProcess(ProcessInstanceDAOImpl.DELETE_INSTANCES_BY_PROCESS);
+    }
+
+    /** Deletes XML data properties, XML data, partner links and scopes of this process. */
+    @SuppressWarnings("unchecked")
+    private void deleteVariables() {
+        Collection xmlDataIds = selectIdsOfThisProcess(XmlDataDAOImpl.SELECT_XMLDATA_IDS_BY_PROCESS);
+        runBatchedDelete(xmlDataIds, XmlDataProperty.DELETE_XML_DATA_PROPERTIES_BY_XML_DATA_IDS, "xmlDataIds");
+
+        // The same scope id collection drives the three remaining deletes.
+        Collection scopeIds = selectIdsOfThisProcess(ScopeDAOImpl.SELECT_SCOPE_IDS_BY_PROCESS);
+        runBatchedDelete(scopeIds, XmlDataDAOImpl.DELETE_XMLDATA_BY_SCOPE_IDS, "scopeIds");
+        runBatchedDelete(scopeIds, PartnerLinkDAOImpl.DELETE_PARTNER_LINKS_BY_SCOPE_IDS, "scopeIds");
+        runBatchedDelete(scopeIds, ScopeDAOImpl.DELETE_SCOPES_BY_SCOPE_IDS, "ids");
+    }
+
+    /** Deletes messages, message exchange properties and message exchanges of this process. */
+    @SuppressWarnings("unchecked")
+    private void deleteMessages() {
+        bulkDeleteForThisProcess(MessageDAOImpl.DELETE_MESSAGES_BY_PROCESS);
+
+        Collection mexIds = selectIdsOfThisProcess(MessageExchangeDAOImpl.SELECT_MEX_IDS_BY_PROCESS);
+        runBatchedDelete(mexIds, MexProperty.DELETE_MEX_PROPERTIES_BY_MEX_IDS, "mexIds");
+
+        bulkDeleteForThisProcess(MessageExchangeDAOImpl.DELETE_MEXS_BY_PROCESS);
+    }
+
+    /** Deletes correlation set properties and correlation sets of this process. */
+    @SuppressWarnings("unchecked")
+    private void deleteCorrelations() {
+        Collection corrSetIds = selectIdsOfThisProcess(CorrelationSetDAOImpl.SELECT_CORRELATION_SET_IDS_BY_PROCESS);
+        runBatchedDelete(corrSetIds, CorrSetProperty.DELETE_CORSET_PROPERTIES_BY_PROPERTY_IDS, "corrSetIds");
+        runBatchedDelete(corrSetIds, CorrelationSetDAOImpl.DELETE_CORRELATION_SETS_BY_IDS, "ids");
+    }
+
+    /** Deletes the events of this process. */
+    @SuppressWarnings("unchecked")
+    private void deleteEvents() {
+        Collection eventIds = selectIdsOfThisProcess(EventDAOImpl.SELECT_EVENT_IDS_BY_PROCESS);
+        runBatchedDelete(eventIds, EventDAOImpl.DELETE_EVENTS_BY_IDS, "ids");
+    }
+
+    /** Runs a named select query with its "process" parameter bound to this entity. */
+    private Collection selectIdsOfThisProcess(String namedQuery) {
+        return getEM().createNamedQuery(namedQuery).setParameter("process", this).getResultList();
+    }
+
+    /** Executes a named update query with its "process" parameter bound to this entity. */
+    private void bulkDeleteForThisProcess(String namedQuery) {
+        getEM().createNamedQuery(namedQuery).setParameter("process", this).executeUpdate();
+    }
+
+    /** Hands the ids, the named query and the parameter name to batchUpdateByIds. */
+    @SuppressWarnings("unchecked")
+    private void runBatchedDelete(Collection ids, String namedQuery, String parameterName) {
+        batchUpdateByIds(ids.iterator(), getEM().createNamedQuery(namedQuery), parameterName);
+    }
+
+    /**
+     * @return the instance count reported by the count query, or 0 when it yields no value
+     */
+    public int getNumInstances() {
+        Query countQuery = getEM().createNamedQuery(ProcessInstanceDAOImpl.COUNT_INSTANCE_IDS_BY_PROCESS)
+                .setParameter("process", this);
+        Long instanceCount = (Long) getSingleResult(countQuery);
+        if (instanceCount == null) {
+            return 0;
+        }
+        return instanceCount.intValue();
+    }
+
+    /**
+     * @return the version of this process
+     */
+    public long getVersion() {
+        return this._version;
+    }
+
+    /**
+     * Does nothing at present.
+     *
+     * @param instance the completed instance (ignored)
+     */
+    public void instanceCompleted(ProcessInstanceDAO instance) {
+        // nothing to do here (yet?)
+    }
+
+    /**
+     * Asks every correlator of this process to remove its local routes.
+     *
+     * @param routeId route id forwarded to each correlator
+     * @param target  instance forwarded to each correlator
+     */
+    public void removeRoutes(String routeId, ProcessInstanceDAO target) {
+        for (CorrelatorDAOImpl correlator : _correlators) {
+            correlator.removeLocalRoutes(routeId, target);
+        }
+    }
+
+    /**
+     * @return the GUID of this process
+     */
+    public String getGuid() {
+        return this._guid;
+    }
+
+    /**
+     * Runs the "ActiveInstances" query for this process with state
+     * {@code ProcessState.STATE_ACTIVE}.
+     *
+     * @return the query result
+     */
+    @SuppressWarnings("unchecked")
+    public Collection<ProcessInstanceDAO> getActiveInstances() {
+        return getEM().createNamedQuery("ActiveInstances")
+                .setParameter("process", this)
+                .setParameter("state", ProcessState.STATE_ACTIVE)
+                .getResultList();
+    }
+}
\ No newline at end of file