You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by je...@apache.org on 2010/08/18 06:12:59 UTC
svn commit: r986561 [12/13] - in /ode/trunk: ./ axis2-war/
axis2-war/src/main/assembly/ axis2-war/src/test/java/org/apache/ode/axis2/
axis2-war/src/test/java/org/apache/ode/axis2/instancecleanup/
axis2-war/src/test/java/org/apache/ode/bpel/dao/ axis2-w...
Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrelatorDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrelatorDAOImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrelatorDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/CorrelatorDAOImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.OneToMany;
+import javax.persistence.Query;
+import javax.persistence.Table;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.dao.bpel.CorrelatorDAO;
+import org.apache.ode.dao.bpel.CorrelatorMessageDAO;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.dao.bpel.MessageRouteDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+
+@Entity
+@Table(name = "ODE_CORRELATOR")
+@NamedQueries( { @NamedQuery(name = CorrelatorDAOImpl.DELETE_CORRELATORS_BY_PROCESS, query = "delete from CorrelatorDAOImpl as c where c._process = :process") })
+public class CorrelatorDAOImpl extends BpelDAO implements CorrelatorDAO {
+ private static Log __log = LogFactory.getLog(CorrelatorDAOImpl.class);
+ public final static String DELETE_CORRELATORS_BY_PROCESS = "DELETE_CORRELATORS_BY_PROCESS";
+ private final static String ROUTE_BY_CKEY_HEADER = "select route from MessageRouteDAOImpl as route where route._correlator._process._processType = :ptype and route._correlator._correlatorKey = :corrkey";
+
+ @Id
+ @Column(name = "CORRELATOR_ID")
+ @GeneratedValue(strategy = GenerationType.AUTO)
+ @SuppressWarnings("unused")
+ private Long _correlatorId;
+ @Basic
+ @Column(name = "CORRELATOR_KEY")
+ private String _correlatorKey;
+ @OneToMany(targetEntity = MessageRouteDAOImpl.class, mappedBy = "_correlator", fetch = FetchType.EAGER, cascade = { CascadeType.MERGE, CascadeType.PERSIST, CascadeType.REFRESH })
+ private Collection<MessageRouteDAOImpl> _routes = new ArrayList<MessageRouteDAOImpl>();
+ @OneToMany(targetEntity = MessageExchangeDAOImpl.class, mappedBy = "_correlator", fetch = FetchType.LAZY, cascade = { CascadeType.MERGE, CascadeType.PERSIST, CascadeType.REFRESH })
+ private Collection<MessageExchangeDAOImpl> _exchanges = new ArrayList<MessageExchangeDAOImpl>();
+ @ManyToOne(fetch = FetchType.LAZY, cascade = { CascadeType.PERSIST })
+ @JoinColumn(name = "PROC_ID")
+ private ProcessDAOImpl _process;
+
+ public CorrelatorDAOImpl() {
+ }
+
+ public CorrelatorDAOImpl(String correlatorKey, ProcessDAOImpl process) {
+ _correlatorKey = correlatorKey;
+ _process = process;
+ }
+
+ public void addRoute(String routeGroupId, ProcessInstanceDAO target, int index, CorrelationKeySet correlationKeySet, String routePolicy) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("addRoute " + routeGroupId + " " + target + " " + index + " " + correlationKeySet + " " + routePolicy);
+ }
+ MessageRouteDAOImpl mr = new MessageRouteDAOImpl(correlationKeySet, routeGroupId, index, (ProcessInstanceDAOImpl) target, this, routePolicy);
+ _routes.add(mr);
+ getEM().flush();
+ }
+
+ public MessageExchangeDAO dequeueMessage(CorrelationKeySet correlationKeySet) {
+ // TODO: this thing does not seem to be scalable: loading up based on a correlator???
+ for (Iterator<MessageExchangeDAOImpl> itr = _exchanges.iterator(); itr.hasNext();) {
+ MessageExchangeDAOImpl mex = itr.next();
+ if (mex.getCorrelationKeySet().isRoutableTo(correlationKeySet, false)) {
+ itr.remove();
+ return mex;
+ }
+ }
+ return null;
+ }
+
+ public void enqueueMessage(MessageExchangeDAO mex, CorrelationKeySet correlationKeySet) {
+ MessageExchangeDAOImpl mexImpl = (MessageExchangeDAOImpl) mex;
+ mexImpl.setCorrelationKeySet(correlationKeySet);
+ _exchanges.add(mexImpl);
+ mexImpl.setCorrelator(this);
+ }
+
+ public Collection<CorrelatorMessageDAO> getAllMessages() {
+ return new ArrayList<CorrelatorMessageDAO>(_exchanges);
+ }
+
+ @SuppressWarnings("unchecked")
+ public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("findRoute " + correlationKeySet);
+ }
+ List<CorrelationKeySet> subSets = correlationKeySet.findSubSets();
+ Query qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets));
+ qry.setParameter("ptype", _process.getType().toString());
+ qry.setParameter("corrkey", _correlatorKey);
+ for (int i = 0; i < subSets.size(); i++) {
+ qry.setParameter("s" + i, subSets.get(i).toCanonicalString());
+ }
+
+ List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) qry.getResultList();
+ if (candidateRoutes.size() > 0) {
+ List<MessageRouteDAO> matchingRoutes = new ArrayList<MessageRouteDAO>();
+ boolean routed = false;
+ for (int i = 0; i < candidateRoutes.size(); i++) {
+ MessageRouteDAO route = candidateRoutes.get(i);
+ if ("all".equals(route.getRoute())) {
+ matchingRoutes.add(route);
+ } else {
+ if (!routed) {
+ matchingRoutes.add(route);
+ }
+ routed = true;
+ }
+ }
+ if (__log.isDebugEnabled()) {
+ __log.debug("findRoute found " + matchingRoutes);
+ }
+ return matchingRoutes;
+ } else {
+ if (__log.isDebugEnabled()) {
+ __log.debug("findRoute found nothing");
+ }
+ return null;
+ }
+ }
+
+ private String generateSelectorQuery(String header, List<CorrelationKeySet> subSets) {
+ StringBuffer filterQuery = new StringBuffer(header);
+
+ if (subSets.size() == 1) {
+ filterQuery.append(" and route._correlationKey = :s0");
+ } else if (subSets.size() > 1) {
+ filterQuery.append(" and route._correlationKey in(");
+ for (int i = 0; i < subSets.size(); i++) {
+ if (i > 0) {
+ filterQuery.append(", ");
+ }
+ filterQuery.append(":s").append(i);
+ }
+ filterQuery.append(")");
+ }
+
+ return filterQuery.toString();
+ }
+
+ public String getCorrelatorId() {
+ return _correlatorKey;
+ }
+
+ public void setCorrelatorId(String newId) {
+ _correlatorKey = newId;
+ }
+
+ public void removeRoutes(String routeGroupId, ProcessInstanceDAO target) {
+ // remove route across all correlators of the process
+ ((ProcessInstanceDAOImpl) target).removeRoutes(routeGroupId);
+ }
+
+ void removeLocalRoutes(String routeGroupId, ProcessInstanceDAO target) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("removeLocalRoutes " + routeGroupId);
+ }
+ boolean flush = false;
+ for (Iterator<MessageRouteDAOImpl> itr = _routes.iterator(); itr.hasNext();) {
+ MessageRouteDAOImpl mr = itr.next();
+ if (mr.getGroupId().equals(routeGroupId) && mr.getTargetInstance().equals(target)) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("removing " + mr.getCorrelationKey() + " " + mr.getIndex() + " " + mr.getRoute());
+ }
+ itr.remove();
+ getEM().remove(mr);
+ flush = true;
+ }
+ }
+ if (flush) {
+ getEM().flush();
+ }
+ }
+
+ public Collection<MessageRouteDAO> getAllRoutes() {
+ return new ArrayList<MessageRouteDAO>(_routes);
+ }
+
+ public boolean checkRoute(CorrelationKeySet correlationKeySet) {
+ // TODO Auto-generated method stub
+ return true;
+ }
+}
Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/EventDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/EventDAOImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/EventDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/EventDAOImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import org.apache.ode.bpel.evt.BpelEvent;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.Lob;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import java.sql.Timestamp;
+
+/**
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+@Entity
+@Table(name="ODE_EVENT")
+@NamedQueries({
+ @NamedQuery(name=EventDAOImpl.SELECT_EVENT_IDS_BY_PROCESS, query="select e._id from EventDAOImpl as e where e._instance._process = :process"),
+ @NamedQuery(name=EventDAOImpl.DELETE_EVENTS_BY_IDS, query="delete from EventDAOImpl as e where e._id in (:ids)"),
+ @NamedQuery(name=EventDAOImpl.DELETE_EVENTS_BY_INSTANCE, query="delete from EventDAOImpl as e where e._instance = :instance")
+})
+public class EventDAOImpl extends BpelDAO {
+ public final static String SELECT_EVENT_IDS_BY_PROCESS = "SELECT_EVENT_IDS_BY_PROCESS";
+ public final static String DELETE_EVENTS_BY_IDS = "DELETE_EVENTS_BY_IDS";
+ public final static String DELETE_EVENTS_BY_INSTANCE = "DELETE_EVENTS_BY_INSTANCE";
+
+ @Id @Column(name="EVENT_ID")
+ @GeneratedValue(strategy= GenerationType.AUTO)
+ private Long _id;
+ @Basic @Column(name="TSTAMP")
+ private Timestamp _tstamp;
+ @Basic @Column(name="TYPE")
+ private String _type;
+ @Basic @Column(name="DETAIL")
+ private String _detail;
+
+ /** Scope identifier, possibly null. */
+ @Basic @Column(name="SCOPE_ID")
+ private Long _scopeId;
+
+ @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="PROCESS_ID")
+ private ProcessDAOImpl _process;
+ @ManyToOne(fetch= FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="INSTANCE_ID")
+ private ProcessInstanceDAOImpl _instance;
+ @Lob @Column(name="DATA")
+ private BpelEvent _event;
+
+ public BpelEvent getEvent() {
+ return _event;
+ }
+
+ public void setEvent(BpelEvent event) {
+ _event = event;
+ }
+
+ public String getDetail() {
+ return _detail;
+ }
+
+ public void setDetail(String detail) {
+ _detail = detail;
+ }
+
+ public Long getId() {
+ return _id;
+ }
+
+ public void setId(Long id) {
+ _id = id;
+ }
+
+ public ProcessInstanceDAOImpl getInstance() {
+ return _instance;
+ }
+
+ public void setInstance(ProcessInstanceDAOImpl instance) {
+ _instance = instance;
+ }
+
+ public ProcessDAOImpl getProcess() {
+ return _process;
+ }
+
+ public void setProcess(ProcessDAOImpl process) {
+ _process = process;
+ }
+
+ public Timestamp getTstamp() {
+ return _tstamp;
+ }
+
+ public void setTstamp(Timestamp tstamp) {
+ _tstamp = tstamp;
+ }
+
+ public String getType() {
+ return _type;
+ }
+
+ public void setType(String type) {
+ _type = type;
+ }
+
+ public Long getScopeId() {
+ return _scopeId;
+ }
+
+ public void setScopeId(Long scopeId) {
+ _scopeId = scopeId;
+ }
+}
Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/FaultDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/FaultDAOImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/FaultDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/FaultDAOImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import org.apache.ode.dao.bpel.FaultDAO;
+import org.apache.ode.utils.DOMUtils;
+import org.w3c.dom.Element;
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Lob;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.xml.namespace.QName;
+
+
+@Entity
+@Table(name="ODE_FAULT")
+@NamedQueries({
+ @NamedQuery(name=FaultDAOImpl.DELETE_FAULTS_BY_IDS, query="delete from FaultDAOImpl as f where f._id in(:ids)")
+})
+public class FaultDAOImpl implements FaultDAO {
+ public final static String DELETE_FAULTS_BY_IDS = "DELETE_FAULTS_BY_IDS";
+
+ @Id @Column(name="FAULT_ID")
+ @GeneratedValue(strategy=GenerationType.AUTO)
+ @SuppressWarnings("unused")
+ private Long _id;
+ @Basic @Column(name="NAME")
+ private String _name;
+ @Basic @Column(name="MESSAGE", length=4000)
+ private String _explanation;
+ @Lob @Column(name="DATA")
+ private String _data;
+ @Basic @Column(name="LINE_NUMBER")
+ private int _lineNo;
+ @Basic @Column(name="ACTIVITY_ID")
+ private int _activityId;
+
+ public FaultDAOImpl() {}
+ public FaultDAOImpl(QName faultName, String explanation, int faultLineNo,
+ int activityId, Element faultMessage) {
+ _name = faultName.toString();
+ _explanation = explanation;
+ _lineNo = faultLineNo;
+ _activityId = activityId;
+ _data = (faultMessage == null)?null:DOMUtils.domToString(faultMessage);
+ }
+
+ public int getActivityId() {
+ return _activityId;
+ }
+
+ public Element getData() {
+ Element ret = null;
+
+ try {
+ ret = (_data == null)?null:DOMUtils.stringToDOM(_data);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return ret;
+ }
+
+ public String getExplanation() {
+ return _explanation;
+ }
+
+ public int getLineNo() {
+ return _lineNo;
+ }
+
+ public QName getName() {
+ return _name == null ? null : QName.valueOf(_name);
+ }
+
+}
Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/JpaTxMgrProvider.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/JpaTxMgrProvider.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/JpaTxMgrProvider.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/JpaTxMgrProvider.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import javax.transaction.NotSupportedException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+import org.apache.openjpa.ee.ManagedRuntime;
+import org.apache.openjpa.util.GeneralException;
+
+public class JpaTxMgrProvider implements ManagedRuntime {
+ private TransactionManager _txMgr;
+
+ public JpaTxMgrProvider(TransactionManager txMgr) {
+ _txMgr = txMgr;
+ }
+
+ public TransactionManager getTransactionManager() throws Exception {
+ return _txMgr;
+ }
+
+ public void setRollbackOnly(Throwable cause) throws Exception {
+ // there is no generic support for setting the rollback cause
+ getTransactionManager().getTransaction().setRollbackOnly();
+ }
+
+ public Throwable getRollbackCause() throws Exception {
+ // there is no generic support for setting the rollback cause
+ return null;
+ }
+
+ public Object getTransactionKey() throws Exception, SystemException {
+ return _txMgr.getTransaction();
+ }
+
+ public void doNonTransactionalWork(java.lang.Runnable runnable) throws NotSupportedException {
+ TransactionManager tm = null;
+ Transaction transaction = null;
+
+ try {
+ tm = getTransactionManager();
+ transaction = tm.suspend();
+ } catch (Exception e) {
+ NotSupportedException nse =
+ new NotSupportedException(e.getMessage());
+ nse.initCause(e);
+ throw nse;
+ }
+
+ runnable.run();
+
+ try {
+ tm.resume(transaction);
+ } catch (Exception e) {
+ try {
+ transaction.setRollbackOnly();
+ }
+ catch(SystemException se2) {
+ throw new GeneralException(se2);
+ }
+ NotSupportedException nse =
+ new NotSupportedException(e.getMessage());
+ nse.initCause(e);
+ throw nse;
+ }
+ }
+}
\ No newline at end of file
Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageDAOImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageDAOImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.dao.bpel.MessageDAO;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.utils.DOMUtils;
+import org.w3c.dom.Element;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.Lob;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.Transient;
+import javax.xml.namespace.QName;
+
+@Entity
+@Table(name = "ODE_MESSAGE")
+@NamedQueries( { @NamedQuery(name = MessageDAOImpl.DELETE_MESSAGES_BY_PROCESS, query = "delete from MessageDAOImpl as m where m._messageExchange._process = :process") })
+public class MessageDAOImpl implements MessageDAO {
+ private static Log __log = LogFactory.getLog(MessageDAOImpl.class);
+ public final static String DELETE_MESSAGES_BY_PROCESS = "DELETE_MESSAGES_BY_PROCESS";
+
+ @Id
+ @Column(name = "MESSAGE_ID")
+ @GeneratedValue(strategy = GenerationType.AUTO)
+ @SuppressWarnings("unused")
+ private Long _id;
+ @Basic
+ @Column(name = "TYPE")
+ private String _type;
+ @Lob
+ @Column(name = "DATA")
+ private String _data;
+ @Lob
+ @Column(name = "HEADER")
+ private String _header;
+ @ManyToOne(fetch = FetchType.LAZY, cascade = { CascadeType.ALL })
+ @JoinColumn(name = "MESSAGE_EXCHANGE_ID")
+ private MessageExchangeDAOImpl _messageExchange;
+
+ public MessageDAOImpl() {
+ }
+
+ public MessageDAOImpl(QName type, MessageExchangeDAOImpl me) {
+ _type = type.toString();
+ _messageExchange = me;
+ }
+
+ public Element getData() {
+ if (__log.isDebugEnabled()) {
+ __log.debug("getData " + _id + " " + _data);
+ }
+ try {
+ return _data == null ? null : DOMUtils.stringToDOM(_data);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void setData(Element value) {
+ if (value == null) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("setData " + _id + " null");
+ }
+ return;
+ }
+ _data = DOMUtils.domToString(value);
+
+ if (__log.isDebugEnabled()) {
+ __log.debug("setData " + _id + " " + _data);
+ }
+ }
+
+ public Element getHeader() {
+ try {
+ return _header == null ? null : DOMUtils.stringToDOM(_header);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void setHeader(Element value) {
+ if (value == null)
+ return;
+ _header = DOMUtils.domToString(value);
+ }
+
+ public MessageExchangeDAO getMessageExchange() {
+ return _messageExchange;
+ }
+
+ public QName getType() {
+ return _type == null ? null : QName.valueOf(_type);
+ }
+
+ public void setType(QName type) {
+ _type = type.toString();
+ }
+
+}
Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageExchangeDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageExchangeDAOImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageExchangeDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageExchangeDAOImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.Lob;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.OneToMany;
+import javax.persistence.OneToOne;
+import javax.persistence.Table;
+import javax.persistence.Transient;
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.dao.bpel.CorrelatorMessageDAO;
+import org.apache.ode.dao.bpel.MessageDAO;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.dao.bpel.PartnerLinkDAO;
+import org.apache.ode.dao.bpel.ProcessDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+import org.apache.ode.utils.DOMUtils;
+import org.apache.ode.utils.uuid.UUID;
+import org.w3c.dom.Element;
+
+@Entity
+@Table(name="ODE_MESSAGE_EXCHANGE")
+@NamedQueries({
+ @NamedQuery(name=MessageExchangeDAOImpl.DELETE_MEXS_BY_PROCESS, query="delete from MessageExchangeDAOImpl as m where m._process = :process"),
+ @NamedQuery(name=MessageExchangeDAOImpl.SELECT_MEX_IDS_BY_PROCESS, query="select m._id from MessageExchangeDAOImpl as m where m._process = :process")
+})
+public class MessageExchangeDAOImpl extends BpelDAO implements MessageExchangeDAO, CorrelatorMessageDAO {
+ private static final Log __log = LogFactory.getLog(MessageExchangeDAOImpl.class);
+
+ public final static String DELETE_MEXS_BY_PROCESS = "DELETE_MEXS_BY_PROCESS";
+ public final static String SELECT_MEX_IDS_BY_PROCESS = "SELECT_MEX_IDS_BY_PROCESS";
+
+ @Id @Column(name="MESSAGE_EXCHANGE_ID")
+ private String _id;
+ @Basic @Column(name="CALLEE")
+ private String _callee;
+ @Basic @Column(name="CHANNEL")
+ private String _channel;
+ @Basic @Column(name="CORRELATION_ID")
+ private String _correlationId;
+ @Basic @Column(name="CORRELATION_STATUS")
+ private String _correlationStatus;
+ @Basic @Column(name="CREATE_TIME")
+ private Date _createTime;
+ @Basic @Column(name="DIRECTION")
+ private char _direction;
+ @Lob @Column(name="EPR")
+ private String _epr;
+ @Transient private
+ Element _eprElement;
+ @Basic @Column(name="FAULT")
+ private String _fault;
+ @Basic @Column(name="FAULT_EXPLANATION")
+ private String _faultExplanation;
+ @Basic @Column(name="OPERATION")
+ private String _operation;
+ @Basic @Column(name="PARTNER_LINK_MODEL_ID")
+ private int _partnerLinkModelId;
+ @Basic @Column(name="PATTERN")
+ private String _pattern;
+ @Basic @Column(name="PORT_TYPE")
+ private String _portType;
+ @Basic @Column(name="PROPAGATE_TRANS")
+ private boolean _propagateTransactionFlag;
+ @Basic @Column(name="STATUS")
+ private String _status;
+ @Basic @Column(name="CORRELATION_KEYS")
+ private String _correlationKeys;
+ @Basic @Column(name="PIPED_ID")
+ private String _pipedMessageExchangeId;
+ @Basic @Column(name="SUBSCRIBER_COUNT")
+ private int _subscriberCount;
+
+ @OneToMany(targetEntity=MexProperty.class,mappedBy="_mex",fetch=FetchType.EAGER,cascade={CascadeType.ALL})
+ private Collection<MexProperty> _props = new ArrayList<MexProperty>();
+ @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="PROCESS_INSTANCE_ID")
+ private ProcessInstanceDAOImpl _processInst;
+ @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="PARTNER_LINK_ID")
+ private PartnerLinkDAOImpl _partnerLink;
+ @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="PROCESS_ID")
+ private ProcessDAOImpl _process;
+ @OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL}) @JoinColumn(name="REQUEST_MESSAGE_ID")
+ private MessageDAOImpl _request;
+ @OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.ALL}) @JoinColumn(name="RESPONSE_MESSAGE_ID")
+ private MessageDAOImpl _response;
+
+ @ManyToOne(fetch= FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="CORR_ID")
+ private CorrelatorDAOImpl _correlator;
+
+ public MessageExchangeDAOImpl() {
+ }
+
+ public MessageExchangeDAOImpl(char direction){
+ _direction = direction;
+ _id = new UUID().toString();
+ }
+
+ public MessageDAO createMessage(QName type) {
+ MessageDAOImpl ret = new MessageDAOImpl(type,this);
+ return ret ;
+ }
+
+ public QName getCallee() {
+ return _callee == null ? null : QName.valueOf(_callee);
+ }
+
+ public String getChannel() {
+ return _channel;
+ }
+
+ public String getCorrelationId() {
+ return _correlationId;
+ }
+
+ public String getCorrelationStatus() {
+ return _correlationStatus;
+ }
+
+ public Date getCreateTime() {
+ return _createTime;
+ }
+
+ public char getDirection() {
+ return _direction;
+ }
+
+ public Element getEPR() {
+ if ( _eprElement == null && _epr != null && !"".equals(_epr)) {
+ try {
+ _eprElement = DOMUtils.stringToDOM(_epr);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return _eprElement;
+ }
+
+ public QName getFault() {
+ return _fault == null ? null : QName.valueOf(_fault);
+ }
+
+ public String getFaultExplanation() {
+ return _faultExplanation;
+ }
+
+ public ProcessInstanceDAO getInstance() {
+ return _processInst;
+ }
+
+ public String getMessageExchangeId() {
+ //return _messageExchangeId;
+ return _id.toString();
+ }
+
+ public String getOperation() {
+ return _operation;
+ }
+
+ public PartnerLinkDAO getPartnerLink() {
+ return _partnerLink;
+ }
+
+ public int getPartnerLinkModelId() {
+ return _partnerLinkModelId;
+ }
+
+ public String getPattern() {
+ return _pattern;
+ }
+
+ public QName getPortType() {
+ return _portType == null ? null : QName.valueOf(_portType);
+ }
+
+ public ProcessDAO getProcess() {
+ return _process;
+ }
+
+ public boolean getPropagateTransactionFlag() {
+ return _propagateTransactionFlag;
+ }
+
+ public String getProperty(String key) {
+ for (MexProperty prop : _props) {
+ if (prop.getPropertyKey().equals(key)) return prop.getPropertyValue();
+ }
+ return null;
+ }
+
+ public Set<String> getPropertyNames() {
+ HashSet<String> propNames = new HashSet<String>();
+ for (MexProperty prop : _props) {
+ propNames.add(prop.getPropertyKey());
+ }
+ return propNames;
+ }
+
+ public MessageDAO getRequest() {
+ return _request;
+ }
+
+ public MessageDAO getResponse() {
+ return _response;
+ }
+
+ public String getStatus() {
+ return _status;
+ }
+
+ public void setCallee(QName callee) {
+ _callee = callee.toString();
+ }
+
+ public void setChannel(String channel) {
+ _channel = channel;
+ }
+
+ public void setCorrelationId(String correlationId) {
+ _correlationId = correlationId;
+ }
+
+ public void setCorrelationStatus(String cstatus) {
+ _correlationStatus = cstatus;
+ }
+
+ public void setEPR(Element epr) {
+ _eprElement = epr;
+ _epr = DOMUtils.domToString(epr);
+ }
+
+ public void setFault(QName faultType) {
+ _fault = faultType == null ? null : faultType.toString();
+ }
+
+ public void setFaultExplanation(String explanation) {
+ if(explanation != null && explanation.length() > 255)
+ explanation = explanation.substring(0,254);
+ _faultExplanation = explanation;
+ }
+
+ public void setInstance(ProcessInstanceDAO dao) {
+ _processInst = (ProcessInstanceDAOImpl)dao;
+ }
+
+ public void setOperation(String opname) {
+ _operation = opname;
+ }
+
+ public void setPartnerLink(PartnerLinkDAO plinkDAO) {
+ _partnerLink = (PartnerLinkDAOImpl)plinkDAO;
+ }
+
+ public void setPartnerLinkModelId(int modelId) {
+ _partnerLinkModelId = modelId;
+ }
+
+ public void setPattern(String pattern) {
+ _pattern = pattern;
+ }
+
+ public void setPortType(QName porttype) {
+ _portType = porttype.toString();
+ }
+
+ public void setProcess(ProcessDAO process) {
+ _process = (ProcessDAOImpl)process;
+ }
+
+ public void setProperty(String key, String value) {
+ _props.add(new MexProperty(key, value, this));
+ }
+
+ public void setRequest(MessageDAO msg) {
+ _request = (MessageDAOImpl)msg;
+ }
+
+ public void setResponse(MessageDAO msg) {
+ _response = (MessageDAOImpl)msg;
+ }
+
+ public void setStatus(String status) {
+ _status = status;
+ }
+
+ public String getPipedMessageExchangeId() {
+ return _pipedMessageExchangeId;
+ }
+
+ public void setPipedMessageExchangeId(String pipedMessageExchangeId) {
+ _pipedMessageExchangeId = pipedMessageExchangeId;
+ }
+
+ void setCorrelationKeySet(CorrelationKeySet correlationKeySet) {
+ _correlationKeys = correlationKeySet.toCanonicalString();
+ }
+
+ CorrelationKeySet getCorrelationKeySet() {
+ return new CorrelationKeySet(_correlationKeys);
+ }
+
+ public CorrelationKey getCorrelationKey() {
+ if (_correlationKeys == null) return null;
+ return getCorrelationKeySet().iterator().next();
+ }
+
+ public void setCorrelationKey(CorrelationKey ckey) {
+ _correlationKeys = ckey.toCanonicalString();
+ }
+
+
+ public void release(boolean doClean) {
+ if( doClean ) {
+ deleteMessages();
+ }
+ }
+
+ public void releasePremieMessages() {
+ // do nothing; early messages are deleted during CorrelatorDaoImpl().dequeueMessage()
+ }
+
+ public void deleteMessages() {
+ if( __log.isDebugEnabled() ) __log.debug("Deleting message on MEX release.");
+
+ getEM().remove(this); // This deletes MexProperty, REQUEST MessageDAO, RESPONSE MessageDAO
+ }
+
+ public CorrelatorDAOImpl getCorrelator() {
+ return _correlator;
+ }
+
+ public void setCorrelator(CorrelatorDAOImpl correlator) {
+ _correlator = correlator;
+ }
+
+ public int getSubscriberCount() {
+ return _subscriberCount;
+ }
+
+ public void setSubscriberCount(int subscriberCount) {
+ this._subscriberCount = subscriberCount;
+ }
+
+ public void incrementSubscriberCount() {
+ ++_subscriberCount;
+ }
+
+ public void release() {
+ // no-op for now, could be used to do some cleanup
+ }
+
+ public void setCreateTime(Date createTime) {
+ _createTime = createTime;
+ }
+
+ public boolean lockPremieMessages() {
+ return true;
+ }
+}
Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageRouteDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageRouteDAOImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageRouteDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MessageRouteDAOImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.dao.bpel.MessageRouteDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+
+@Entity
+@Table(name="ODE_MESSAGE_ROUTE")
+@NamedQueries ({
+ @NamedQuery(name=MessageRouteDAOImpl.DELETE_MESSAGE_ROUTES_BY_INSTANCE_IDS, query="delete from MessageRouteDAOImpl as r where r._instanceId in(:instanceIds)"),
+ @NamedQuery(name=MessageRouteDAOImpl.DELETE_MESSAGE_ROUTES_BY_INSTANCE, query="delete from MessageRouteDAOImpl as r where r._processInst = :instance")
+})
+public class MessageRouteDAOImpl implements MessageRouteDAO {
+ public final static String DELETE_MESSAGE_ROUTES_BY_INSTANCE = "DELETE_MESSAGE_ROUTES_BY_INSTANCE";
+ public final static String DELETE_MESSAGE_ROUTES_BY_INSTANCE_IDS = "DELETE_MESSAGE_ROUTES_BY_INSTANCE_IDS";
+
+ @Id @Column(name="MESSAGE_ROUTE_ID")
+ @GeneratedValue(strategy=GenerationType.AUTO)
+ private Long _id;
+ @Basic @Column(name="GROUP_ID")
+ private String _groupId;
+ @Basic @Column(name="ROUTE_INDEX")
+ private int _index;
+ @Basic @Column(name="CORRELATION_KEY")
+ private String _correlationKey;
+ @Basic @Column(name="ROUTE_POLICY", length=16)
+ private String _routePolicy;
+
+ @SuppressWarnings("unused")
+ @Basic @Column(name="PROCESS_INSTANCE_ID", insertable=false, updatable=false, nullable=true)
+ private int _instanceId;
+ @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="PROCESS_INSTANCE_ID")
+ private ProcessInstanceDAOImpl _processInst;
+
+ @ManyToOne(fetch= FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="CORR_ID")
+ @SuppressWarnings("unused")
+ private CorrelatorDAOImpl _correlator;
+
+ public MessageRouteDAOImpl() {}
+ public MessageRouteDAOImpl(CorrelationKeySet keySet, String groupId, int index,
+ ProcessInstanceDAOImpl processInst, CorrelatorDAOImpl correlator, String routePolicy) {
+ _correlationKey = keySet.toCanonicalString();
+ _groupId = groupId;
+ _index = index;
+ _processInst = processInst;
+ _correlator = correlator;
+ _routePolicy = routePolicy;
+ }
+
+ public Long getId() {
+ return _id;
+ }
+
+ public CorrelationKey getCorrelationKey() {
+ return new CorrelationKey(_correlationKey);
+ }
+
+ public void setCorrelationKey(CorrelationKey key) {
+ _correlationKey = key.toCanonicalString();
+ }
+
+ public String getGroupId() {
+ return _groupId;
+ }
+
+ public int getIndex() {
+ return _index;
+ }
+
+ public ProcessInstanceDAO getTargetInstance() {
+ return _processInst;
+ }
+
+ public String getRoute() {
+ return _routePolicy;
+ }
+
+ public CorrelationKeySet getCorrelationKeySet() {
+ return new CorrelationKeySet(_correlationKey);
+ }
+
+ public void setCorrelationKeySet(CorrelationKeySet keySet) {
+ _correlationKey = keySet.toCanonicalString();
+ }
+}
Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MexProperty.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MexProperty.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MexProperty.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/MexProperty.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+
+/**
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+@Entity
+@Table(name="ODE_MEX_PROP")
+@NamedQueries({
+ @NamedQuery(name=MexProperty.DELETE_MEX_PROPERTIES_BY_MEX_IDS, query="delete from MexProperty as p where p._mexId in (:mexIds)")
+})
+public class MexProperty {
+ public final static String DELETE_MEX_PROPERTIES_BY_MEX_IDS = "DELETE_MEX_PROPERTIES_BY_MEX_IDS";
+
+ @Id @Column(name="ID")
+ @GeneratedValue(strategy=GenerationType.AUTO)
+ @SuppressWarnings("unused")
+ private Long _id;
+ @Basic @Column(name="PROP_KEY")
+ private String propertyKey;
+ @Basic @Column(name="PROP_VALUE", length=2000)
+ private String propertyValue;
+
+ @SuppressWarnings("unused")
+ @Basic @Column(name="MEX_ID", insertable=false, updatable=false, nullable=true)
+ private String _mexId;
+ @ManyToOne(fetch= FetchType.LAZY,cascade={CascadeType.PERSIST})
+ @JoinColumn(name="MEX_ID")
+ @SuppressWarnings("unused")
+ private MessageExchangeDAOImpl _mex;
+
+ public MexProperty() {
+ }
+ public MexProperty(String propertyKey, String propertyValue, MessageExchangeDAOImpl mex) {
+ this.propertyKey = propertyKey;
+ this.propertyValue = propertyValue;
+ this._mex = mex;
+ }
+
+ public String getPropertyKey() {
+ return propertyKey;
+ }
+
+ public void setPropertyKey(String propertyKey) {
+ this.propertyKey = propertyKey;
+ }
+
+ public String getPropertyValue() {
+ return propertyValue;
+ }
+
+ public void setPropertyValue(String propertyValue) {
+ this.propertyValue = propertyValue;
+ }
+}
Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/PartnerLinkDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/PartnerLinkDAOImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/PartnerLinkDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/PartnerLinkDAOImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+
+import org.apache.ode.dao.bpel.PartnerLinkDAO;
+import org.apache.ode.utils.DOMUtils;
+import org.w3c.dom.Element;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.Lob;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.persistence.Transient;
+import javax.xml.namespace.QName;
+
+@Entity
+@Table(name="ODE_PARTNER_LINK")
+@NamedQueries({
+ @NamedQuery(name=PartnerLinkDAOImpl.DELETE_PARTNER_LINKS_BY_SCOPE_IDS, query="delete from PartnerLinkDAOImpl as l where l._scopeId in (:scopeIds)")
+})
+public class PartnerLinkDAOImpl implements PartnerLinkDAO {
+ public final static String DELETE_PARTNER_LINKS_BY_SCOPE_IDS = "DELETE_PARTNER_LINKS_BY_SCOPE_IDS";
+
+ @Id @Column(name="PARTNER_LINK_ID")
+ @GeneratedValue(strategy=GenerationType.AUTO)
+ @SuppressWarnings("unused")
+ private Long _id;
+ @Lob @Column(name="MY_EPR")
+ private String _myEPR;
+ @Transient
+ private Element _myEPRElement;
+ @Basic @Column(name="MY_ROLE_NAME")
+ private String _myRoleName;
+ @Basic @Column(name="MY_ROLE_SERVICE_NAME")
+ private String _myRoleServiceName;
+ @Basic @Column(name="MY_SESSION_ID")
+ private String _mySessionId;
+ @Lob @Column(name="PARTNER_EPR")
+ private String _partnerEPR;
+ @Transient
+ private Element _partnerEPRElement;
+ @Basic @Column(name="PARTNER_LINK_MODEL_ID")
+ private int _partnerLinkModelId;
+ @Basic @Column(name="PARTNER_LINK_NAME")
+ private String _partnerLinkName;
+ @Basic @Column(name="PARTNER_ROLE_NAME")
+ private String _partnerRoleName;
+ @Basic @Column(name="PARTNER_SESSION_ID")
+ private String _partnerSessionId;
+
+ @SuppressWarnings("unused")
+ @Basic @Column(name="SCOPE_ID", nullable=true, insertable=false, updatable=false)
+ private Long _scopeId;
+ @ManyToOne(fetch= FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="SCOPE_ID")
+ @SuppressWarnings("unused")
+ private ScopeDAOImpl _scope;
+
+ public PartnerLinkDAOImpl() {}
+ public PartnerLinkDAOImpl(int modelId, String name, String myRole, String partnerRole) {
+ _partnerLinkModelId = modelId;
+ _partnerLinkName = name;
+ _myRoleName = myRole;
+ _partnerRoleName = partnerRole;
+ }
+
+ public Element getMyEPR() {
+ if (_myEPRElement == null && _myEPR != null && !"".equals(_myEPR)) {
+ try {
+ _myEPRElement = DOMUtils.stringToDOM(_myEPR);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ return _myEPRElement;
+ }
+
+ public String getMyRoleName() {
+ return _myRoleName;
+ }
+
+ public QName getMyRoleServiceName() {
+ return _myRoleServiceName == null ? null : QName.valueOf(_myRoleServiceName);
+ }
+
+ public String getMySessionId() {
+ return _mySessionId;
+ }
+
+ public Element getPartnerEPR() {
+ if ( _partnerEPRElement == null && _partnerEPR != null && !"".equals(_partnerEPR)) {
+ try {
+ _partnerEPRElement = DOMUtils.stringToDOM(_partnerEPR);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return _partnerEPRElement;
+ }
+
+ public int getPartnerLinkModelId() {
+ return _partnerLinkModelId;
+ }
+
+ public String getPartnerLinkName() {
+ return _partnerLinkName;
+ }
+
+ public String getPartnerRoleName() {
+ return _partnerRoleName;
+ }
+
+ public String getPartnerSessionId() {
+ return _partnerSessionId;
+ }
+
+ public void setMyEPR(Element val) {
+ _myEPRElement = val;
+ _myEPR = DOMUtils.domToString(val);
+
+ }
+
+ public void setMyRoleServiceName(QName svcName) {
+ _myRoleServiceName = svcName.toString();
+
+ }
+
+ public void setMySessionId(String sessionId) {
+ _mySessionId = sessionId;
+
+ }
+
+ public void setPartnerEPR(Element val) {
+ _partnerEPRElement = val;
+ _partnerEPR = DOMUtils.domToString(val);
+
+ }
+
+ public void setPartnerSessionId(String session) {
+ _partnerSessionId = session;
+
+ }
+
+ public void setScope(ScopeDAOImpl scope) {
+ _scope = scope;
+ }
+}
Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessDAOImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessDAOImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.bpel.common.ProcessState;
+import org.apache.ode.dao.bpel.CorrelatorDAO;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.dao.bpel.ProcessDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+
+import javax.persistence.*;
+import javax.xml.namespace.QName;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * @author Matthieu Riou <mriou at apache dot org>
+ */
+@Entity
+@Table(name="ODE_PROCESS")
+@NamedQueries({
+ @NamedQuery(name="ActiveInstances", query="select i from ProcessInstanceDAOImpl as i where i._process = :process and i._state = :state"),
+ @NamedQuery(name="InstanceByCKey", query="select cs._scope._processInstance from CorrelationSetDAOImpl as cs where cs._correlationKey = :ckey"),
+ @NamedQuery(name="CorrelatorByKey", query="select c from CorrelatorDAOImpl as c where c._correlatorKey = :ckey and c._process = :process")
+})
+public class ProcessDAOImpl extends BpelDAO implements ProcessDAO {
+ private static final Log __log = LogFactory.getLog(ProcessDAOImpl.class);
+
+ @Id @Column(name="ID")
+ @GeneratedValue(strategy= GenerationType.AUTO)
+ private Long _id;
+
+ @Basic @Column(name="PROCESS_ID")
+ private String _processId;
+ @Basic @Column(name="PROCESS_TYPE")
+ private String _processType;
+ @Basic @Column(name="GUID")
+ private String _guid;
+ @Basic @Column(name="VERSION")
+ private long _version;
+
+ @OneToMany(targetEntity=CorrelatorDAOImpl.class,mappedBy="_process",fetch=FetchType.LAZY,cascade={CascadeType.ALL})
+ private Collection<CorrelatorDAOImpl> _correlators = new ArrayList<CorrelatorDAOImpl>();
+
+ public ProcessDAOImpl() {}
+ public ProcessDAOImpl(QName pid, QName type, String guid, long version) {
+ _processId = pid.toString();
+ _processType = type.toString();
+ _guid = guid;
+ _version = version;
+ }
+
+ public Serializable getId() {
+ return _id;
+ }
+
+ public void setId(Long id) {
+ _id = id;
+ }
+
+ public CorrelatorDAO addCorrelator(String correlator) {
+ CorrelatorDAOImpl corr = new CorrelatorDAOImpl(correlator, this);
+ _correlators.add(corr);
+ return corr;
+ }
+
+ @SuppressWarnings("unchecked")
+ public CorrelatorDAO getCorrelator(String correlatorId) {
+ Query qry = getEM().createNamedQuery("CorrelatorByKey");
+ qry.setParameter("ckey", correlatorId);
+ qry.setParameter("process", this);
+ List res = qry.getResultList();
+ if (res.size() == 0) return null;
+ return (CorrelatorDAO) res.get(0);
+ }
+
+ public ProcessInstanceDAO createInstance(CorrelatorDAO instantiatingCorrelator) {
+ ProcessInstanceDAOImpl inst = new ProcessInstanceDAOImpl((CorrelatorDAOImpl)instantiatingCorrelator, this);
+ getEM().persist(inst);
+ return inst;
+ }
+
+ public ProcessInstanceDAO createInstance(CorrelatorDAO instantiatingCorrelator, MessageExchangeDAO mex) {
+ ProcessInstanceDAOImpl inst = new ProcessInstanceDAOImpl((CorrelatorDAOImpl)instantiatingCorrelator, this);
+ getEM().persist(inst);
+ return inst;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey) {
+ Query qry = getEM().createNamedQuery("InstanceByCKey");
+ qry.setParameter("ckey", ckey.toCanonicalString());
+ return qry.getResultList();
+ }
+
+ public ProcessInstanceDAO getInstance(Long iid) {
+ return getEM().find(ProcessInstanceDAOImpl.class, iid);
+ }
+
+ public QName getProcessId() {
+ return QName.valueOf(_processId);
+ }
+
+ public QName getType() {
+ return QName.valueOf(_processType);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void deleteProcessAndRoutes() {
+ // delete routes
+ Collection instanceIds = getEM().createNamedQuery(ProcessInstanceDAOImpl.SELECT_INSTANCE_IDS_BY_PROCESS).setParameter("process", this).getResultList();
+ batchUpdateByIds(instanceIds.iterator(), getEM().createNamedQuery(MessageRouteDAOImpl.DELETE_MESSAGE_ROUTES_BY_INSTANCE_IDS), "instanceIds");
+ getEM().createNamedQuery(CorrelatorDAOImpl.DELETE_CORRELATORS_BY_PROCESS).setParameter("process", this).executeUpdate();
+
+ deleteInstances(Integer.MAX_VALUE);
+
+ // delete process dao
+ getEM().remove(this); // This deletes CorrelatorDAO
+ getEM().flush();
+ }
+
+ private int deleteInstances(int transactionSize) {
+ if(__log.isDebugEnabled()) __log.debug("Cleaning up process data.");
+
+ deleteEvents();
+ deleteCorrelations();
+ deleteMessages();
+ deleteVariables();
+ deleteProcessInstances();
+
+ return 0;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void deleteProcessInstances() {
+ Collection faultIds = getEM().createNamedQuery(ProcessInstanceDAOImpl.SELECT_FAULT_IDS_BY_PROCESS).setParameter("process", this).getResultList();
+ batchUpdateByIds(faultIds.iterator(), getEM().createNamedQuery(FaultDAOImpl.DELETE_FAULTS_BY_IDS), "ids");
+ Collection instanceIds = getEM().createNamedQuery(ProcessInstanceDAOImpl.SELECT_INSTANCE_IDS_BY_PROCESS).setParameter("process", this).getResultList();
+ batchUpdateByIds(instanceIds.iterator(), getEM().createNamedQuery(ActivityRecoveryDAOImpl.DELETE_ACTIVITY_RECOVERIES_BY_IDS), "ids");
+ getEM().createNamedQuery(ProcessInstanceDAOImpl.DELETE_INSTANCES_BY_PROCESS).setParameter("process", this).executeUpdate();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void deleteVariables() {
+ Collection xmlDataIds = getEM().createNamedQuery(XmlDataDAOImpl.SELECT_XMLDATA_IDS_BY_PROCESS).setParameter("process", this).getResultList();
+ batchUpdateByIds(xmlDataIds.iterator(), getEM().createNamedQuery(XmlDataProperty.DELETE_XML_DATA_PROPERTIES_BY_XML_DATA_IDS), "xmlDataIds");
+ Collection scopeIds = getEM().createNamedQuery(ScopeDAOImpl.SELECT_SCOPE_IDS_BY_PROCESS).setParameter("process", this).getResultList();
+ batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(XmlDataDAOImpl.DELETE_XMLDATA_BY_SCOPE_IDS), "scopeIds");
+
+// Collection scopeIds = getEM().createNamedQuery(ScopeDAOImpl.SELECT_SCOPE_IDS_BY_PROCESS).setParameter("process", this).getResultList();
+ batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(PartnerLinkDAOImpl.DELETE_PARTNER_LINKS_BY_SCOPE_IDS), "scopeIds");
+ batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(ScopeDAOImpl.DELETE_SCOPES_BY_SCOPE_IDS), "ids");
+ }
+
+ @SuppressWarnings("unchecked")
+ private void deleteMessages() {
+ getEM().createNamedQuery(MessageDAOImpl.DELETE_MESSAGES_BY_PROCESS).setParameter("process", this).executeUpdate();
+ Collection mexIds = getEM().createNamedQuery(MessageExchangeDAOImpl.SELECT_MEX_IDS_BY_PROCESS).setParameter("process", this).getResultList();
+ batchUpdateByIds(mexIds.iterator(), getEM().createNamedQuery(MexProperty.DELETE_MEX_PROPERTIES_BY_MEX_IDS), "mexIds");
+ getEM().createNamedQuery(MessageExchangeDAOImpl.DELETE_MEXS_BY_PROCESS).setParameter("process", this).executeUpdate();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void deleteCorrelations() {
+ Collection corrSetIds = getEM().createNamedQuery(CorrelationSetDAOImpl.SELECT_CORRELATION_SET_IDS_BY_PROCESS).setParameter("process", this).getResultList();
+ batchUpdateByIds(corrSetIds.iterator(), getEM().createNamedQuery(CorrSetProperty.DELETE_CORSET_PROPERTIES_BY_PROPERTY_IDS), "corrSetIds");
+ batchUpdateByIds(corrSetIds.iterator(), getEM().createNamedQuery(CorrelationSetDAOImpl.DELETE_CORRELATION_SETS_BY_IDS), "ids");
+ }
+
+ @SuppressWarnings("unchecked")
+ private void deleteEvents() {
+ Collection eventIds = getEM().createNamedQuery(EventDAOImpl.SELECT_EVENT_IDS_BY_PROCESS).setParameter("process", this).getResultList();
+ batchUpdateByIds(eventIds.iterator(), getEM().createNamedQuery(EventDAOImpl.DELETE_EVENTS_BY_IDS), "ids");
+ }
+
+ public int getNumInstances() {
+ Long instanceCount = (Long) getSingleResult(getEM().createNamedQuery(ProcessInstanceDAOImpl.COUNT_INSTANCE_IDS_BY_PROCESS).setParameter("process", this));
+ return (instanceCount == null ? 0 : instanceCount.intValue());
+ }
+
+ public long getVersion() {
+ return _version;
+ }
+
+ public void instanceCompleted(ProcessInstanceDAO instance) {
+ // nothing to do here (yet?)
+ }
+
+ public void removeRoutes(String routeId, ProcessInstanceDAO target) {
+ for (CorrelatorDAO c : _correlators) {
+ ((CorrelatorDAOImpl)c).removeLocalRoutes(routeId, target);
+ }
+ }
+
+ public String getGuid() {
+ return _guid;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Collection<ProcessInstanceDAO> getActiveInstances() {
+ Query qry = getEM().createNamedQuery("ActiveInstances");
+ qry.setParameter("process", this);
+ qry.setParameter("state", ProcessState.STATE_ACTIVE);
+ return qry.getResultList();
+ }
+}
\ No newline at end of file
Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessInstanceDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessInstanceDAOImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessInstanceDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessInstanceDAOImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.ProcessState;
+import org.apache.ode.bpel.evt.ProcessInstanceEvent;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
+import org.apache.ode.dao.bpel.ActivityRecoveryDAO;
+import org.apache.ode.dao.bpel.BpelDAOConnection;
+import org.apache.ode.dao.bpel.CorrelationSetDAO;
+import org.apache.ode.dao.bpel.CorrelatorDAO;
+import org.apache.ode.dao.bpel.FaultDAO;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.dao.bpel.ProcessDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+import org.apache.ode.dao.bpel.ScopeDAO;
+import org.apache.ode.dao.bpel.ScopeStateEnum;
+import org.apache.ode.dao.bpel.XmlDataDAO;
+import org.w3c.dom.Element;
+
+import javax.persistence.Basic;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.Lob;
+import javax.persistence.ManyToOne;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.OneToMany;
+import javax.persistence.OneToOne;
+import javax.persistence.Table;
+import javax.xml.namespace.QName;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Entity
+@Table(name="ODE_PROCESS_INSTANCE")
+@NamedQueries({
+ @NamedQuery(name=ProcessInstanceDAOImpl.DELETE_INSTANCES_BY_PROCESS, query="delete from ProcessInstanceDAOImpl as i where i._process = :process"),
+ @NamedQuery(name=ProcessInstanceDAOImpl.SELECT_INSTANCE_IDS_BY_PROCESS, query="select i._instanceId from ProcessInstanceDAOImpl as i where i._process = :process"),
+ @NamedQuery(name=ProcessInstanceDAOImpl.COUNT_INSTANCE_IDS_BY_PROCESS, query="select count(i._instanceId) from ProcessInstanceDAOImpl as i where i._process = :process"),
+ @NamedQuery(name=ProcessInstanceDAOImpl.SELECT_FAULT_IDS_BY_PROCESS, query="select i._faultId from ProcessInstanceDAOImpl as i where i._process = :process and i._faultId is not null"),
+ @NamedQuery(name=ProcessInstanceDAOImpl.COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_ID,
+ query="select count(i._instanceId), max(i._lastRecovery) from ProcessInstanceDAOImpl as i where i._process._processId = :processId and i._state in(:states) and exists(select r from ActivityRecoveryDAOImpl r where i = r._instance)")
+})
+public class ProcessInstanceDAOImpl extends BpelDAO implements ProcessInstanceDAO {
+ private static final Log __log = LogFactory.getLog(ProcessInstanceDAOImpl.class);
+
+ public final static String DELETE_INSTANCES_BY_PROCESS = "DELETE_INSTANCES_BY_PROCESS";
+ public final static String SELECT_INSTANCE_IDS_BY_PROCESS = "SELECT_INSTANCE_IDS_BY_PROCESS";
+ public final static String COUNT_INSTANCE_IDS_BY_PROCESS = "COUNT_INSTANCE_IDS_BY_PROCESS";
+
+ public final static String SELECT_FAULT_IDS_BY_PROCESS = "SELECT_FAULT_IDS_BY_PROCESS";
+ public final static String COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_ID = "COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_ID";
+
+ @Id @Column(name="ID")
+ @GeneratedValue(strategy=GenerationType.AUTO)
+ private Long _instanceId;
+ @Basic @Column(name="LAST_RECOVERY_DATE")
+ private Date _lastRecovery;
+ @Basic @Column(name="LAST_ACTIVE_TIME")
+ private Date _lastActive;
+ @Basic @Column(name="INSTANCE_STATE")
+ private short _state;
+ @Basic @Column(name="PREVIOUS_STATE")
+ private short _previousState;
+ @Lob @Column(name="EXECUTION_STATE")
+ private byte[] _executionState;
+ @Basic @Column(name="SEQUENCE")
+ private long _sequence;
+ @Basic @Column(name="DATE_CREATED")
+ private Date _dateCreated = new Date();
+
+ @OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.MERGE, CascadeType.PERSIST, CascadeType.REFRESH}) @JoinColumn(name="ROOT_SCOPE_ID")
+ private ScopeDAOImpl _rootScope;
+ @OneToMany(targetEntity=ScopeDAOImpl.class,mappedBy="_processInstance",fetch=FetchType.LAZY,cascade={CascadeType.MERGE, CascadeType.PERSIST, CascadeType.REFRESH})
+ private Collection<ScopeDAO> _scopes = new ArrayList<ScopeDAO>();
+ @OneToMany(targetEntity=ActivityRecoveryDAOImpl.class,mappedBy="_instance",fetch=FetchType.LAZY,cascade={CascadeType.ALL})
+ private Collection<ActivityRecoveryDAO> _recoveries = new ArrayList<ActivityRecoveryDAO>();
+
+ @SuppressWarnings("unused")
+ @Basic @Column(name="FAULT_ID", insertable=false, updatable=false, nullable=true)
+ private long _faultId;
+ @OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.MERGE, CascadeType.PERSIST, CascadeType.REFRESH}) @JoinColumn(name="FAULT_ID")
+ private FaultDAOImpl _fault;
+ @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="PROCESS_ID")
+ private ProcessDAOImpl _process;
+ @ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @JoinColumn(name="INSTANTIATING_CORRELATOR_ID")
+ private CorrelatorDAOImpl _instantiatingCorrelator;
+
+ @OneToMany(targetEntity=MessageExchangeDAOImpl.class,mappedBy="_processInst",fetch=FetchType.LAZY)
+ @SuppressWarnings("unused")
+ private Collection<MessageExchangeDAO> _messageExchanges = new ArrayList<MessageExchangeDAO>();
+
+ private transient int _activityFailureCount = -1;
+
+ public ProcessInstanceDAOImpl() {}
+ public ProcessInstanceDAOImpl(CorrelatorDAOImpl correlator, ProcessDAOImpl process) {
+ _instantiatingCorrelator = correlator;
+ _process = process;
+ }
+
+ public void createActivityRecovery(String channel, long activityId,
+ String reason, Date dateTime, Element data, String[] actions,
+ int retries) {
+ ActivityRecoveryDAOImpl ar = new ActivityRecoveryDAOImpl(channel, activityId, reason, dateTime, data, actions, retries);
+ _recoveries.add(ar);
+ ar.setInstance(this);
+ _lastRecovery = dateTime;
+ }
+
+ public ScopeDAO createScope(ScopeDAO parentScope, String name, int scopeModelId) {
+ ScopeDAOImpl ret = new ScopeDAOImpl((ScopeDAOImpl)parentScope,name,scopeModelId,this);
+ ret.setState(ScopeStateEnum.ACTIVE);
+ _scopes.add(ret);
+ _rootScope = (parentScope == null)?ret:_rootScope;
+
+ // Must persist the scope to generate a scope ID
+ getEM().persist(ret);
+ return ret;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Collection<CorrelationSetDAO> selectCorrelationSets(Collection<ProcessInstanceDAO> instances) {
+ return getEM().createNamedQuery(CorrelationSetDAOImpl.SELECT_CORRELATION_SETS_BY_INSTANCES).setParameter("instances", instances).getResultList();
+ }
+
+ public void delete(Set<CLEANUP_CATEGORY> cleanupCategories) {
+ delete(cleanupCategories, true);
+ }
+
+ public void delete(Set<CLEANUP_CATEGORY> cleanupCategories, boolean deleteMyRoleMex) {
+ if(__log.isDebugEnabled()) __log.debug("Cleaning up instance Data with " + cleanupCategories);
+
+ // remove jacob state
+ setExecutionState(null);
+ if (getEM() != null) {
+ if( !cleanupCategories.isEmpty() ) {
+ // by default, we do not flush before select; flush it, so we can delete no matter if an entity is loaded up
+ // or not; more importantly, OpenJPA will secretly load from the entire table if some entities reside only
+ // in memory
+ getEM().flush();
+ }
+
+ if (cleanupCategories.contains(CLEANUP_CATEGORY.EVENTS)) {
+ deleteEvents();
+ }
+ if (cleanupCategories.contains(CLEANUP_CATEGORY.CORRELATIONS)) {
+ deleteCorrelations();
+ }
+ if( cleanupCategories.contains(CLEANUP_CATEGORY.MESSAGES) ) {
+ deleteMessageRoutes();
+ }
+ if (cleanupCategories.contains(CLEANUP_CATEGORY.VARIABLES)) {
+ deleteVariables();
+ }
+ if (cleanupCategories.contains(CLEANUP_CATEGORY.INSTANCE)) {
+ deleteInstance();
+ }
+
+ getEM().flush();
+ }
+ }
+
+ private void deleteInstance() {
+ if( _fault != null ) {
+ getEM().remove(_fault);
+ }
+ getEM().remove(this); // This deletes ActivityRecoveryDAO
+ }
+
+ @SuppressWarnings("unchecked")
+ private void deleteVariables() {
+ Collection xmlDataIds = getEM().createNamedQuery(XmlDataDAOImpl.SELECT_XMLDATA_IDS_BY_INSTANCE).setParameter("instance", this).getResultList();
+ batchUpdateByIds(xmlDataIds.iterator(), getEM().createNamedQuery(XmlDataProperty.DELETE_XML_DATA_PROPERTIES_BY_XML_DATA_IDS), "xmlDataIds");
+ Collection scopeIds = getEM().createNamedQuery(ScopeDAOImpl.SELECT_SCOPE_IDS_BY_INSTANCE).setParameter("instance", this).getResultList();
+ batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(XmlDataDAOImpl.DELETE_XMLDATA_BY_SCOPE_IDS), "scopeIds");
+
+ batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(PartnerLinkDAOImpl.DELETE_PARTNER_LINKS_BY_SCOPE_IDS), "scopeIds");
+ batchUpdateByIds(scopeIds.iterator(), getEM().createNamedQuery(ScopeDAOImpl.DELETE_SCOPES_BY_SCOPE_IDS), "ids");
+ }
+
+ private void deleteMessageRoutes() {
+ getEM().createNamedQuery(MessageRouteDAOImpl.DELETE_MESSAGE_ROUTES_BY_INSTANCE).setParameter ("instance", this).executeUpdate();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void deleteCorrelations() {
+ Collection corrSetIds = getEM().createNamedQuery(CorrelationSetDAOImpl.SELECT_CORRELATION_SET_IDS_BY_INSTANCE).setParameter("instance", this).getResultList();
+ batchUpdateByIds(corrSetIds.iterator(), getEM().createNamedQuery(CorrSetProperty.DELETE_CORSET_PROPERTIES_BY_PROPERTY_IDS), "corrSetIds");
+ batchUpdateByIds(corrSetIds.iterator(), getEM().createNamedQuery(CorrelationSetDAOImpl.DELETE_CORRELATION_SETS_BY_IDS), "ids");
+ }
+
+ private void deleteEvents() {
+ getEM().createNamedQuery(EventDAOImpl.DELETE_EVENTS_BY_INSTANCE).setParameter ("instance", this).executeUpdate();
+ }
+
+ public void deleteActivityRecovery(String channel) {
+ ActivityRecoveryDAOImpl toRemove = null;
+ for (ActivityRecoveryDAO _recovery : _recoveries) {
+ ActivityRecoveryDAOImpl arElement = (ActivityRecoveryDAOImpl) _recovery;
+ if (arElement.getChannel().equals(channel)) {
+ toRemove = arElement;
+ break;
+ }
+ }
+ if (toRemove != null) {
+ getEM().remove(toRemove);
+ _recoveries.remove(toRemove);
+ }
+
+ }
+
+ public void finishCompletion() {
+ // make sure we have completed.
+ assert (ProcessState.isFinished(this.getState()));
+ // let our process know that we've done our work.
+ }
+
+ public long genMonotonic() {
+ return _sequence++;
+ }
+
+ public int getActivityFailureCount() {
+ if( _activityFailureCount == -1 ) {
+ _activityFailureCount = _recoveries.size();
+ }
+
+ return _activityFailureCount;
+ }
+
+ public void setActivityFailureCount(int activityFailureCount) {
+ _activityFailureCount = activityFailureCount;
+ }
+
+ public Date getActivityFailureDateTime() {
+ return _lastRecovery;
+ }
+
+ public Collection<ActivityRecoveryDAO> getActivityRecoveries() {
+ return _recoveries;
+ }
+
+ public CorrelationSetDAO getCorrelationSet(String name) {
+ // TODO: should this method be deprecated?
+
+ // Its not clear where the correlation set for the process is used
+ // or populated.
+
+ throw new UnsupportedOperationException();
+
+ //return null;
+ }
+
+ public Set<CorrelationSetDAO> getCorrelationSets() {
+ // TODO: should this method be deprecated?
+ // Its not clear where the correlation set for the process is used
+ // or populated.
+ return new HashSet<CorrelationSetDAO>();
+ }
+
+ public Date getCreateTime() {
+ return _dateCreated;
+ }
+
+ public EventsFirstLastCountTuple getEventsFirstLastCount() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public byte[] getExecutionState() {
+ return _executionState;
+ }
+
+ public FaultDAO getFault() {
+ return _fault;
+ }
+
+ public Long getInstanceId() {
+ return _instanceId;
+ }
+
+ public CorrelatorDAO getInstantiatingCorrelator() {
+ return _instantiatingCorrelator;
+ }
+
+ public Date getLastActiveTime() {
+ return _lastActive;
+ }
+
+ public short getPreviousState() {
+ return _previousState;
+ }
+
+ public ProcessDAO getProcess() {
+ return _process;
+ }
+
+ public ScopeDAO getRootScope() {
+ return _rootScope;
+ }
+
+ public ScopeDAO getScope(Long scopeInstanceId) {
+ return getEM().find(ScopeDAOImpl.class, scopeInstanceId);
+ }
+
+ public Collection<ScopeDAO> getScopes(String scopeName) {
+ Collection<ScopeDAO> ret = new ArrayList<ScopeDAO>();
+
+ for (ScopeDAO sElement : _scopes) {
+ if ( sElement.getName().equals(scopeName)) ret.add(sElement);
+ }
+ return ret;
+ }
+
+ public Collection<ScopeDAO> getScopes() {
+ return _scopes;
+ }
+
+ public short getState() {
+ return _state;
+ }
+
+ public XmlDataDAO[] getVariables(String variableName, int scopeModelId) {
+
+ //TODO: This method is not used and should be considered a deprecation candidate.
+
+ List<XmlDataDAO> results = new ArrayList<XmlDataDAO>();
+
+ for (ScopeDAO sElement : _scopes) {
+ if ( sElement.getModelId() == scopeModelId) {
+ XmlDataDAO var = sElement.getVariable(variableName);
+ if ( var != null ) results.add(var);
+ }
+ }
+ return results.toArray(new XmlDataDAO[results.size()]);
+ }
+
+ public void insertBpelEvent(ProcessInstanceEvent event) {
+ getConn().insertBpelEvent(event, getProcess(), this);
+ }
+
+ public void setExecutionState(byte[] execState) {
+ _executionState = execState;
+ }
+
+ public void setFault(FaultDAO fault) {
+ _fault = (FaultDAOImpl)fault;
+ }
+
+ public void setFault(QName faultName, String explanation, int faultLineNo,
+ int activityId, Element faultMessage) {
+ _fault = new FaultDAOImpl(faultName,explanation,faultLineNo,activityId,faultMessage);
+ }
+
+ public void setLastActiveTime(Date dt) {
+ _lastActive = dt;
+ }
+
+ public void setState(short state) {
+ _previousState = _state;
+ _state = state;
+ }
+
+ void removeRoutes(String routeGroupId) {
+ _process.removeRoutes(routeGroupId, this);
+ }
+
+ public BpelDAOConnection getConnection() {
+ return BpelDAOConnectionImpl.getThreadLocal().get();
+ }
+
+ public Collection<String> getMessageExchangeIds() {
+ Collection<String> c = new HashSet<String>();
+ for (MessageExchangeDAO m : _messageExchanges) {
+ c.add(m.getMessageExchangeId());
+ }
+ return c;
+ }
+}
Added: ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessManagementDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessManagementDAOImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessManagementDAOImpl.java (added)
+++ ode/trunk/dao-jpa/src/main/java/org/apache/ode/dao/jpa/bpel/ProcessManagementDAOImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.jpa.bpel;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.dao.bpel.BpelDAOConnection;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+import org.apache.ode.dao.bpel.ProcessManagementDAO;
+
+public class ProcessManagementDAOImpl implements ProcessManagementDAO {
+ private static final Log __log = LogFactory.getLog(ProcessManagementDAOImpl.class);
+
+ private EntityManager em;
+
+ public ProcessManagementDAOImpl(EntityManager em) {
+ this.em = em;
+ }
+
+ public Object[] findFailedCountAndLastFailedDateForProcessId(BpelDAOConnection conn, String status, String processId) {
+ Query query = em.createNamedQuery(ProcessInstanceDAOImpl.COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_ID);
+ query.setParameter("states", new InstanceFilter("status=" + status).convertFilterState());
+ query.setParameter("processId", processId);
+
+ return (Object[])query.getSingleResult();
+ }
+
+ public void prefetchActivityFailureCounts(Collection<ProcessInstanceDAO> instances) {
+ if(__log.isTraceEnabled()) __log.trace("Prefetching activity failure counts for " + instances.size() + " instances.");
+
+ if( instances.isEmpty() ) return;
+
+ Query query = em.createNamedQuery(ActivityRecoveryDAOImpl.COUNT_ACTIVITY_RECOVERIES_BY_INSTANCES);
+ query.setParameter("instances", instances);
+
+ Map<Long, Long> countsByInstanceId = new HashMap<Long, Long>();
+ for( Object instanceIdAndCount : query.getResultList() ) {
+ Object instanceId = ((Object[])instanceIdAndCount)[0];
+ Object count = ((Object[])instanceIdAndCount)[0];
+ countsByInstanceId.put((Long)instanceId, (Long)count);
+ }
+
+ for( ProcessInstanceDAO instance : instances ) {
+ Long count = countsByInstanceId.get(instance.getInstanceId());
+ if( count != null ) {
+ ((ProcessInstanceDAOImpl)instance).setActivityFailureCount(count.intValue());
+ }
+ }
+ }
+
+ public int countInstancesByPidAndString(BpelDAOConnection conn, QName pid, String status) {
+ InstanceFilter instanceFilter = new InstanceFilter("status=" + status + " pid="+ pid);
+
+ // TODO: this is grossly inefficient
+ return conn.instanceQuery(instanceFilter).size();
+ }
+
+ public Map<InstanceSummaryKey, Long> countInstancesSummary(Set<String> pids) {
+ return new HashMap<InstanceSummaryKey, Long>();
+ }
+
+ public Map<String, FailedSummaryValue> findFailedCountAndLastFailedDateForProcessIds(Set<String> pids) {
+ return new HashMap<String, FailedSummaryValue>();
+ }
+}