You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by je...@apache.org on 2010/08/18 06:12:59 UTC
svn commit: r986561 [6/13] - in /ode/trunk: ./ axis2-war/
axis2-war/src/main/assembly/ axis2-war/src/test/java/org/apache/ode/axis2/
axis2-war/src/test/java/org/apache/ode/axis2/instancecleanup/
axis2-war/src/test/java/org/apache/ode/bpel/dao/ axis2-wa...
Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/NativeHiLoGenerator.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/NativeHiLoGenerator.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/NativeHiLoGenerator.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/NativeHiLoGenerator.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.hib;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+import org.hibernate.HibernateException;
+import org.hibernate.MappingException;
+import org.hibernate.dialect.Dialect;
+import org.hibernate.engine.SessionImplementor;
+import org.hibernate.id.Configurable;
+import org.hibernate.id.IdentifierGenerator;
+import org.hibernate.id.PersistentIdentifierGenerator;
+import org.hibernate.id.SequenceHiLoGenerator;
+import org.hibernate.id.TableHiLoGenerator;
+import org.hibernate.type.Type;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Identifier generator that delegates to a hi/lo generator chosen from the
+ * capabilities of the configured {@link Dialect}: {@link SequenceHiLoGenerator}
+ * when the dialect supports sequences, {@link TableHiLoGenerator} otherwise.
+ * The delegate is created in {@link #configure(Type, Properties, Dialect)}.
+ */
+public class NativeHiLoGenerator implements IdentifierGenerator, PersistentIdentifierGenerator, Configurable {
+    private static final Log __log = LogFactory.getLog(NativeHiLoGenerator.class);
+
+    /** Returned when the delegate contributes no DDL statements. */
+    private static final String[] NO_SQL = new String[0];
+
+    /** Delegate selected by {@code configure}; {@code null} until then. */
+    private IdentifierGenerator _proxy;
+
+    public NativeHiLoGenerator() {
+        super();
+    }
+
+    /**
+     * Generates an identifier through the delegate.
+     *
+     * @throws HibernateException if the delegate fails, or if this generator
+     *         has not been configured yet
+     */
+    public Serializable generate(SessionImplementor session, Object object) throws HibernateException {
+        if (_proxy == null) {
+            // Fail with a meaningful message instead of a bare NullPointerException.
+            throw new HibernateException("NativeHiLoGenerator used before configure() was called");
+        }
+        return _proxy.generate(session, object);
+    }
+
+    /**
+     * @return the delegate's generator key when it is a
+     *         {@link PersistentIdentifierGenerator}, otherwise this instance
+     */
+    public Object generatorKey() {
+        if (_proxy instanceof PersistentIdentifierGenerator) {
+            return ((PersistentIdentifierGenerator) _proxy).generatorKey();
+        }
+        return this;
+    }
+
+    /**
+     * @return the delegate's create statements, or an empty array when the
+     *         delegate is not a {@link PersistentIdentifierGenerator}
+     */
+    public String[] sqlCreateStrings(Dialect dialect) throws HibernateException {
+        if (_proxy instanceof PersistentIdentifierGenerator) {
+            return ((PersistentIdentifierGenerator) _proxy).sqlCreateStrings(dialect);
+        }
+        return NO_SQL;
+    }
+
+    /**
+     * @return the delegate's drop statements, or an empty array when the
+     *         delegate is not a {@link PersistentIdentifierGenerator}
+     */
+    public String[] sqlDropStrings(Dialect dialect) throws HibernateException {
+        if (_proxy instanceof PersistentIdentifierGenerator) {
+            return ((PersistentIdentifierGenerator) _proxy).sqlDropStrings(dialect);
+        }
+        // Previously null; an empty array matches sqlCreateStrings and is safe to iterate.
+        return NO_SQL;
+    }
+
+    /**
+     * Selects, instantiates and configures the delegate generator.
+     *
+     * @throws MappingException if the delegate cannot be instantiated or
+     *         rejects the configuration
+     */
+    public void configure(Type type, Properties params, Dialect dialect) throws MappingException {
+        Class<? extends IdentifierGenerator> generatorClass;
+        if (dialect.supportsSequences()) {
+            __log.debug("Using SequenceHiLoGenerator");
+            generatorClass = SequenceHiLoGenerator.class;
+        } else {
+            generatorClass = TableHiLoGenerator.class;
+            __log.debug("Using native dialect generator " + generatorClass);
+        }
+
+        IdentifierGenerator g;
+        try {
+            g = generatorClass.newInstance();
+        } catch (Exception e) {
+            throw new MappingException("Unable to instantiate identifier generator "
+                    + generatorClass.getName(), e);
+        }
+
+        if (g instanceof Configurable) {
+            ((Configurable) g).configure(type, params, dialect);
+        }
+
+        this._proxy = g;
+    }
+}
Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/SessionManager.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/SessionManager.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/SessionManager.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/SessionManager.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.dao.hib.bpel.hobj.*;
+import org.apache.ode.dao.hib.store.hobj.DeploymentUnitDaoImpl;
+import org.apache.ode.dao.hib.store.hobj.ProcessConfDaoImpl;
+import org.apache.ode.dao.hib.store.hobj.VersionTrackerDAOImpl;
+import org.apache.ode.utils.uuid.UUID;
+import org.hibernate.HibernateException;
+import org.hibernate.MappingException;
+import org.hibernate.Session;
+import org.hibernate.SessionFactory;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.cfg.Environment;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Manages hibernate sessions and their association with a transaction.
+ * Sessions are obtained from the underlying {@link SessionFactory} through
+ * {@link SessionFactory#getCurrentSession()}.
+ * <p>
+ * Each manager registers its transaction manager and data source in static
+ * registries under a generated identifier, which is also stored in the
+ * Hibernate configuration as {@link #PROP_GUID}. The static lookup methods
+ * use that property to find the objects belonging to a configuration.
+ */
+public class SessionManager {
+    private static final Log __log = LogFactory.getLog(SessionManager.class);
+
+    /** Hibernate property holding the identifier of the owning session manager. */
+    public static final String PROP_GUID = "ode.hibernate.guid";
+
+    private static final Map<String, TransactionManager> _txManagers =
+        Collections.synchronizedMap(new HashMap<String, TransactionManager>());
+    private static final Map<String, DataSource> _dataSources =
+        Collections.synchronizedMap(new HashMap<String, DataSource>());
+
+    private static final String[] CANNOT_JOIN_FOR_UPDATE_DIALECTS =
+        {"org.hibernate.dialect.IngresDialect"};
+
+    private final String _uuid = new UUID().toString();
+    private final TransactionManager _txManager;
+    private final SessionFactory _sessionFactory;
+    private final boolean _canJoinForUpdate;
+
+    private final TxContext _ctx;
+
+    /**
+     * Creates a manager using {@link #getDefaultConfiguration()}.
+     */
+    public SessionManager(Properties env, DataSource ds, TransactionManager tx) throws HibernateException {
+        this(getDefaultConfiguration(), env, ds, tx);
+    }
+
+    /**
+     * Creates a manager from an explicit Hibernate configuration.
+     *
+     * @param conf configuration to build the session factory from
+     * @param env  Hibernate properties applied to {@code conf}
+     * @param ds   data source registered for {@link #getConnection(Properties)}; may be null
+     * @param tx   JTA transaction manager; when null, {@link #begin()}, {@link #commit()}
+     *             and {@link #rollback()} drive the Hibernate transaction directly
+     * @throws HibernateException if the session factory cannot be built
+     */
+    public SessionManager(Configuration conf, Properties env, DataSource ds, TransactionManager tx) throws HibernateException {
+        if (tx != null) {
+            _ctx = new HibernateJtaTxContext();
+        } else {
+            _ctx = new HibernateNonTxContext();
+        }
+
+        _txManager = tx;
+        _txManagers.put(_uuid, tx);
+        _dataSources.put(_uuid, ds);
+
+        SessionFactory factory;
+        try {
+            factory = conf.setProperties(env).setProperty(PROP_GUID, _uuid).buildSessionFactory();
+        } catch (RuntimeException e) {
+            // Do not leave registry entries behind for a manager that never came to life.
+            unregister();
+            throw e;
+        }
+        _sessionFactory = factory;
+
+        /*
+          Some Hibernate dialects (like IngresDialect) do not support update for join.
+          We need to distinguish them and explicitly define subqueries, otherwise Hibernate
+          implicitly generates joins which causes problems during update for such DBMS.
+          See org.apache.ode.dao.hib.bpel.CorrelatorDaoImpl for instance.
+        */
+        _canJoinForUpdate = supportsJoinForUpdate(env.getProperty(Environment.DIALECT));
+    }
+
+    /** Tells whether the named dialect is absent from the "cannot join for update" list. */
+    private static boolean supportsJoinForUpdate(String dialectName) {
+        for (String unsupported : CANNOT_JOIN_FOR_UPDATE_DIALECTS) {
+            if (unsupported.equals(dialectName)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Removes this manager's entries from the static registries. */
+    private void unregister() {
+        _txManagers.remove(_uuid);
+        _dataSources.remove(_uuid);
+    }
+
+    TransactionManager getTransactionManager() {
+        return _txManager;
+    }
+
+    /**
+     * Registers a transaction manager under the given identifier so that
+     * {@link #getTransactionManager(Properties)} can find it.
+     */
+    public static void registerTransactionManager(String uuid, TransactionManager txm) {
+        _txManagers.put(uuid, txm);
+    }
+
+    /**
+     * Get the current Hibernate Session.
+     */
+    public Session getSession() {
+        return _sessionFactory.getCurrentSession();
+    }
+
+    /**
+     * Returns a flag showing whether "where .. join ... for update" queries can be used, i.e. are
+     * supported by the currently effective {@link org.hibernate.dialect.Dialect}. If it is
+     * {@code false}, a sub-query fallback should be used instead.
+     *
+     * @return currently false only for {@link org.hibernate.dialect.IngresDialect}
+     */
+    public boolean canJoinForUpdate() {
+        return _canJoinForUpdate;
+    }
+
+    /**
+     * Returns a hibernate configuration with hibernate DAO objects added as resources.
+     *
+     * @return a new configuration containing the BPEL and store mappings
+     * @throws MappingException if one of the mappings cannot be added
+     */
+    public static Configuration getDefaultConfiguration() throws MappingException {
+        return new Configuration()
+            .addClass(HProcess.class)
+            .addClass(HProcessInstance.class)
+            .addClass(HCorrelator.class)
+            .addClass(HCorrelatorMessage.class)
+            .addClass(HCorrelationProperty.class)
+            .addClass(HCorrelatorSelector.class)
+            .addClass(HMessageExchange.class)
+            .addClass(HMessage.class)
+            .addClass(HPartnerLink.class)
+            .addClass(HScope.class)
+            .addClass(HCorrelationSet.class)
+            .addClass(HXmlData.class)
+            .addClass(HVariableProperty.class)
+            .addClass(HBpelEvent.class)
+            .addClass(HFaultData.class)
+            .addClass(HActivityRecovery.class)
+            .addClass(HMessageExchangeProperty.class)
+            // store classes
+            .addClass(ProcessConfDaoImpl.class)
+            .addClass(DeploymentUnitDaoImpl.class)
+            .addClass(VersionTrackerDAOImpl.class);
+    }
+
+    /**
+     * @return the transaction manager registered under the {@link #PROP_GUID}
+     *         value found in {@code props}, or null if there is none
+     */
+    public static TransactionManager getTransactionManager(Properties props) {
+        String guid = props.getProperty(PROP_GUID);
+        return _txManagers.get(guid);
+    }
+
+    /**
+     * Opens a connection on the data source registered under the
+     * {@link #PROP_GUID} value found in {@code props}.
+     *
+     * @throws SQLException if no data source is registered for that value,
+     *         or if the data source fails to provide a connection
+     */
+    public static Connection getConnection(Properties props) throws SQLException {
+        String guid = props.getProperty(PROP_GUID);
+        DataSource ds = _dataSources.get(guid);
+        if (ds == null) {
+            throw new SQLException("No DataSource registered for " + PROP_GUID + "=" + guid);
+        }
+        return ds.getConnection();
+    }
+
+    public boolean isClosed() {
+        return _sessionFactory.isClosed();
+    }
+
+    /**
+     * Closes the session factory and drops this manager's registry entries,
+     * which would otherwise stay referenced by the static maps.
+     */
+    public void shutdown() {
+        try {
+            _sessionFactory.close();
+        } finally {
+            unregister();
+        }
+    }
+
+    public void begin() {
+        _ctx.begin();
+    }
+
+    public void commit() {
+        _ctx.commit();
+    }
+
+    public void rollback() {
+        _ctx.rollback();
+    }
+
+    /** Transaction demarcation strategy used by begin/commit/rollback. */
+    public interface TxContext {
+
+        public void begin();
+
+        public void commit();
+
+        public void rollback();
+    }
+
+    /** Drives the transaction of the current Hibernate session directly. */
+    public class HibernateNonTxContext implements TxContext {
+
+        public void begin() {
+            getSession().beginTransaction();
+        }
+
+        public void commit() {
+            getSession().getTransaction().commit();
+        }
+
+        public void rollback() {
+            getSession().getTransaction().rollback();
+        }
+    }
+
+    /** No-op strategy selected when a JTA transaction manager was supplied. */
+    public class HibernateJtaTxContext implements TxContext {
+
+        public void begin() {
+        }
+
+        public void commit() {
+        }
+
+        public void rollback() {
+        }
+    }
+}
Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/ActivityRecoveryDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/ActivityRecoveryDaoImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/ActivityRecoveryDaoImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/ActivityRecoveryDaoImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.hib.bpel;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.ode.dao.bpel.ActivityRecoveryDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HActivityRecovery;
+import org.apache.ode.utils.DOMUtils;
+import org.w3c.dom.Element;
+import org.xml.sax.SAXException;
+
+/**
+ * Hibernate based {@link ActivityRecoveryDao} implementation
+ */
+public class ActivityRecoveryDaoImpl extends HibernateDao implements ActivityRecoveryDAO {
+ HActivityRecovery _self;
+
+ public ActivityRecoveryDaoImpl(SessionManager sm, HActivityRecovery recovery) {
+ super(sm, recovery);
+ entering("ActivityRecoveryDaoImpl.ActivityRecoveryDaoImpl");
+ _self = recovery;
+ }
+
+ public long getActivityId() {
+ return _self.getActivityId();
+ }
+
+ public String getChannel() {
+ return _self.getChannel();
+ }
+
+ public String getReason() {
+ return _self.getReason();
+ }
+
+ public Date getDateTime() {
+ return _self.getDateTime();
+ }
+
+ public Element getDetails() {
+ entering("ActivityRecoveryDaoImpl.getDetails");
+ if (_self.getDetails() == null) {
+ return null;
+ }
+ try {
+ return DOMUtils.stringToDOM(_self.getDetails());
+ } catch (SAXException e) {
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getActions() {
+ return _self.getActions();
+ }
+
+ public String[] getActionsList() {
+ return _self.getActions().split(" ");
+ }
+
+ public int getRetries() {
+ return _self.getRetries();
+ }
+}
Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionFactoryImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionFactoryImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionFactoryImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import java.sql.Connection;
+import java.util.Enumeration;
+import java.util.Properties;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.dao.bpel.BpelDAOConnection;
+import org.apache.ode.dao.bpel.BpelDAOConnectionFactory;
+import org.apache.ode.dao.hib.DataSourceConnectionProvider;
+import org.apache.ode.dao.hib.HibernateTransactionManagerLookup;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.il.config.OdeConfigProperties;
+import org.hibernate.HibernateException;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.cfg.Environment;
+import org.hibernate.dialect.Dialect;
+import org.hibernate.dialect.resolver.DialectFactory;
+
+/**
+ * Hibernate-based {@link org.apache.ode.dao.bpel.BpelDAOConnectionFactory}
+ * implementation.
+ */
+public class BpelDAOConnectionFactoryImpl implements BpelDAOConnectionFactory {
+    private static final Log __log = LogFactory.getLog(BpelDAOConnectionFactoryImpl.class);
+
+    /** Dialect used when none can be derived from the database connection. */
+    private static final String DEFAULT_HIBERNATE_DIALECT = "org.hibernate.dialect.DerbyDialect";
+
+    protected SessionManager _sessionManager;
+
+    protected DataSource _ds;
+
+    protected TransactionManager _txm;
+
+    /**
+     * Constructor.
+     */
+    public BpelDAOConnectionFactoryImpl() {
+    }
+
+    /**
+     * @return a new connection backed by this factory's session manager
+     * @throws HibernateException logged and rethrown if the connection cannot be created
+     */
+    public BpelDAOConnection getConnection() {
+        try {
+            return new BpelDAOConnectionImpl(_sessionManager);
+        } catch (HibernateException e) {
+            __log.error("DbError", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Stores the transaction manager and data source and builds the session manager.
+     *
+     * @param initialProps Hibernate/ODE properties; not modified
+     * @param mgr          JTA transaction manager; an error is logged when null
+     * @param env          expected to be a {@link DataSource}
+     */
+    public void init(Properties initialProps, TransactionManager mgr, Object env) {
+        _txm = mgr;
+        _ds = (DataSource) env;
+        if (_txm == null) {
+            __log.error("Hibernate BpelDAOConnectionFactoryImpl requires a JTA Transaction Manager to be set.");
+        }
+        _sessionManager = setupSessionManager(initialProps, _txm, _ds);
+    }
+
+    /**
+     * Same as {@link #setupSessionManager(Configuration, Properties, TransactionManager, DataSource)}
+     * with {@link SessionManager#getDefaultConfiguration()}.
+     */
+    public static SessionManager setupSessionManager(Properties initialProps, TransactionManager mgr, DataSource ds) {
+        return setupSessionManager(SessionManager.getDefaultConfiguration(), initialProps, mgr, ds);
+    }
+
+    /**
+     * Derives the Hibernate properties from the given configuration and
+     * creates a {@link SessionManager} with them.
+     *
+     * @param conf          Hibernate configuration holding the mappings
+     * @param initialConfig user supplied properties, copied and never modified;
+     *                      null is treated like an empty set of properties
+     * @param mgr           JTA transaction manager, or null to use plain JDBC
+     *                      transactions with thread-bound sessions
+     * @param ds            data source, or null to leave the connection
+     *                      settings of {@code initialConfig} untouched
+     * @return the new session manager
+     */
+    public static SessionManager setupSessionManager(Configuration conf, Properties initialConfig, TransactionManager mgr, DataSource ds) {
+        // Don't want to pollute original properties
+        Properties properties = new Properties();
+        if (initialConfig != null) {
+            properties.putAll(initialConfig);
+        }
+
+        // The connection provider, transaction manager lookup, transaction
+        // strategy and current-session context set below overwrite any value
+        // the client supplied for them.
+        if (ds != null) {
+            properties.put(Environment.CONNECTION_PROVIDER, DataSourceConnectionProvider.class.getName());
+            // Guess Hibernate dialect if not specified in hibernate.properties
+            if (properties.get(Environment.DIALECT) == null) {
+                try {
+                    properties.put(Environment.DIALECT, guessDialect(ds));
+                } catch (Exception ex) {
+                    String errmsg = "Unable to detect Hibernate dialect!";
+
+                    if (__log.isDebugEnabled()) {
+                        __log.debug(errmsg, ex);
+                    }
+
+                    __log.error(errmsg);
+                }
+            }
+        }
+
+        if (mgr != null) {
+            properties.put(Environment.TRANSACTION_MANAGER_STRATEGY, HibernateTransactionManagerLookup.class.getName());
+            /*
+             * Since Hibernate 3.2.6, Hibernate JTATransaction requires User Transaction bound on JNDI. Let's work around
+             * by implementing Hibernate JTATransactionFactory that hooks up to the JTATransactionManager(ODE uses geronimo
+             * by default).
+             */
+            properties.put(Environment.TRANSACTION_STRATEGY, "org.apache.ode.dao.hib.JotmTransactionFactory");
+            properties.put(Environment.CURRENT_SESSION_CONTEXT_CLASS, "jta");
+        } else {
+            properties.put(Environment.TRANSACTION_STRATEGY, "org.hibernate.transaction.JDBCTransactionFactory");
+            properties.put(Environment.CURRENT_SESSION_CONTEXT_CLASS, "thread");
+        }
+
+        // Isolation levels override
+        String isolation = System.getProperty("ode.connection.isolation");
+        if (isolation != null) {
+            properties.put(Environment.ISOLATION, isolation);
+        }
+
+        // initialConfig was already accepted as null above, so it must not be
+        // dereferenced unconditionally here; a missing value defaults to "true".
+        String embeddedCreate = initialConfig == null
+                ? "true"
+                : initialConfig.getProperty(OdeConfigProperties.PROP_DB_EMBEDDED_CREATE, "true");
+        if (Boolean.parseBoolean(embeddedCreate)) {
+            properties.put(Environment.HBM2DDL_AUTO, "create-drop");
+        }
+
+        if (__log.isDebugEnabled()) {
+            Enumeration<?> names = properties.propertyNames();
+            __log.debug("Properties passed to Hibernate:");
+            while (names.hasMoreElements()) {
+                String name = (String) names.nextElement();
+                __log.debug(name + "=" + properties.getProperty(name));
+            }
+        }
+
+        return new SessionManager(conf, properties, ds, mgr);
+    }
+
+    protected SessionManager createSessionManager(Properties properties, DataSource ds, TransactionManager tm) {
+        return new SessionManager(properties, ds, tm);
+    }
+
+    public void shutdown() {
+        // Not too much to do for hibernate.
+    }
+
+    /**
+     * Opens a connection on the data source and lets Hibernate pick the
+     * dialect matching the database behind it. The connection is always closed.
+     *
+     * @return the dialect class name, or the Derby dialect when Hibernate
+     *         returns no dialect
+     * @throws Exception if the connection cannot be obtained or closed, or if
+     *         dialect resolution fails
+     */
+    public static String guessDialect(DataSource dataSource) throws Exception {
+        String dialect = null;
+        Connection conn = dataSource.getConnection();
+        try {
+            Dialect d = DialectFactory.buildDialect(new Properties(), conn);
+            // Guard the dereference so the default below is actually reachable.
+            if (d != null) {
+                dialect = d.getClass().getName();
+            }
+        } finally {
+            conn.close();
+        }
+
+        if (dialect == null) {
+            __log.info("Cannot determine hibernate dialect for this database: using the default one.");
+            dialect = DEFAULT_HIBERNATE_DIALECT;
+        }
+
+        return dialect;
+    }
+
+    public void setDataSource(DataSource ds) {
+        _ds = ds;
+    }
+
+    public DataSource getDataSource() {
+        return _ds;
+    }
+
+    /**
+     * @param tm must be a {@link TransactionManager}
+     */
+    public void setTransactionManager(Object tm) {
+        _txm = (TransactionManager) tm;
+    }
+
+    public void setUnmanagedDataSource(DataSource ds) {
+        // Hibernate doesn't use this.
+    }
+
+}
Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.BpelEventFilter;
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.bpel.common.ProcessState;
+import org.apache.ode.bpel.evt.BpelEvent;
+import org.apache.ode.bpel.evt.ScopeEvent;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
+import org.apache.ode.dao.bpel.BpelDAOConnection;
+import org.apache.ode.dao.bpel.CorrelationSetDAO;
+import org.apache.ode.dao.bpel.FilteredInstanceDeletable;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.dao.bpel.ProcessDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+import org.apache.ode.dao.bpel.ProcessManagementDAO;
+import org.apache.ode.dao.bpel.ScopeDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HBpelEvent;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelationSet;
+import org.apache.ode.dao.hib.bpel.hobj.HMessageExchange;
+import org.apache.ode.dao.hib.bpel.hobj.HProcess;
+import org.apache.ode.dao.hib.bpel.hobj.HProcessInstance;
+import org.apache.ode.dao.hib.bpel.hobj.HScope;
+import org.apache.ode.dao.hib.bpel.ql.HibernateInstancesQueryCompiler;
+import org.apache.ode.ql.eval.skel.CommandEvaluator;
+import org.apache.ode.ql.tree.Builder;
+import org.apache.ode.ql.tree.BuilderFactory;
+import org.apache.ode.ql.tree.nodes.Query;
+import org.apache.ode.utils.SerializableUtils;
+import org.apache.ode.utils.stl.CollectionsX;
+import org.apache.ode.utils.stl.UnaryFunctionEx;
+import org.hibernate.Criteria;
+import org.hibernate.FetchMode;
+import org.hibernate.HibernateException;
+import org.hibernate.Session;
+import org.hibernate.criterion.Expression;
+import org.hibernate.criterion.Projections;
+
+/**
+ * Hibernate-based {@link BpelDAOConnection} implementation.
+ */
+public class BpelDAOConnectionImpl implements BpelDAOConnection, FilteredInstanceDeletable {
+ // Logger for this class.
+ private static final Log __log = LogFactory.getLog(BpelDAOConnectionImpl.class);
+
+ // Session manager that supplies the Hibernate session for every operation
+ // and is handed to each DAO wrapper this connection creates.
+ public SessionManager _sm;
+
+ /**
+  * Creates a connection that works through the given session manager.
+  */
+ public BpelDAOConnectionImpl(SessionManager sm) {
+     this._sm = sm;
+ }
+
+ protected Session getSession(){
+ return _sm.getSession();
+ }
+
+ /**
+  * Saves a new message exchange with the given direction, stamped with
+  * the current time as its insert time.
+  *
+  * @return a DAO wrapping the saved message exchange
+  */
+ public MessageExchangeDAO createMessageExchange(char dir) {
+     HMessageExchange hmex = new HMessageExchange();
+     hmex.setDirection(dir);
+     // A Date built without arguments holds the current time.
+     hmex.setInsertTime(new Date());
+     Session session = getSession();
+     session.save(hmex);
+     return new MessageExchangeDaoImpl(_sm, hmex);
+ }
+
+ /**
+  * Looks up a message exchange by its identifier, given as a decimal string.
+  *
+  * @return a DAO wrapping the message exchange, or null when none is found
+  * @throws NumberFormatException if {@code mexid} is not a parsable long
+  */
+ public MessageExchangeDAO getMessageExchange(String mexid) {
+     Long key = Long.valueOf(mexid);
+     HMessageExchange found = (HMessageExchange) getSession().get(HMessageExchange.class, key);
+     if (found == null) {
+         return null;
+     }
+     return new MessageExchangeDaoImpl(_sm, found);
+ }
+
+ /**
+  * Saves a new process record. The type name is stored as separate local
+  * part and namespace, and the deploy date is set to the current time.
+  *
+  * @return a DAO wrapping the saved process
+  */
+ public ProcessDAO createProcess(QName pid, QName type, String guid, long version) {
+     HProcess hprocess = new HProcess();
+     hprocess.setProcessId(pid.toString());
+     hprocess.setTypeName(type.getLocalPart());
+     hprocess.setTypeNamespace(type.getNamespaceURI());
+     hprocess.setDeployDate(new Date());
+     hprocess.setGuid(guid);
+     hprocess.setVersion(version);
+
+     Session session = getSession();
+     session.save(hprocess);
+     return new ProcessDaoImpl(_sm, hprocess);
+ }
+
+ /**
+  * Wraps a fresh process object carrying only the given id. The object is
+  * not saved to the session here.
+  */
+ public ProcessDAO createTransientProcess(Long id) {
+     HProcess unsaved = new HProcess();
+     unsaved.setId(id);
+     ProcessDAO dao = new ProcessDaoImpl(_sm, unsaved);
+     return dao;
+ }
+
+ /**
+  * Finds the process whose {@code processId} property equals the string
+  * form of the given name.
+  *
+  * @return a DAO wrapping the process, or null when there is no match
+  * @throws HibernateException logged and rethrown on query failure
+  */
+ public ProcessDAO getProcess(QName processId) {
+     try {
+         // For the moment we are expecting only one result.
+         HProcess match = (HProcess) getSession()
+                 .createCriteria(HProcess.class)
+                 .add(Expression.eq("processId", processId.toString()))
+                 .uniqueResult();
+         if (match == null) {
+             return null;
+         }
+         return new ProcessDaoImpl(_sm, match);
+     } catch (HibernateException e) {
+         __log.error("DbError", e);
+         throw e;
+     }
+ }
+
+ /**
+  * Does nothing; this implementation performs no work on close.
+  */
+ public void close() {
+ }
+
+ /**
+ * @see org.apache.ode.dao.bpel.ProcessDAO#getInstance(java.lang.Long)
+ */
+ public ProcessInstanceDAO getInstance(Long instanceId) {
+ return _getInstance(_sm, getSession(), instanceId);
+ }
+
+ public int getNumInstances(QName processId) {
+ ProcessDAO process = getProcess(processId);
+ if (process != null)
+ return process.getNumInstances();
+ else return -1;
+ }
+
+ public ScopeDAO getScope(Long siidl) {
+ return _getScope(_sm, getSession(), siidl);
+ }
+
+ public Collection<ProcessInstanceDAO> instanceQuery(InstanceFilter criteria) {
+ if (criteria.getLimit() == 0) {
+ return Collections.emptyList();
+ }
+ List<ProcessInstanceDAO> daos = new ArrayList<ProcessInstanceDAO>();
+
+ Iterator<HProcessInstance> iter = _instanceQuery(getSession(), false, criteria);
+ while (iter.hasNext()) {
+ daos.add(new ProcessInstanceDaoImpl(_sm, iter.next()));
+ }
+
+ return daos;
+ }
+
+ /**
+  * Deletes the instances matching the filter.
+  * <p>
+  * The deletion is delegated to a transient process built from the
+  * process id of the first matching instance.
+  *
+  * @return the value returned by the process-level deletion, or 0 when the
+  *         filter's limit is 0 or nothing matches
+  */
+ public int deleteInstances(InstanceFilter criteria, Set<CLEANUP_CATEGORY> categories) {
+     if (criteria.getLimit() == 0) {
+         return 0;
+     }
+
+     List<HProcessInstance> doomed = _instanceQueryForList(getSession(), false, criteria);
+     if (__log.isDebugEnabled()) {
+         __log.debug("Collected " + doomed.size() + " instances to delete.");
+     }
+
+     if (doomed.isEmpty()) {
+         return 0;
+     }
+
+     Long ownerId = doomed.get(0).getProcessId();
+     ProcessDaoImpl owner = (ProcessDaoImpl) createTransientProcess(ownerId);
+     return owner.deleteInstances(doomed, categories);
+ }
+
+ /**
+  * Iterator view over the list returned by the filter query.
+  */
+ static Iterator<HProcessInstance> _instanceQuery(Session session, boolean countOnly, InstanceFilter filter) {
+     List<HProcessInstance> matches = _instanceQueryForList(session, countOnly, filter);
+     return matches.iterator();
+ }
+
+ /**
+  * Builds an HQL query from the filter and returns its result list.
+  * The {@code countOnly} argument is not used.
+  */
+ @SuppressWarnings("unchecked")
+ private static List<HProcessInstance> _instanceQueryForList(Session session, boolean countOnly, InstanceFilter filter) {
+     return new CriteriaBuilder().buildHQLQuery(session, filter).list();
+ }
+
+ /**
+  * Loads the process instance with the given id from the session.
+  *
+  * @return a DAO wrapping the instance, or null when the session has none
+  */
+ static ProcessInstanceDAO _getInstance(SessionManager sm, Session session, Long iid) {
+     HProcessInstance loaded = (HProcessInstance) session.get(HProcessInstance.class, iid);
+     if (loaded == null) {
+         return null;
+     }
+     return new ProcessInstanceDaoImpl(sm, loaded);
+ }
+
+ /**
+  * Loads the scope with the given id from the session.
+  *
+  * @return a DAO wrapping the scope, or null when the session has none
+  */
+ static ScopeDAO _getScope(SessionManager sm, Session session, Long siid) {
+     HScope loaded = (HScope) session.get(HScope.class, siid);
+     if (loaded == null) {
+         return null;
+     }
+     return new ScopeDaoImpl(sm, loaded);
+ }
+
+ public void insertBpelEvent(BpelEvent event, ProcessDAO process, ProcessInstanceDAO instance) {
+ _insertBpelEvent(_sm.getSession(), event, process, instance);
+ }
+
+ /**
+  * Helper method for inserting bpel events into the database.
+  * <p>
+  * The stored row carries the current time, the event name, the event's
+  * string form and, when available, the process, the instance and the
+  * scope id. The event is also stored in Java-serialized form; if
+  * serialization fails the failure is logged and the row is saved
+  * without that data.
+  *
+  * @param sess     session used to save the row
+  * @param event    event to store
+  * @param process  owning process, may be null
+  * @param instance owning instance, may be null
+  */
+ static void _insertBpelEvent(Session sess, BpelEvent event, ProcessDAO process, ProcessInstanceDAO instance) {
+     HBpelEvent row = new HBpelEvent();
+     row.setTstamp(new Timestamp(System.currentTimeMillis()));
+     row.setType(BpelEvent.eventName(event));
+     row.setDetail(event.toString());
+
+     if (process != null) {
+         ProcessDaoImpl processDao = (ProcessDaoImpl) process;
+         row.setProcess((HProcess) processDao.getHibernateObj());
+     }
+     if (instance != null) {
+         ProcessInstanceDaoImpl instanceDao = (ProcessInstanceDaoImpl) instance;
+         row.setInstance((HProcessInstance) instanceDao.getHibernateObj());
+     }
+     if (event instanceof ScopeEvent) {
+         ScopeEvent scoped = (ScopeEvent) event;
+         row.setScopeId(scoped.getScopeId());
+     }
+
+     try {
+         ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+         ObjectOutputStream out = new ObjectOutputStream(buffer);
+         out.writeObject(event);
+         out.flush();
+         row.setData(buffer.toByteArray());
+     } catch (Throwable ex) {
+         // this is really unexpected.
+         __log.fatal("InternalError: BpelEvent serialization failed.", ex);
+     }
+
+     sess.save(row);
+ }
+
+ /**
+  * Returns the timestamps of the stored events matching the given filters.
+  *
+  * @param ifilter restriction applied to the event's instance; ignored when null
+  * @param efilter restriction applied to the event itself; ignored when null
+  * @return the {@code tstamp} values of the matching events
+  */
+ @SuppressWarnings( { "unchecked", "deprecation" })
+ public List<Date> bpelEventTimelineQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
+     CriteriaBuilder cb = new CriteriaBuilder();
+     Criteria crit = getSession().createCriteria(HBpelEvent.class);
+     // The event filter must be guarded by its own null check. It used to be
+     // guarded by the instance filter, so a null event filter could be passed
+     // on and a non-null one was dropped whenever the instance filter was null.
+     if (efilter != null) {
+         cb.buildCriteria(crit, efilter);
+     }
+     if (ifilter != null) {
+         cb.buildCriteria(crit.createCriteria("instance"), ifilter);
+     }
+     crit.setFetchMode("tstamp", FetchMode.EAGER);
+     crit.setProjection(Projections.property("tstamp"));
+     return crit.list();
+ }
+
+ /**
+  * Returns the stored events matching the given filters, rebuilt from
+  * their serialized form.
+  *
+  * @param ifilter restriction applied to the event's instance; ignored when null
+  * @param efilter restriction applied to the event itself; ignored when null
+  * @throws RuntimeException if a stored event cannot be turned back into an object
+  */
+ @SuppressWarnings("unchecked")
+ public List<BpelEvent> bpelEventQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
+     CriteriaBuilder cb = new CriteriaBuilder();
+     Criteria eventCriteria = getSession().createCriteria(HBpelEvent.class);
+     if (efilter != null) {
+         cb.buildCriteria(eventCriteria, efilter);
+     }
+     if (ifilter != null) {
+         Criteria instanceCriteria = eventCriteria.createCriteria("instance");
+         cb.buildCriteria(instanceCriteria, ifilter);
+     }
+
+     List<HBpelEvent> rows = eventCriteria.list();
+     List<BpelEvent> events = new ArrayList<BpelEvent>(rows.size());
+
+     // Turns one stored row back into the event object it was created from.
+     UnaryFunctionEx<HBpelEvent, BpelEvent> deserialize = new UnaryFunctionEx<HBpelEvent, BpelEvent>() {
+         public BpelEvent apply(HBpelEvent x) throws Exception {
+             ClassLoader loader = BpelEvent.class.getClassLoader();
+             return (BpelEvent) SerializableUtils.toObject(x.getData(), loader);
+         }
+     };
+
+     try {
+         CollectionsX.transformEx(events, rows, deserialize);
+     } catch (Exception ex) {
+         __log.fatal("Internal error: unable to transform HBpelEvent", ex);
+         throw new RuntimeException(ex);
+     }
+     return events;
+ }
+
+ /**
+  * Runs a textual instance query and wraps every matching instance in a DAO.
+  *
+  * @see org.apache.ode.dao.bpel.BpelDAOConnection#instanceQuery(String)
+  */
+ @SuppressWarnings("unchecked")
+ public Collection<ProcessInstanceDAO> instanceQuery(String expression) {
+     // Parse the expression into its tree form.
+     Builder<String> parser = BuilderFactory.getInstance().createBuilder();
+     final org.apache.ode.ql.tree.nodes.Node queryTree = parser.build(expression);
+
+     // Compile the tree into a command that is evaluated against the session.
+     CommandEvaluator<List, Session> command =
+             new HibernateInstancesQueryCompiler().compile((Query) queryTree);
+     List<HProcessInstance> matches = (List<HProcessInstance>) command.evaluate(getSession());
+
+     Collection<ProcessInstanceDAO> daos = new ArrayList<ProcessInstanceDAO>(matches.size());
+     for (HProcessInstance match : matches) {
+         Long iid = match.getId();
+         daos.add(getInstance(iid));
+     }
+     return daos;
+ }
+
+ /**
+  * Loads the correlation sets of the given process instances and groups them
+  * by instance id.
+  *
+  * @param instances instances whose correlation sets are wanted
+  * @return map from instance id to that instance's correlation sets; instances
+  *         for which the query returns no correlation set have no entry. An
+  *         empty input yields an empty map without touching the database.
+  */
+ @SuppressWarnings("unchecked")
+ public Map<Long, Collection<CorrelationSetDAO>> getCorrelationSets(Collection<ProcessInstanceDAO> instances) {
+     if (instances.isEmpty()) {
+         return new HashMap<Long, Collection<CorrelationSetDAO>>();
+     }
+     ArrayList<Long> iids = new ArrayList<Long>(instances.size());
+     for (ProcessInstanceDAO dao : instances) {
+         iids.add(dao.getInstanceId());
+     }
+     Collection<HCorrelationSet> csets = new ArrayList<HCorrelationSet>();
+     // some databases don't like long lists of values with IN operator
+     // so we select in batches. Oracle 9i, for instance, doesn't support
+     // more than 1000 -- we opt to be conservative.
+     final int batchSize = 100;
+     int index = 0;
+     while (index < iids.size()) {
+         List<Long> subList = iids.subList(index, Math.min(index + batchSize, iids.size()));
+         csets.addAll(getSession().getNamedQuery(HCorrelationSet.SELECT_CORSETS_BY_INSTANCES).setParameterList("instances", subList).list());
+         index += batchSize;
+     }
+     Map<Long, Collection<CorrelationSetDAO>> map = new HashMap<Long, Collection<CorrelationSetDAO>>();
+     for (HCorrelationSet cset : csets) {
+         Long id = cset.getInstance().getId();
+         Collection<CorrelationSetDAO> existing = map.get(id);
+         if (existing == null) {
+             // first correlation set seen for this instance
+             existing = new ArrayList<CorrelationSetDAO>();
+             map.put(id, existing);
+         }
+         existing.add(new CorrelationSetDaoImpl(_sm, cset));
+     }
+     return map;
+ }
+
+ /**
+  * Runs the {@code SELECT_CORSETS_BY_PROCESS_STATES} named query with
+  * {@code ProcessState.STATE_ACTIVE} as its <code>states</code> parameter and
+  * wraps every returned correlation set in a DAO.
+  */
+ @SuppressWarnings("unchecked")
+ public Collection<CorrelationSetDAO> getActiveCorrelationSets() {
+     Collection<HCorrelationSet> activeSets = getSession()
+             .getNamedQuery(HCorrelationSet.SELECT_CORSETS_BY_PROCESS_STATES)
+             .setParameter("states", ProcessState.STATE_ACTIVE)
+             .list();
+     ArrayList<CorrelationSetDAO> wrapped = new ArrayList<CorrelationSetDAO>();
+     for (HCorrelationSet active : activeSets) {
+         wrapped.add(new CorrelationSetDaoImpl(_sm, active));
+     }
+     return wrapped;
+ }
+
+
+ public ProcessManagementDAO getProcessManagement() {
+ return new ProcessManagementDaoImpl(_sm);
+ }
+
+ public boolean isClosed() {
+ return _sm.isClosed();
+ }
+}
Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelationSetDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelationSetDaoImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelationSetDaoImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelationSetDaoImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.dao.bpel.CorrelationSetDAO;
+import org.apache.ode.dao.bpel.ProcessDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+import org.apache.ode.dao.bpel.ScopeDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelationProperty;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelationSet;
+
+import javax.xml.namespace.QName;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Hibernate-based {@link CorrelationSetDAO} implementation.
+ */
+class CorrelationSetDaoImpl extends HibernateDao
+ implements CorrelationSetDAO {
+
+ private HCorrelationSet _correlationSet;
+
+ public CorrelationSetDaoImpl(SessionManager sessionManager, HCorrelationSet correlationSet) {
+ super(sessionManager, correlationSet);
+ entering("CorrelationSetDaoImpl.CorrelationSetDaoImpl");
+ _correlationSet = correlationSet;
+ }
+
+ public Long getCorrelationSetId() {
+ return _correlationSet.getId();
+ }
+
+ public String getName() {
+ return _correlationSet.getName();
+ }
+
+ public ScopeDAO getScope() {
+ entering("CorrelationSetDaoImpl.getScope");
+ return new ScopeDaoImpl(_sm, _correlationSet.getScope());
+ }
+
+ public void setValue(QName[] names, CorrelationKey values) {
+ entering("CorrelationSetDaoImpl.setValue");
+ _correlationSet.setValue(values.toCanonicalString());
+ if (names != null) {
+ if (_correlationSet.getProperties() == null || _correlationSet.getProperties().size() == 0) {
+ for (int m = 0; m < names.length; m++) {
+ HCorrelationProperty prop =
+ new HCorrelationProperty(names[m], values.getValues()[m], _correlationSet);
+ getSession().save(prop);
+ }
+ } else {
+ for (int m = 0; m < names.length; m++) {
+ HCorrelationProperty prop = getProperty(names[m]);
+ if (prop == null) prop = new HCorrelationProperty(names[m], values.getValues()[m], _correlationSet);
+ else prop.setValue(values.getValues()[m]);
+ getSession().save(prop);
+ }
+ }
+ }
+ getSession().update(_correlationSet);
+ }
+
+ public CorrelationKey getValue() {
+ entering("CorrelationSetDaoImpl.getValue");
+ if (_correlationSet.getValue() != null) return new CorrelationKey(_correlationSet.getValue());
+ else return null;
+ }
+
+ public Map<QName, String> getProperties() {
+ entering("CorrelationSetDaoImpl.getProperties");
+ HashMap<QName, String> result = new HashMap<QName, String>();
+ for (HCorrelationProperty property : _correlationSet.getProperties()) {
+ result.put(property.getQName(), property.getValue());
+ }
+ return result;
+ }
+
+ public ProcessDAO getProcess() {
+ return new ProcessDaoImpl(_sm, _correlationSet.getProcess());
+ }
+
+ public ProcessInstanceDAO getInstance() {
+ return new ProcessInstanceDaoImpl(_sm, _correlationSet.getInstance());
+ }
+
+ private HCorrelationProperty getProperty(QName qName) {
+ entering("CorrelationSetDaoImpl.getProperty");
+ for (HCorrelationProperty property : _correlationSet.getProperties()) {
+ if (qName.getLocalPart().equals(property.getName())
+ && qName.getNamespaceURI().equals(property.getNamespace()))
+ return property;
+ }
+ return null;
+ }
+
+}
Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorDaoImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorDaoImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorDaoImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.dao.bpel.CorrelatorDAO;
+import org.apache.ode.dao.bpel.CorrelatorMessageDAO;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.dao.bpel.MessageRouteDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelator;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelatorMessage;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelatorSelector;
+import org.apache.ode.dao.hib.bpel.hobj.HMessageExchange;
+import org.apache.ode.dao.hib.bpel.hobj.HProcessInstance;
+import org.hibernate.Hibernate;
+import org.hibernate.LockMode;
+import org.hibernate.Query;
+import org.hibernate.Session;
+import org.hibernate.exception.LockAcquisitionException;
+
+/**
+ * Hibernate-based {@link CorrelatorDAO} implementation.
+ */
+public class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
+ static Log __log = LogFactory.getLog(CorrelatorDaoImpl.class);
+
+ /** filter for finding a matching selector. */
+ private static final String LOCK_SELECTORS = "update from HCorrelatorSelector as hs set hs.lock = hs.lock+1 where hs.processType = :processType";
+ private static final String CHECK_SELECTORS = "from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlator.correlatorId = :correlatorId";
+ private static final String FLTR_SELECTORS = "from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlator.correlatorId = :correlatorId";
+ private static final String FLTR_SELECTORS_SUBQUERY = ("from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlatorId = " +
+ "(select hc.id from HCorrelator as hc where hc.correlatorId = :correlatorId )").intern();
+
+ /** Query for removing routes. */
+ private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where groupId = ? and instance = ?";
+
+ private HCorrelator _hobj;
+
+ public CorrelatorDaoImpl(SessionManager sm, HCorrelator hobj) {
+ super(sm, hobj);
+ entering("CorrelatorDaoImpl.CorrelatorDaoImpl");
+ _hobj = hobj;
+ }
+
+ @SuppressWarnings("unchecked")
+ public MessageExchangeDAO dequeueMessage(CorrelationKeySet keySet) {
+ entering("CorrelatorDaoImpl.dequeueMessage");
+
+ MessageExchangeDAO mex = null;
+
+ String hdr = "dequeueMessage(" + keySet + "): ";
+ __log.debug(hdr);
+
+ List<CorrelationKeySet> subSets = keySet.findSubSets();
+ Query qry = getSession().createFilter(_hobj.getMessageCorrelations(),
+ generateUnmatchedQuery(subSets));
+ for( int i = 0; i < subSets.size(); i++ ) {
+ qry.setString("s" + i, subSets.get(i).toCanonicalString());
+ }
+
+ // We really should consider the possibility of multiple messages matching a criteria.
+ // When the message is handled, its not too convenient to attempt to determine if the
+ // received message conflicts with one already received.
+ Iterator mcors;
+ try {
+ mcors = qry.setLockMode("this", LockMode.UPGRADE).iterate();
+ } catch (LockAcquisitionException e) {
+ throw new Scheduler.JobProcessorException(e, true);
+ }
+ try {
+ if (!mcors.hasNext()) {
+ if (__log.isDebugEnabled())
+ __log.debug(hdr + "did not find a MESSAGE entry.");
+ } else {
+ HCorrelatorMessage mcor = (HCorrelatorMessage) mcors.next();
+ if (__log.isDebugEnabled())
+ __log.debug(hdr + "found MESSAGE entry " + mcor.getMessageExchange());
+ mex = new MessageExchangeDaoImpl(_sm, mcor.getMessageExchange());
+ }
+ } finally {
+ Hibernate.close(mcors);
+ }
+
+ return mex;
+ }
+
+ @SuppressWarnings("unchecked")
+ public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) {
+ List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+
+ entering("CorrelatorDaoImpl.findRoute");
+ String hdr = "findRoute(keySet=" + keySet + "): ";
+ if (__log.isDebugEnabled()) __log.debug(hdr);
+
+ String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString();
+ List<CorrelationKeySet> subSets = keySet.findSubSets();
+
+ Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
+ q.setString("processType", processType);
+ q.setString("correlatorId", _hobj.getCorrelatorId());
+
+ for( int i = 0; i < subSets.size(); i++ ) {
+ q.setString("s" + i, subSets.get(i).toCanonicalString());
+ }
+ // Make sure we obtain a lock for the selector we want to find.
+ q.setLockMode("hs", LockMode.UPGRADE);
+
+ List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
+ List<HCorrelatorSelector> list;
+ try {
+ list = (List<HCorrelatorSelector>) q.list();
+ } catch (LockAcquisitionException e) {
+ throw new Scheduler.JobProcessorException(e, true);
+ }
+ for (HCorrelatorSelector selector : list) {
+ if (selector != null) {
+ boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute());
+ if ("all".equals(selector.getRoute()) ||
+ (isRoutePolicyOne && !targets.contains(selector.getInstance()))) {
+ routes.add(new MessageRouteDaoImpl(_sm, selector));
+ targets.add(selector.getInstance());
+ }
+ }
+ }
+
+ if(__log.isDebugEnabled()) __log.debug(hdr + "found " + routes);
+
+ return routes;
+ }
+
+ private String generateUnmatchedQuery(List<CorrelationKeySet> subSets) {
+ StringBuffer filterQuery = new StringBuffer();
+
+ if( subSets.size() == 1 ) {
+ filterQuery.append(" where this.correlationKey = :s0");
+ } else if( subSets.size() > 1 ) {
+ filterQuery.append(" where this.correlationKey in(");
+ for( int i = 0; i < subSets.size(); i++ ) {
+ if( i > 0 ) {
+ filterQuery.append(", ");
+ }
+ filterQuery.append(":s").append(i);
+ }
+ filterQuery.append(")");
+ }
+
+ return filterQuery.toString();
+ }
+
+ private String generateSelectorQuery(String header, List<CorrelationKeySet> subSets) {
+ StringBuffer filterQuery = new StringBuffer(header);
+
+ if( subSets.size() == 1 ) {
+ filterQuery.append(" and hs.correlationKey = :s0");
+ } else if( subSets.size() > 1 ) {
+ filterQuery.append(" and hs.correlationKey in(");
+ for( int i = 0; i < subSets.size(); i++ ) {
+ if( i > 0 ) {
+ filterQuery.append(", ");
+ }
+ filterQuery.append(":s").append(i);
+ }
+ filterQuery.append(")");
+ }
+
+ return filterQuery.toString();
+ }
+
+ public void enqueueMessage(MessageExchangeDAO mex, CorrelationKeySet correlationKeySet) {
+ entering("CorrelatorDaoImpl.enqueueMessage");
+ String hdr = "enqueueMessage(mex=" + ((MessageExchangeDaoImpl) mex)._hobj.getId() + " keySet="
+ + correlationKeySet.toCanonicalString() + "): ";
+
+ if (__log.isDebugEnabled())
+ __log.debug(hdr);
+
+ for( CorrelationKeySet aSubSet : correlationKeySet.findSubSets() ) {
+ HCorrelatorMessage mcor = new HCorrelatorMessage();
+ mcor.setCorrelator(_hobj);
+ mcor.setCreated(new Date());
+ mcor.setMessageExchange((HMessageExchange) ((MessageExchangeDaoImpl) mex)._hobj);
+ mcor.setCorrelationKey(aSubSet.toCanonicalString());
+ getSession().save(mcor);
+
+ if (__log.isDebugEnabled())
+ __log.debug(hdr + "saved " + mcor);
+ }
+ }
+
+ public void addRoute(String routeGroupId, ProcessInstanceDAO target, int idx, CorrelationKeySet correlationKeySet, String routePolicy) {
+ entering("CorrelatorDaoImpl.addRoute");
+ String hdr = "addRoute(" + routeGroupId + ", iid=" + target.getInstanceId() + ", idx=" + idx + ", ckeySet="
+ + correlationKeySet + "): ";
+
+ if (__log.isDebugEnabled())
+ __log.debug(hdr);
+ HCorrelatorSelector hsel = new HCorrelatorSelector();
+ hsel.setGroupId(routeGroupId);
+ hsel.setIndex(idx);
+ hsel.setLock(0);
+ hsel.setCorrelationKey(correlationKeySet.toCanonicalString());
+ hsel.setInstance((HProcessInstance) ((ProcessInstanceDaoImpl) target).getHibernateObj());
+ hsel.setProcessType(target.getProcess().getType().toString());
+ hsel.setCorrelator(_hobj);
+ hsel.setCreated(new Date());
+ hsel.setRoute(routePolicy);
+ try {
+ getSession().save(hsel);
+ } catch (LockAcquisitionException e) {
+ throw new Scheduler.JobProcessorException(e, true);
+ }
+
+ if (__log.isDebugEnabled())
+ __log.debug(hdr + "saved " + hsel);
+ }
+
+ public boolean checkRoute(CorrelationKeySet correlationKeySet) {
+ entering("CorrelatorDaoImpl.checkRoute");
+ Query q = getSession().getNamedQuery(HCorrelatorSelector.SELECT_MESSAGE_ROUTE);
+ q.setEntity("corr",_hobj);
+ q.setString("ckey", correlationKeySet.toCanonicalString());
+ q.setReadOnly(true);
+ return q.list().isEmpty();
+ }
+
+ public String getCorrelatorId() {
+ return _hobj.getCorrelatorId();
+ }
+
+ public void setCorrelatorId(String newId) {
+ _hobj.setCorrelatorId(newId);
+ }
+
+ public void removeRoutes(String routeGroupId, ProcessInstanceDAO target) {
+ entering("CorrelatorDaoImpl.removeRoutes");
+ String hdr = "removeRoutes(" + routeGroupId + ", iid=" + target.getInstanceId() + "): ";
+ __log.debug(hdr);
+ Session session = getSession();
+ Query q = session.createQuery(QRY_DELSELECTORS);
+ q.setString(0, routeGroupId); // groupId
+ q.setEntity(1, ((ProcessInstanceDaoImpl) target).getHibernateObj()); // instance
+ int updates = q.executeUpdate();
+ session.flush(); // explicit flush to ensure route removed
+ if (__log.isDebugEnabled())
+ __log.debug(hdr + "deleted " + updates + " rows");
+ }
+
+ public Collection<CorrelatorMessageDAO> getAllMessages() {
+ Collection<CorrelatorMessageDAO> msgs = new ArrayList<CorrelatorMessageDAO>();
+ for (HCorrelatorMessage correlatorMessage : _hobj.getMessageCorrelations())
+ msgs.add(new CorrelatorMessageDaoImpl(_sm, correlatorMessage));
+ return msgs;
+ }
+
+ public Collection<MessageRouteDAO> getAllRoutes() {
+ Collection<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+ for (HCorrelatorSelector selector : _hobj.getSelectors())
+ routes.add(new MessageRouteDaoImpl(_sm, selector));
+ return routes;
+ }
+
+}
\ No newline at end of file
Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorMessageDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorMessageDaoImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorMessageDaoImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorMessageDaoImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.hib.bpel;
+
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.dao.bpel.CorrelatorMessageDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelatorMessage;
+
+ /**
+  * Hibernate-based {@link CorrelatorMessageDAO} implementation; exposes the
+  * correlation key of an {@link HCorrelatorMessage}, which is stored in its
+  * canonical string form.
+  */
+ public class CorrelatorMessageDaoImpl extends HibernateDao implements CorrelatorMessageDAO {
+
+     private HCorrelatorMessage _hobj;
+
+     public CorrelatorMessageDaoImpl(SessionManager sm, HCorrelatorMessage hobj) {
+         super(sm, hobj);
+         // Label this class, not CorrelatorDaoImpl (the label had been copied
+         // over unchanged and mis-attributed the call).
+         entering("CorrelatorMessageDaoImpl.CorrelatorMessageDaoImpl");
+         _hobj = hobj;
+     }
+
+     /** @return a correlation key built from the stored canonical string */
+     public CorrelationKey getCorrelationKey() {
+         return new CorrelationKey(_hobj.getCorrelationKey());
+     }
+
+     /** Stores the canonical string form of the given key. */
+     public void setCorrelationKey(CorrelationKey ckey) {
+         _hobj.setCorrelationKey(ckey.toCanonicalString());
+     }
+ }
Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CriteriaBuilder.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CriteriaBuilder.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CriteriaBuilder.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CriteriaBuilder.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.hib.bpel;
+
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.BpelEventFilter;
+import org.apache.ode.bpel.common.Filter;
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.utils.ISO8601DateParser;
+import org.apache.ode.utils.RelativeDateParser;
+import org.hibernate.Criteria;
+import org.hibernate.Query;
+import org.hibernate.Session;
+import org.hibernate.criterion.Disjunction;
+import org.hibernate.criterion.Property;
+import org.hibernate.criterion.Restrictions;
+
+/**
+ * Class used for converting "filter" objects into Hibernate
+ * {@link org.hibernate.Criteria} objects.
+ */
+class CriteriaBuilder {
+ static final Log __log = LogFactory.getLog(CriteriaBuilder.class);
+
+ /**
+  * Build a HQL query from an instance filter.
+  * <p>
+  * The query selects {@code HProcessInstance}s (fetching their fault) and adds
+  * one <code>where</code> clause per populated filter criterion, all joined
+  * with <code>and</code>. Values are bound as named parameters, except for the
+  * date criteria which are inlined after being parsed by {@code dateFilter}.
+  *
+  * @param session session used to create the query
+  * @param filter filter; when <code>null</code> the query is unrestricted,
+  *               unordered and unlimited
+  * @return the query with all its parameters bound and, if the filter has a
+  *         non-zero limit, its maximum result count set
+  */
+ Query buildHQLQuery(Session session, InstanceFilter filter) {
+     Map<String, Object> parameters = new HashMap<String, Object>();
+
+     StringBuilder query = new StringBuilder();
+
+     query.append("select pi from HProcessInstance as pi left join fetch pi.fault ");
+
+     if (filter != null) {
+         // Building each clause
+         ArrayList<String> clauses = new ArrayList<String>();
+
+         // iid filter (an empty list would otherwise yield an empty " ()" clause)
+         List<String> iids = filter.getIidFilter();
+         if (iids != null && !iids.isEmpty()) {
+             StringBuilder filters = new StringBuilder();
+             for (int m = 0; m < iids.size(); m++) {
+                 filters.append(" pi.id = :iid").append(m);
+                 parameters.put("iid" + m, Long.parseLong(iids.get(m)));
+                 if (m < iids.size() - 1) filters.append(" or");
+             }
+             clauses.add(" (" + filters + ")");
+         }
+
+         // pid filter
+         List<String> pids = filter.getPidFilter();
+         if (pids != null && !pids.isEmpty()) {
+             StringBuilder filters = new StringBuilder();
+             boolean negative = filter.arePidsNegative();
+             String cmp = negative ? " != " : " = ";
+             // "pid != a or pid != b" is true for every row, so exclusions
+             // have to be and-ed; inclusions stay or-ed.
+             String joiner = negative ? " and" : " or";
+             for (int m = 0; m < pids.size(); m++) {
+                 filters.append(" pi.process.processId ").append(cmp).append(" :pid").append(m);
+                 parameters.put("pid" + m, pids.get(m));
+                 if (m < pids.size() - 1) filters.append(joiner);
+             }
+             clauses.add(" (" + filters + ")");
+         }
+
+         // name filter ('*' in the filter becomes the HQL wildcard '%')
+         if (filter.getNameFilter() != null) {
+             clauses.add(" pi.process.typeName like :pname");
+             parameters.put("pname", filter.getNameFilter().replaceAll("\\*", "%"));
+         }
+
+         // name space filter
+         if (filter.getNamespaceFilter() != null) {
+             clauses.add(" pi.process.typeNamespace like :pnamespace");
+             parameters.put("pnamespace", filter.getNamespaceFilter().replaceAll("\\*", "%"));
+         }
+
+         // started filter
+         if (filter.getStartedDateFilter() != null) {
+             for (String ds : filter.getStartedDateFilter()) {
+                 // named parameters not needed as date is parsed and is hence not
+                 // prone to HQL injections
+                 clauses.add(" pi.created " + dateFilter(ds));
+             }
+         }
+
+         // last-active filter
+         if (filter.getLastActiveDateFilter() != null) {
+             for (String ds : filter.getLastActiveDateFilter()) {
+                 // named parameters not needed as date is parsed and is hence not
+                 // prone to HQL injections
+                 clauses.add(" pi.lastActiveTime " + dateFilter(ds));
+             }
+         }
+
+         // status filter
+         if (filter.getStatusFilter() != null) {
+             List<Short> states = filter.convertFilterState();
+             if (!states.isEmpty()) {
+                 StringBuilder filters = new StringBuilder();
+                 for (int m = 0; m < states.size(); m++) {
+                     filters.append(" pi.state = :pstate").append(m);
+                     parameters.put("pstate" + m, states.get(m));
+                     if (m < states.size() - 1) filters.append(" or");
+                 }
+                 clauses.add(" (" + filters.toString() + ")");
+             }
+         }
+
+         // $property filter
+         if (filter.getPropertyValuesFilter() != null) {
+             Map<String, String> props = filter.getPropertyValuesFilter();
+             // join to correlation sets
+             query.append(" inner join pi.correlationSets as cs");
+             int i = 0;
+             for (Map.Entry<String, String> prop : props.entrySet()) {
+                 i++;
+                 String propKey = prop.getKey();
+                 // join to props for each prop
+                 query.append(" inner join cs.properties as csp" + i);
+                 // add clause for prop key and value
+
+                 // spaces have to be escaped, might be better handled in InstanceFilter
+                 // NOTE(review): as written this replaces a space with a space (a no-op);
+                 // the escape sequence that was meant to be decoded here looks lost --
+                 // confirm against InstanceFilter before relying on it.
+                 String value = prop.getValue().replaceAll(" ", " ");
+                 if (propKey.startsWith("{")) {
+                     // "{namespace}name" form: match on both parts
+                     String namespace = propKey.substring(1, propKey.lastIndexOf("}"));
+                     clauses.add(" csp" + i + ".name = :cspname" + i +
+                             " and csp" + i + ".namespace = :cspnamespace" + i +
+                             " and csp" + i + ".value = :cspvalue" + i);
+
+                     parameters.put("cspname" + i, propKey.substring(propKey.lastIndexOf("}") + 1, propKey.length()));
+                     parameters.put("cspnamespace" + i, namespace);
+                     parameters.put("cspvalue" + i, value);
+                 } else {
+                     clauses.add(" csp" + i + ".name = :cspname" + i +
+                             " and csp" + i + ".value = :cspvalue" + i);
+
+                     parameters.put("cspname" + i, propKey);
+                     parameters.put("cspvalue" + i, value);
+                 }
+             }
+         }
+
+         // order by (an empty list would otherwise leave a dangling " order by")
+         StringBuilder orderby = new StringBuilder("");
+         List<String> orders = filter.getOrders();
+         if (orders != null && !orders.isEmpty()) {
+             orderby.append(" order by");
+             for (int m = 0; m < orders.size(); m++) {
+                 String field = orders.get(m);
+                 String ord = " asc";
+                 if (field.startsWith("-")) {
+                     ord = " desc";
+                 }
+                 // unrecognised keys fall back to ordering by instance id
+                 String fieldName = " pi.id";
+                 if (field.endsWith("name")) {
+                     fieldName = " pi.process.typeName";
+                 }
+                 if (field.endsWith("namespace")) {
+                     fieldName = " pi.process.typeNamespace";
+                 }
+                 if (field.endsWith("version")) {
+                     fieldName = " pi.process.version";
+                 }
+                 if (field.endsWith("status")) {
+                     fieldName = " pi.state";
+                 }
+                 if (field.endsWith("started")) {
+                     fieldName = " pi.created";
+                 }
+                 if (field.endsWith("last-active")) {
+                     fieldName = " pi.lastActiveTime";
+                 }
+                 orderby.append(fieldName + ord);
+                 if (m < orders.size() - 1) orderby.append(", ");
+             }
+
+         }
+
+         // Preparing the statement
+         if (clauses.size() > 0) {
+             query.append(" where");
+             for (int m = 0; m < clauses.size(); m++) {
+                 query.append(clauses.get(m));
+                 if (m < clauses.size() - 1) query.append(" and");
+             }
+         }
+
+         query.append(orderby);
+     }
+
+     if (__log.isDebugEnabled()) {
+         __log.debug(query.toString());
+     }
+
+     Query q = session.createQuery(query.toString());
+
+     for (Map.Entry<String, Object> parameter : parameters.entrySet()) {
+         q.setParameter(parameter.getKey(), parameter.getValue());
+     }
+
+     // filter may be null (see the guard above), so it must be checked here too
+     if (filter != null && filter.getLimit() != 0) {
+         q.setMaxResults(filter.getLimit());
+     }
+
+     return q;
+ }
+
+ /**
+  * Turns a date criterion (a comparison operator followed by a date) into the
+  * HQL fragment <code>op 'timestamp'</code>.
+  * <p>
+  * The date part is whatever {@code Filter.getDateWithoutOp} extracts; the
+  * operator is the text that precedes it. The date is parsed with
+  * {@code ISO8601DateParser} and re-rendered through
+  * {@link Timestamp#toString()}, so no caller-supplied date text reaches the
+  * query verbatim.
+  *
+  * @param filter criterion to convert
+  * @return the operator followed by the quoted timestamp
+  * @throws IllegalArgumentException if the date part cannot be parsed
+  */
+ private static String dateFilter(String filter) {
+     String date = Filter.getDateWithoutOp(filter);
+     String op = filter.substring(0, filter.indexOf(date));
+     Date dt;
+     try {
+         dt = ISO8601DateParser.parse(date);
+     } catch (ParseException e) {
+         // Fail with the offending input rather than printing a stack trace
+         // and then hitting a NullPointerException on the unparsed date.
+         throw new IllegalArgumentException("Invalid date in filter: " + filter, e);
+     }
+     Timestamp ts = new Timestamp(dt.getTime());
+     return op + " '" + ts.toString() + "'";
+ }
+
+
+
+ /**
+  * Build a Hibernate {@link Criteria} from an instance filter.
+  * <p>
+  * Restrictions on process attributes (pid, name, namespace) and the matching
+  * orderings are added to a sub-criteria created on the <code>process</code>
+  * association; restrictions and orderings on instance attributes (id, state,
+  * dates) are added to <code>crit</code> itself. Order keys that are not
+  * recognized are ignored.
+  *
+  * @param crit target (destination) criteria
+  * @param filter filter
+  */
+ void buildCriteria(Criteria crit, InstanceFilter filter) {
+     Criteria processCrit = crit.createCriteria("process");
+
+     // Filtering on PID
+     List<String> pids = filter.getPidFilter();
+     if (pids != null && pids.size() > 0) {
+         Disjunction anyPid = Restrictions.disjunction();
+         for (String pid : pids) {
+             anyPid.add(Restrictions.eq("processId", pid));
+         }
+         if (filter.arePidsNegative()) {
+             // Excluding several pids means "none of them": negate the whole
+             // disjunction. OR-ing individual "not equal" tests would be true
+             // for every row as soon as two different pids are listed.
+             processCrit.add(Restrictions.not(anyPid));
+         } else {
+             processCrit.add(anyPid);
+         }
+     }
+
+     // Filtering on instance id
+     List<String> iids = filter.getIidFilter();
+     if (iids != null && iids.size() > 0) {
+         Disjunction disj = Restrictions.disjunction();
+         for (String iid : iids) {
+             disj.add(Restrictions.eq("id", Long.valueOf(iid)));
+         }
+         crit.add(disj);
+     }
+
+     // Filtering on name and namespace ('*' wildcards become SQL '%')
+     if (filter.getNameFilter() != null) {
+         processCrit.add(Restrictions.like("typeName", filter.getNameFilter().replaceAll("\\*", "%")));
+     }
+     if (filter.getNamespaceFilter() != null) {
+         processCrit.add(Restrictions.like("typeNamespace", filter.getNamespaceFilter().replaceAll("\\*", "%")));
+     }
+
+     // Specific filter for status (using a disjunction between possible statuses)
+     if (filter.getStatusFilter() != null) {
+         List<Short> statuses = filter.convertFilterState();
+         Disjunction disj = Restrictions.disjunction();
+         for (short status : statuses) {
+             disj.add(Restrictions.eq("state", status));
+         }
+         crit.add(disj);
+     }
+
+     // Specific filter for started and last active dates.
+     if (filter.getStartedDateFilter() != null) {
+         for (String sdf : filter.getStartedDateFilter()) {
+             addFilterOnPrefixedDate(crit, sdf, "created");
+         }
+     }
+     if (filter.getLastActiveDateFilter() != null) {
+         for (String ladf : filter.getLastActiveDateFilter()) {
+             addFilterOnPrefixedDate(crit, ladf, "lastActiveTime");
+         }
+     }
+
+     // Specific filter for correlation properties
+     // NOTE(review): every entry adds its restrictions to the same joined
+     // criteria, so several entries are presumably AND-ed against a single
+     // property row -- confirm that multi-property filters behave as intended.
+     if (filter.getPropertyValuesFilter() != null) {
+         Criteria propCrit = crit.createCriteria("correlationSets").createCriteria("properties");
+         for (Map.Entry<String, String> corValue : filter.getPropertyValuesFilter().entrySet()) {
+             String propName = corValue.getKey();
+             if (propName.startsWith("{")) {
+                 // "{namespace}localName" form: match both parts separately
+                 int closing = propName.lastIndexOf("}");
+                 String namespace = propName.substring(1, closing);
+                 String localName = propName.substring(closing + 1, propName.length());
+                 propCrit.add(Restrictions.eq("name", localName))
+                         .add(Restrictions.eq("namespace", namespace))
+                         .add(Restrictions.eq("value", corValue.getValue()));
+             } else {
+                 propCrit.add(Restrictions.eq("name", propName))
+                         .add(Restrictions.eq("value", corValue.getValue()));
+             }
+         }
+     }
+
+     // Ordering: an optional leading '+' (ascending, the default) or '-' (descending)
+     if (filter.orders != null) {
+         for (String key : filter.orders) {
+             boolean ascending = true;
+             String orderKey = key;
+             if (key.startsWith("+") || key.startsWith("-")) {
+                 orderKey = key.substring(1, key.length());
+                 if (key.startsWith("-")) ascending = false;
+             }
+
+             if ("name".equals(orderKey)) {
+                 processCrit.addOrder(ascending ? Property.forName("typeName").asc()
+                         : Property.forName("typeName").desc());
+             } else if ("namespace".equals(orderKey)) {
+                 processCrit.addOrder(ascending ? Property.forName("typeNamespace").asc()
+                         : Property.forName("typeNamespace").desc());
+             } else if ("pid".equals(orderKey)) {
+                 processCrit.addOrder(ascending ? Property.forName("processId").asc()
+                         : Property.forName("processId").desc());
+             } else if ("version".equals(orderKey)) {
+                 processCrit.addOrder(ascending ? Property.forName("version").asc()
+                         : Property.forName("version").desc());
+             } else if ("status".equals(orderKey)) {
+                 crit.addOrder(ascending ? Property.forName("state").asc()
+                         : Property.forName("state").desc());
+             } else if ("started".equals(orderKey)) {
+                 crit.addOrder(ascending ? Property.forName("created").asc()
+                         : Property.forName("created").desc());
+             } else if ("last-active".equals(orderKey)) {
+                 crit.addOrder(ascending ? Property.forName("lastActiveTime").asc()
+                         : Property.forName("lastActiveTime").desc());
+             }
+         }
+     }
+
+     if (filter.getLimit() > 0) crit.setMaxResults(filter.getLimit());
+ }
+
+ /**
+  * Populate a criteria with the restrictions described by an event filter:
+  * an optional <code>like</code> match on the event type, any number of
+  * timestamp comparisons, and an optional maximum result count.
+  * @param crit target criteria
+  * @param efilter event filter
+  */
+ void buildCriteria(Criteria crit, BpelEventFilter efilter) {
+     // Event type, with '*' wildcards turned into '%' for the like expression
+     if (efilter.getTypeFilter() != null) {
+         String typePattern = efilter.getTypeFilter().replace('*', '%');
+         crit.add(Restrictions.like("type", typePattern));
+     }
+
+     // One restriction on the event timestamp per entry of the timestamp filter
+     if (efilter.getTimestampFilter() != null) {
+         for (Filter.Restriction<Date> restriction : efilter.getTimestampFilter()) {
+             addFilterOnPrefixedDate(crit, restriction.op, restriction.value, "tstamp");
+         }
+     }
+
+     // A non-positive limit leaves the result size unbounded
+     if (efilter.limit > 0) {
+         crit.setMaxResults(efilter.limit);
+     }
+ }
+
+ /**
+  * Add an equality restriction on <code>scopeId</code> to the given criteria.
+  * <p>
+  * NOTE(review): the property name passed to <code>Restrictions.eq</code> is the
+  * empty string, which looks like an unfinished placeholder -- confirm which
+  * property the scope id is meant to be compared against before relying on this.
+  * @param crit target criteria
+  * @param scopeId value the property is compared with
+  */
+ void addScopeFilter(Criteria crit, String scopeId) {
+ crit.add(Restrictions.eq("",scopeId));
+ }
+
+ /**
+  * Add a date restriction given as a single operator-prefixed string.
+  * <p>
+  * The date part is obtained with <code>getDateWithoutOp</code>, parsed, and the
+  * comparison itself is delegated to
+  * {@link #addFilterOnPrefixedDate(Criteria, String, Date, String)}, which reads
+  * the operator from the start of <code>prefixedDate</code>.
+  *
+  * @param crit criteria receiving the restriction
+  * @param prefixedDate comparison operator followed by a date expression
+  * @param dateAttribute name of the date property to compare
+  * @throws IllegalArgumentException if the date expression cannot be parsed
+  */
+ static void addFilterOnPrefixedDate(Criteria crit, String prefixedDate, String dateAttribute) {
+     Date realDate;
+     try {
+         realDate = parseDateExpression(getDateWithoutOp(prefixedDate));
+     } catch (ParseException e) {
+         // The filter is expected to have validated the format already. If that
+         // ever fails, report it instead of silently comparing against null.
+         throw new IllegalArgumentException("Invalid date filter: " + prefixedDate, e);
+     }
+     addFilterOnPrefixedDate(crit, prefixedDate, realDate, dateAttribute);
+ }
+
+ /**
+  * Parse a date expression with the operator already removed.
+  * A leading '-' followed by at least one more character is handed (without
+  * the '-') to <code>RelativeDateParser</code>; anything else is handed to
+  * <code>ISO8601DateParser</code>.
+  *
+  * @param date date expression
+  * @return the parsed date
+  * @throws ParseException if the selected parser rejects the expression
+  */
+ private static Date parseDateExpression(String date) throws ParseException {
+     boolean relative = date.length() > 1 && date.charAt(0) == '-';
+     if (!relative) {
+         return ISO8601DateParser.parse(date);
+     }
+     return RelativeDateParser.parseRelativeDate(date.substring(1));
+ }
+
+ static void addFilterOnPrefixedDate(Criteria crit, String op, Date date, String dateAttribute) {
+ if (op.startsWith("=")) {
+ crit.add(Restrictions.eq(dateAttribute, date));
+ } else if (op.startsWith("<=")) {
+ crit.add(Restrictions.le(dateAttribute, date));
+ } else if (op.startsWith(">=")) {
+ crit.add(Restrictions.ge(dateAttribute, date));
+ } else if (op.startsWith("<")) {
+ crit.add(Restrictions.lt(dateAttribute, date));
+ } else if (op.startsWith(">")) {
+ crit.add(Restrictions.gt(dateAttribute, date));
+ }
+ }
+
+ /**
+  * Local shortcut that forwards to <code>Filter.getDateWithoutOp</code> and
+  * returns its result unchanged. Judging by the name this removes the leading
+  * comparison operator from a prefixed date string; see <code>Filter</code> for
+  * the exact rules.
+  * @param ddf operator-prefixed date filter string
+  * @return whatever <code>Filter.getDateWithoutOp(ddf)</code> returns
+  */
+ private static String getDateWithoutOp(String ddf) {
+ return Filter.getDateWithoutOp(ddf);
+ }
+
+
+}
Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/FaultDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/FaultDAOImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/FaultDAOImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/FaultDAOImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.hib.bpel;
+
+import java.io.IOException;
+
+import javax.xml.namespace.QName;
+
+import org.apache.ode.dao.bpel.FaultDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HFaultData;
+import org.apache.ode.utils.DOMUtils;
+import org.apache.ode.utils.QNameUtils;
+import org.w3c.dom.Element;
+import org.xml.sax.SAXException;
+
+/**
+ * Hibernate based {@link FaultDAO} implementation
+ */
+public class FaultDAOImpl extends HibernateDao implements FaultDAO {
+
+
+ HFaultData _self;
+
+ public FaultDAOImpl(SessionManager sm, HFaultData fault) {
+ super(sm, fault);
+ entering("FaultDAOImpl.FaultDAOImpl");
+ _self = fault;
+ }
+
+ public QName getName() {
+ return QNameUtils.toQName(_self.getName());
+ }
+
+ public Element getData() {
+ entering("FaultDAOImpl.getData");
+ if (_self.getData() == null) return null;
+ try {
+ return DOMUtils.stringToDOM(_self.getData());
+ } catch (SAXException e) {
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getExplanation() {
+ return _self.getExplanation();
+ }
+
+ public int getLineNo() {
+ return _self.getLineNo();
+ }
+
+ public int getActivityId() {
+ return _self.getActivityId();
+ }
+}
Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateDao.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateDao.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateDao.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateDao.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HObject;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.hibernate.Query;
+import org.hibernate.Session;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Base class for our DAO objects.
+ * <p>
+ * All subclass methods that might trigger SQL queries should log a message in the log category 'org.apache.ode.bpel.DAO' when entered.
+ * A typical message could be "className.methodName". <br/>
+ * Typical candidates are setters, finders and getters of entities. Getters of simple properties won't provide relevant information.
+ */
+public abstract class HibernateDao {
+
+ // logger used by subclasses to track entered methods that may trigger sql query
+ // we don't use the package name to avoid interferences with other logging info.
+ static final Log logDao = LogFactory.getLog("org.apache.ode.bpel.DAO");
+
+ protected final SessionManager _sm;
+ protected final HObject _hobj;
+
+ protected HibernateDao(SessionManager sessionManager, HObject hobj) {
+ _sm = sessionManager;
+ _hobj = hobj;
+ }
+
+ void entering(String msg){
+ // add a prefix to be parser friendly
+ if(logDao.isDebugEnabled()) logDao.debug("entering "+msg);
+ }
+
+ void leaving(String msg){
+ if(logDao.isDebugEnabled()) logDao.debug("leaving "+msg);
+ }
+
+ /**
+ * @see org.apache.ode.utils.dao.DAO#getDHandle()
+ */
+ public Serializable getDHandle() {
+ return new HibernateHandle(getClass(), _hobj.getClass(), getSession().getIdentifier(_hobj));
+ }
+
+ protected Session getSession() {
+ return _sm.getSession();
+ }
+
+ public HObject getHibernateObj() {
+ return _hobj;
+ }
+
+ public Serializable getId() {
+ if( _hobj != null ) {
+ return _hobj.getId();
+ }
+ return null;
+ }
+
+ public boolean equals(Object obj) {
+ assert obj instanceof HibernateDao;
+ return _hobj.getId().equals(((HibernateDao) obj)._hobj.getId());
+ }
+
+ public int hashCode() {
+ return _hobj.getId().hashCode();
+ }
+
+ protected void update() {
+ _sm.getSession().update(_hobj);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void deleteByIds(Class entity, List<Long> ids) {
+ deleteByColumn(entity, "id", ids);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void deleteByColumn(Class entity, String column, List<Long> values) {
+ if( values != null && values.size() > 0 ) {
+ final String delete = "delete from "+entity.getName()+" as e where e."+column+" in (:values)";
+
+ // some databases don't like long lists of values with IN operator
+ // so we delete in batches. Oracle 9i, for instance, doesn't support
+ // more than 1000 -- we opt to be conservative.
+ final int batchSize = 100;
+
+ int index = 0;
+ while (index < values.size()) {
+ List<Long> subList = values.subList(index, Math.min(index+batchSize, values.size()));
+ Query query = getSession().createQuery(delete);
+ query.setParameterList("values", subList);
+ query.executeUpdate();
+ index += batchSize;
+ }
+ }
+ }
+}
Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateHandle.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateHandle.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateHandle.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateHandle.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import java.io.Serializable;
+
+/**
+ * Serializable class for obtain a reference to a hibernate POJO
+ */
+class HibernateHandle implements Serializable{
+
+ private static final long serialVersionUID = 1L;
+ private Class _daoCls;
+ private Class _hibCls;
+ private Serializable _id;
+ /**
+ *
+ */
+ public HibernateHandle(Class daoCls, Class hibCls, Serializable id) {
+ _daoCls = daoCls;
+ _hibCls = hibCls;
+ _id = id;
+ }
+
+ public Class getHibernateClass(){
+ return _hibCls;
+ }
+
+ public Class getDAOClass(){
+ return _daoCls;
+ }
+
+ public Serializable getId(){
+ return _id;
+ }
+}