You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by je...@apache.org on 2010/08/18 06:12:59 UTC

svn commit: r986561 [6/13] - in /ode/trunk: ./ axis2-war/ axis2-war/src/main/assembly/ axis2-war/src/test/java/org/apache/ode/axis2/ axis2-war/src/test/java/org/apache/ode/axis2/instancecleanup/ axis2-war/src/test/java/org/apache/ode/bpel/dao/ axis2-wa...

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/NativeHiLoGenerator.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/NativeHiLoGenerator.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/NativeHiLoGenerator.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/NativeHiLoGenerator.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.hib;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+import org.hibernate.HibernateException;
+import org.hibernate.MappingException;
+import org.hibernate.dialect.Dialect;
+import org.hibernate.engine.SessionImplementor;
+import org.hibernate.id.Configurable;
+import org.hibernate.id.IdentifierGenerator;
+import org.hibernate.id.PersistentIdentifierGenerator;
+import org.hibernate.id.SequenceHiLoGenerator;
+import org.hibernate.id.TableHiLoGenerator;
+import org.hibernate.type.Type;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class NativeHiLoGenerator implements IdentifierGenerator, PersistentIdentifierGenerator, Configurable {
+    private static final Log __log = LogFactory.getLog(NativeHiLoGenerator.class);
+    private IdentifierGenerator _proxy;
+
+    public NativeHiLoGenerator() {
+        super();
+    }
+
+    public Serializable generate(SessionImplementor session, Object object) throws HibernateException {
+        return _proxy.generate(session, object);
+    }
+
+    public Object generatorKey() {
+        if (_proxy instanceof PersistentIdentifierGenerator)
+            return ((PersistentIdentifierGenerator) _proxy).generatorKey();
+        else
+            return this;
+    }
+
+    public String[] sqlCreateStrings(Dialect dialect) throws HibernateException {
+        if (_proxy instanceof PersistentIdentifierGenerator)
+            return ((PersistentIdentifierGenerator) _proxy).sqlCreateStrings(dialect);
+        else
+            return new String[] {};
+    }
+
+    public String[] sqlDropStrings(Dialect dialect) throws HibernateException {
+        if (_proxy instanceof PersistentIdentifierGenerator)
+            return ((PersistentIdentifierGenerator) _proxy).sqlDropStrings(dialect);
+        else
+            return null;
+    }
+
+    public void configure(Type type, Properties params, Dialect dialect) throws MappingException {
+        Class generatorClass = null;
+        if (dialect.supportsSequences()) {
+            __log.debug("Using SequenceHiLoGenerator");
+            generatorClass = SequenceHiLoGenerator.class;
+        } else {
+            generatorClass = TableHiLoGenerator.class;
+            __log.debug("Using native dialect generator " + generatorClass);
+        }
+
+        IdentifierGenerator g = null;
+        try {
+            g = (IdentifierGenerator) generatorClass.newInstance();
+        } catch (Exception e) {
+            throw new MappingException("", e);
+        }
+
+        if (g instanceof Configurable)
+            ((Configurable) g).configure(type, params, dialect);
+
+        this._proxy = g;
+    }
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/SessionManager.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/SessionManager.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/SessionManager.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/SessionManager.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.dao.hib.bpel.hobj.*;
+import org.apache.ode.dao.hib.store.hobj.DeploymentUnitDaoImpl;
+import org.apache.ode.dao.hib.store.hobj.ProcessConfDaoImpl;
+import org.apache.ode.dao.hib.store.hobj.VersionTrackerDAOImpl;
+import org.apache.ode.utils.uuid.UUID;
+import org.hibernate.HibernateException;
+import org.hibernate.MappingException;
+import org.hibernate.Session;
+import org.hibernate.SessionFactory;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.cfg.Environment;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Manages hibernate sessions, and their association with 
+ * a transaction thread.  Uses a ThreadLocal strategy for 
+ * managing sessions.
+ */
+public class SessionManager {
+    private static final Log __log = LogFactory.getLog(SessionManager.class);
+
+    public static final String PROP_GUID = "ode.hibernate.guid";
+
+    private static final Map<String, TransactionManager> _txManagers =
+            Collections.synchronizedMap(new HashMap<String, TransactionManager>());
+    private static final Map<String, DataSource> _dataSources =
+            Collections.synchronizedMap(new HashMap<String,DataSource>());
+
+    private static final String[] CANNOT_JOIN_FOR_UPDATE_DIALECTS =
+            {"org.hibernate.dialect.IngresDialect"};
+
+    private final String _uuid = new UUID().toString();
+    private final TransactionManager _txManager;
+    private final SessionFactory _sessionFactory;
+    private boolean _canJoinForUpdate = true;
+    
+    private final TxContext _ctx;
+
+    public SessionManager(Properties env, DataSource ds, TransactionManager tx) throws HibernateException {
+        this(getDefaultConfiguration(), env, ds, tx);
+    }
+
+    /** Inaccessible constructor. */
+    public SessionManager(Configuration conf, Properties env, DataSource ds, TransactionManager tx) throws HibernateException {
+        
+    	if (tx!=null){
+            _ctx = new HibernateJtaTxContext();
+        }else{
+            _ctx = new HibernateNonTxContext();
+        }
+
+        _txManager = tx;
+        _txManagers.put(_uuid,tx);
+        _dataSources.put(_uuid,ds);
+
+        _sessionFactory = conf.setProperties(env).setProperty(PROP_GUID, _uuid).buildSessionFactory();
+
+        /*
+        Some Hibernate dialects (like IngresDialect) do not support update for join.
+        We need to distinguish them and explicitly define subqueries, otherwise Hibernate
+        implicitly generates joins which causes problems during update for such DBMS.
+        See org.apache.ode.dao.hib.bpel.CorrelatorDaoImpl for instance.
+        */
+        String currentHibDialect = env.getProperty(Environment.DIALECT);
+        for (String dialect : CANNOT_JOIN_FOR_UPDATE_DIALECTS) {
+            if (dialect.equals(currentHibDialect)) {
+                _canJoinForUpdate = false;
+            }
+        }
+    }
+
+    TransactionManager getTransactionManager() {
+        return _txManager;
+    }
+
+    public static void registerTransactionManager(String uuid, TransactionManager txm) {
+        _txManagers.put(uuid, txm);
+    }
+
+    /**
+     * Get the current Hibernate Session.
+     */
+    public Session getSession() {
+        return _sessionFactory.getCurrentSession();
+    }
+
+    /**
+     * Returns flag which shows whether " where .. join ... for update" kind of queries can be used (supported
+     * by currently effective {@link org.hibernate.dialect.Dialect}. If it's {@code false} than sub-query fallback
+     * should be invoked instead.
+     *
+     * @return currently returns false only for {@link org.hibernate.dialect.IngresDialect}
+     */
+    public boolean canJoinForUpdate() {
+        return _canJoinForUpdate;
+    }
+
+
+    /**
+     * Returns a hibernate configuration with hibernate DAO objects added as resources.
+     * @return
+     * @throws MappingException
+     */
+    public static Configuration getDefaultConfiguration() throws MappingException {
+        return new Configuration()
+                .addClass(HProcess.class)
+                .addClass(HProcessInstance.class)
+                .addClass(HCorrelator.class)
+                .addClass(HCorrelatorMessage.class)
+                .addClass(HCorrelationProperty.class)
+                .addClass(HCorrelatorSelector.class)
+                .addClass(HMessageExchange.class)
+                .addClass(HMessage.class)
+                .addClass(HPartnerLink.class)
+                .addClass(HScope.class)
+                .addClass(HCorrelationSet.class)
+                .addClass(HXmlData.class)
+                .addClass(HVariableProperty.class)
+                .addClass(HBpelEvent.class)
+                .addClass(HFaultData.class)
+                .addClass(HActivityRecovery.class)
+                .addClass(HMessageExchangeProperty.class)//store classes
+                .addClass(ProcessConfDaoImpl.class)
+                .addClass(DeploymentUnitDaoImpl.class)
+                .addClass(VersionTrackerDAOImpl.class);
+    }
+
+    public static TransactionManager getTransactionManager(Properties props) {
+        String guid = props.getProperty(PROP_GUID);
+        return _txManagers.get(guid);
+    }
+
+    public static Connection getConnection(Properties props) throws SQLException {
+        String guid = props.getProperty(PROP_GUID);
+        return _dataSources.get(guid).getConnection();
+    }
+    
+    public boolean isClosed() {
+        return _sessionFactory.isClosed();
+    }
+    
+    public void shutdown() {
+        _sessionFactory.close();
+    }
+    
+    
+    public void begin(){
+        _ctx.begin();
+     }
+
+     public void commit(){
+       _ctx.commit();
+     }
+
+     public void rollback(){
+       _ctx.rollback();
+     }
+
+     public interface TxContext {
+
+         public void begin();
+
+         public void commit();
+
+         public void rollback();
+     }
+
+     public class HibernateNonTxContext implements TxContext {
+
+         public void begin() {
+             getSession().beginTransaction();
+         }
+
+         public void commit() {
+             getSession().getTransaction().commit();
+         }
+
+         public void rollback() {
+             getSession().getTransaction().rollback();
+         }
+     }
+
+     public class HibernateJtaTxContext implements TxContext {
+
+         public void begin() {
+         }
+
+         public void commit() {
+         }
+
+         public void rollback() {
+         }
+     }
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/ActivityRecoveryDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/ActivityRecoveryDaoImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/ActivityRecoveryDaoImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/ActivityRecoveryDaoImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.hib.bpel;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.ode.dao.bpel.ActivityRecoveryDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HActivityRecovery;
+import org.apache.ode.utils.DOMUtils;
+import org.w3c.dom.Element;
+import org.xml.sax.SAXException;
+
+/**
+ * Hibernate based {@link ActivityRecoveryDAO} implementation; a thin
+ * read-only view over an {@link HActivityRecovery} object.
+ */
+public class ActivityRecoveryDaoImpl extends HibernateDao implements ActivityRecoveryDAO {
+    /** Backing hibernate object. */
+    HActivityRecovery _self;
+
+    public ActivityRecoveryDaoImpl(SessionManager sm, HActivityRecovery recovery) {
+        super(sm, recovery);
+        entering("ActivityRecoveryDaoImpl.ActivityRecoveryDaoImpl");
+        _self = recovery;
+    }
+
+    public long getActivityId() {
+        return _self.getActivityId();
+    }
+
+    public String getChannel() {
+        return _self.getChannel();
+    }
+
+    public String getReason() {
+        return _self.getReason();
+    }
+
+    public Date getDateTime() {
+        return _self.getDateTime();
+    }
+
+    /**
+     * Parses the stored details string into a DOM element.
+     *
+     * @return the parsed element, or <code>null</code> if no details are stored
+     * @throws RuntimeException wrapping the parser error if the stored string cannot be parsed
+     */
+    public Element getDetails() {
+        entering("ActivityRecoveryDaoImpl.getDetails");
+        // Read the field once so the null check and the parse see the same value.
+        final String details = _self.getDetails();
+        if (details == null) {
+            return null;
+        }
+        try {
+            return DOMUtils.stringToDOM(details);
+        } catch (SAXException e) {
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String getActions() {
+        return _self.getActions();
+    }
+
+    /**
+     * Splits the stored actions string on single spaces.
+     *
+     * @return the individual actions; an empty array if no actions string is stored
+     */
+    public String[] getActionsList() {
+        final String actions = _self.getActions();
+        if (actions == null) {
+            // No actions recorded: return an empty list instead of failing with a NullPointerException.
+            return new String[0];
+        }
+        return actions.split(" ");
+    }
+
+    public int getRetries() {
+        return _self.getRetries();
+    }
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionFactoryImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionFactoryImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionFactoryImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import java.sql.Connection;
+import java.util.Enumeration;
+import java.util.Properties;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.dao.bpel.BpelDAOConnection;
+import org.apache.ode.dao.bpel.BpelDAOConnectionFactory;
+import org.apache.ode.dao.hib.DataSourceConnectionProvider;
+import org.apache.ode.dao.hib.HibernateTransactionManagerLookup;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.il.config.OdeConfigProperties;
+import org.hibernate.HibernateException;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.cfg.Environment;
+import org.hibernate.dialect.Dialect;
+import org.hibernate.dialect.resolver.DialectFactory;
+
+/**
+ * Hibernate-based {@link org.apache.ode.dao.bpel.BpelDAOConnectionFactory}
+ * implementation.
+ */
+public class BpelDAOConnectionFactoryImpl implements BpelDAOConnectionFactory {
+    private static final Log __log = LogFactory.getLog(BpelDAOConnectionFactoryImpl.class);
+
+    protected SessionManager _sessionManager;
+
+    protected DataSource _ds;
+
+    protected TransactionManager _txm;
+
+    /**
+     * Constructor.
+     */
+    public BpelDAOConnectionFactoryImpl() {
+    }
+
+    public BpelDAOConnection getConnection() {
+        try {
+            return new BpelDAOConnectionImpl(_sessionManager);
+        } catch (HibernateException e) {
+            __log.error("DbError", e);
+            throw e;
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void init(Properties initialProps, TransactionManager mgr, Object env) {
+      _txm=mgr;
+      _ds=(DataSource)env;
+      if (_txm == null){
+          __log.error("Hibernate BpelDAOConnectionFactoryImpl requires a JTA Transaction Manager to be set.");
+      }
+      _sessionManager = setupSessionManager(initialProps, _txm, _ds);
+
+
+    }
+
+    public static SessionManager setupSessionManager(Properties initialProps, TransactionManager mgr, DataSource ds){
+        return setupSessionManager(SessionManager.getDefaultConfiguration(), initialProps, mgr, ds);
+     }
+
+    public static SessionManager setupSessionManager(Configuration conf, Properties initialConfig, TransactionManager mgr, DataSource ds){
+        // Don't want to pollute original properties
+        Properties properties = new Properties();
+        if (initialConfig != null) {
+            for (Object prop : initialConfig.keySet()) {
+                properties.put(prop, initialConfig.get(prop));
+            }
+        }
+
+        // Note that we don't allow the following properties to be overriden by
+        // the client.
+        /*
+        if (properties.containsKey(Environment.CONNECTION_PROVIDER))
+            __log.warn("Ignoring user-specified Hibernate property: " + Environment.CONNECTION_PROVIDER);
+        if (properties.containsKey(Environment.TRANSACTION_MANAGER_STRATEGY))
+            __log.warn("Ignoring user-specified Hibernate property: " + Environment.TRANSACTION_MANAGER_STRATEGY);
+        if (properties.containsKey(Environment.SESSION_FACTORY_NAME))
+            __log.warn("Ignoring user-specified Hibernate property: " + Environment.SESSION_FACTORY_NAME);
+        */
+        if (ds!=null){
+          properties.put(Environment.CONNECTION_PROVIDER, DataSourceConnectionProvider.class.getName());
+          // Guess Hibernate dialect if not specified in hibernate.properties
+          if (properties.get(Environment.DIALECT) == null) {
+              try {
+                  properties.put(Environment.DIALECT, guessDialect(ds));
+              } catch (Exception ex) {
+                  String errmsg = "Unable to detect Hibernate dialect!";
+
+                  if (__log.isDebugEnabled())
+                      __log.debug(errmsg, ex);
+
+                  __log.error(errmsg);
+              }
+          }
+        }
+
+        if (mgr!=null){
+          properties.put(Environment.TRANSACTION_MANAGER_STRATEGY, HibernateTransactionManagerLookup.class.getName());
+           /*
+          * Since Hibernate 3.2.6, Hibernate JTATransaction requires User Transaction bound on JNDI. Let's work around
+          * by implementing Hibernate JTATransactionFactory that hooks up to the JTATransactionManager(ODE uses geronimo
+          * by default).
+          */
+          //properties.put(Environment.TRANSACTION_STRATEGY, "org.hibernate.transaction.JTATransactionFactory");
+          properties.put(Environment.TRANSACTION_STRATEGY, "org.apache.ode.dao.hib.JotmTransactionFactory");
+          properties.put(Environment.CURRENT_SESSION_CONTEXT_CLASS, "jta");
+        }else{
+          properties.put(Environment.TRANSACTION_STRATEGY,"org.hibernate.transaction.JDBCTransactionFactory");
+          properties.put(Environment.CURRENT_SESSION_CONTEXT_CLASS,"thread");
+        }
+
+        // Isolation levels override
+        if (System.getProperty("ode.connection.isolation") != null) {
+            String level = System.getProperty("ode.connection.isolation", "2");
+            properties.put(Environment.ISOLATION, level);
+        }
+
+        if (Boolean.valueOf(initialConfig.getProperty(OdeConfigProperties.PROP_DB_EMBEDDED_CREATE, "true"))) {
+            properties.put(Environment.HBM2DDL_AUTO, "create-drop");
+        }
+
+        if (__log.isDebugEnabled()) {
+            Enumeration names = properties.propertyNames();
+            __log.debug("Properties passed to Hibernate:");
+            while (names.hasMoreElements()) {
+                String name = (String) names.nextElement();
+                __log.debug(name + "=" + properties.getProperty(name));
+            }
+        }
+
+        return new SessionManager(conf, properties, ds,mgr);
+
+    }
+
+    protected SessionManager createSessionManager(Properties properties, DataSource ds, TransactionManager tm) {
+        return new SessionManager(properties, ds, tm);
+    }
+
+    private static final String DEFAULT_HIBERNATE_DIALECT = "org.hibernate.dialect.DerbyDialect";
+
+    public void shutdown() {
+        // Not too much to do for hibernate.
+    }
+
+    public static String guessDialect(DataSource dataSource) throws Exception {
+        String dialect = null;
+        // Open a connection and use that connection to figure out database
+        // product name/version number in order to decide which Hibernate
+        // dialect to use.
+        Connection conn = dataSource.getConnection();
+        try {
+            Dialect d = DialectFactory.buildDialect(new Properties(), conn);
+            dialect=d.getClass().getName();
+        } finally {
+            conn.close();
+        }
+
+        if (dialect == null) {
+            __log.info("Cannot determine hibernate dialect for this database: using the default one.");
+            dialect = DEFAULT_HIBERNATE_DIALECT;
+        }
+
+        return dialect;
+
+    }
+
+    public void setDataSource(DataSource ds) {
+        _ds = ds;
+    }
+
+    public DataSource getDataSource() {
+        return _ds;
+    }
+
+    public void setTransactionManager(Object tm) {
+        _txm = (TransactionManager) tm;
+    }
+
+    public void setUnmanagedDataSource(DataSource ds) {
+        // Hibernate doesn't use this.
+    }
+
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.BpelEventFilter;
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.bpel.common.ProcessState;
+import org.apache.ode.bpel.evt.BpelEvent;
+import org.apache.ode.bpel.evt.ScopeEvent;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
+import org.apache.ode.dao.bpel.BpelDAOConnection;
+import org.apache.ode.dao.bpel.CorrelationSetDAO;
+import org.apache.ode.dao.bpel.FilteredInstanceDeletable;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.dao.bpel.ProcessDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+import org.apache.ode.dao.bpel.ProcessManagementDAO;
+import org.apache.ode.dao.bpel.ScopeDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HBpelEvent;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelationSet;
+import org.apache.ode.dao.hib.bpel.hobj.HMessageExchange;
+import org.apache.ode.dao.hib.bpel.hobj.HProcess;
+import org.apache.ode.dao.hib.bpel.hobj.HProcessInstance;
+import org.apache.ode.dao.hib.bpel.hobj.HScope;
+import org.apache.ode.dao.hib.bpel.ql.HibernateInstancesQueryCompiler;
+import org.apache.ode.ql.eval.skel.CommandEvaluator;
+import org.apache.ode.ql.tree.Builder;
+import org.apache.ode.ql.tree.BuilderFactory;
+import org.apache.ode.ql.tree.nodes.Query;
+import org.apache.ode.utils.SerializableUtils;
+import org.apache.ode.utils.stl.CollectionsX;
+import org.apache.ode.utils.stl.UnaryFunctionEx;
+import org.hibernate.Criteria;
+import org.hibernate.FetchMode;
+import org.hibernate.HibernateException;
+import org.hibernate.Session;
+import org.hibernate.criterion.Expression;
+import org.hibernate.criterion.Projections;
+
+/**
+ * Hibernate-based {@link BpelDAOConnection} implementation.
+ */
+public class BpelDAOConnectionImpl implements BpelDAOConnection, FilteredInstanceDeletable {
+    private static final Log __log = LogFactory.getLog(BpelDAOConnectionImpl.class);
+
+    public SessionManager _sm;
+
+    /**
+     * Creates a connection that works through the given session manager.
+     *
+     * @param sm session manager supplying the Hibernate sessions
+     */
+    public BpelDAOConnectionImpl(SessionManager sm) {
+        this._sm = sm;
+    }
+
+    protected Session getSession(){
+        return _sm.getSession();
+    }
+
+    /**
+     * Creates and saves a new message exchange stamped with the current time.
+     *
+     * @param dir direction flag stored on the new exchange
+     * @return DAO wrapping the saved exchange
+     */
+    public MessageExchangeDAO createMessageExchange(char dir) {
+        final HMessageExchange exchange = new HMessageExchange();
+        exchange.setDirection(dir);
+        final long nowMillis = System.currentTimeMillis();
+        exchange.setInsertTime(new Date(nowMillis));
+        final Session session = getSession();
+        session.save(exchange);
+        return new MessageExchangeDaoImpl(_sm, exchange);
+    }
+
+    /**
+     * Loads a message exchange by id.
+     *
+     * @param mexid exchange id; parsed as a decimal <code>long</code>
+     * @return DAO for the exchange, or <code>null</code> if none is found
+     */
+    public MessageExchangeDAO getMessageExchange(String mexid) {
+        // Obtain the session before parsing the id, as the original expression does.
+        final Session session = getSession();
+        final Long key = new Long(mexid);
+        final Object found = session.get(HMessageExchange.class, key);
+        if (found == null) {
+            return null;
+        }
+        return new MessageExchangeDaoImpl(_sm, (HMessageExchange) found);
+    }
+
+    /**
+     * Creates and saves a new process record with the current time as deploy date.
+     *
+     * @param pid     process id, stored in its string form
+     * @param type    process type; local part and namespace are stored separately
+     * @param guid    guid stored on the record
+     * @param version version stored on the record
+     * @return DAO wrapping the saved process
+     */
+    public ProcessDAO createProcess(QName pid, QName type, String guid, long version) {
+        final HProcess hproc = new HProcess();
+        final String pidString = pid.toString();
+        hproc.setProcessId(pidString);
+        final String typeName = type.getLocalPart();
+        hproc.setTypeName(typeName);
+        final String typeNamespace = type.getNamespaceURI();
+        hproc.setTypeNamespace(typeNamespace);
+        hproc.setDeployDate(new Date());
+        hproc.setGuid(guid);
+        hproc.setVersion(version);
+
+        final Session session = getSession();
+        session.save(hproc);
+        return new ProcessDaoImpl(_sm, hproc);
+    }
+
+    /**
+     * Wraps a new, unsaved process object carrying only the given id.
+     *
+     * @param id id set on the process object
+     * @return DAO around the unsaved process object
+     */
+    public ProcessDAO createTransientProcess(Long id) {
+        final HProcess unsaved = new HProcess();
+        unsaved.setId(id);
+        final ProcessDAO dao = new ProcessDaoImpl(_sm, unsaved);
+        return dao;
+    }
+
+    public ProcessDAO getProcess(QName processId) {
+        try {
+            Criteria criteria = getSession().createCriteria(HProcess.class);
+            criteria.add(Expression.eq("processId", processId.toString()));
+            // For the moment we are expecting only one result.
+            HProcess hprocess = (HProcess) criteria.uniqueResult();
+            return hprocess == null ? null : new ProcessDaoImpl(_sm, hprocess);
+        } catch (HibernateException e) {
+            __log.error("DbError", e);
+            throw e;
+        }
+
+    }
+
+    /** Does nothing; this connection holds no resources of its own to release. */
+    public void close() {
+        // intentionally empty
+    }
+
+    /**
+     * Loads a process instance by id from the current session.
+     *
+     * @param instanceId id of the instance
+     * @return DAO for the instance, or <code>null</code> if none is found
+     */
+    public ProcessInstanceDAO getInstance(Long instanceId) {
+        final Session session = getSession();
+        return _getInstance(_sm, session, instanceId);
+    }
+
+    public int getNumInstances(QName processId) {
+        ProcessDAO process = getProcess(processId);
+        if (process != null)
+            return process.getNumInstances();
+        else return -1;
+    }
+
+    /**
+     * Loads a scope by id from the current session.
+     *
+     * @param siidl id of the scope
+     * @return DAO for the scope, or <code>null</code> if none is found
+     */
+    public ScopeDAO getScope(Long siidl) {
+        final Session session = getSession();
+        return _getScope(_sm, session, siidl);
+    }
+
+    /**
+     * Runs an instance query and wraps every hit in a DAO.
+     *
+     * @param criteria filter to apply; a limit of 0 short-circuits to an empty result
+     * @return DAOs for the matching instances, in query order
+     */
+    public Collection<ProcessInstanceDAO> instanceQuery(InstanceFilter criteria) {
+        if (criteria.getLimit() == 0) {
+            return Collections.emptyList();
+        }
+
+        final List<ProcessInstanceDAO> result = new ArrayList<ProcessInstanceDAO>();
+        for (Iterator<HProcessInstance> hits = _instanceQuery(getSession(), false, criteria); hits.hasNext();) {
+            final HProcessInstance hit = hits.next();
+            result.add(new ProcessInstanceDaoImpl(_sm, hit));
+        }
+        return result;
+    }
+
+    /**
+     * Deletes the instances matching the filter.
+     * The delete is issued through a transient process DAO built from the
+     * process id of the first matching instance.
+     *
+     * @param criteria   filter selecting the instances; a limit of 0 deletes nothing
+     * @param categories cleanup categories passed on to the process DAO
+     * @return the value returned by the process DAO's delete, or 0 if nothing matched
+     */
+    public int deleteInstances(InstanceFilter criteria, Set<CLEANUP_CATEGORY> categories) {
+        if (criteria.getLimit() == 0) {
+            return 0;
+        }
+
+        final List<HProcessInstance> matches = _instanceQueryForList(getSession(), false, criteria);
+        if (__log.isDebugEnabled()) {
+            __log.debug("Collected " + matches.size() + " instances to delete.");
+        }
+
+        if (matches.isEmpty()) {
+            return 0;
+        }
+
+        final HProcessInstance first = matches.get(0);
+        final ProcessDaoImpl owner = (ProcessDaoImpl) createTransientProcess(first.getProcessId());
+        return owner.deleteInstances(matches, categories);
+    }
+
+    /**
+     * Runs the instance query for the given filter and exposes the result as an iterator.
+     *
+     * @param session session to query with
+     * @param countOnly passed through unchanged to the list-based query
+     * @param filter instance filter
+     * @return iterator over the matching instances
+     */
+    static Iterator<HProcessInstance> _instanceQuery(Session session, boolean countOnly, InstanceFilter filter) {
+        List<HProcessInstance> matches = _instanceQueryForList(session, countOnly, filter);
+        return matches.iterator();
+    }
+
+    /**
+     * Builds an HQL query from the filter via {@link CriteriaBuilder} and returns its result list.
+     *
+     * @param session session to query with
+     * @param countOnly not used by this method
+     * @param filter instance filter
+     * @return the matching instances
+     */
+    @SuppressWarnings("unchecked")
+    private static List<HProcessInstance> _instanceQueryForList(Session session, boolean countOnly, InstanceFilter filter) {
+        return new CriteriaBuilder().buildHQLQuery(session, filter).list();
+    }
+
+    /**
+     * Loads an {@link HProcessInstance} by id and wraps it in a DAO.
+     *
+     * @param sm session manager handed to the DAO
+     * @param session session used for the lookup
+     * @param iid instance identifier
+     * @return the instance DAO, or <code>null</code> if the session finds no such row
+     */
+    static ProcessInstanceDAO _getInstance(SessionManager sm, Session session, Long iid) {
+        HProcessInstance loaded = (HProcessInstance) session.get(HProcessInstance.class, iid);
+        if (loaded == null) {
+            return null;
+        }
+        return new ProcessInstanceDaoImpl(sm, loaded);
+    }
+
+    /**
+     * Loads an {@link HScope} by id and wraps it in a DAO.
+     *
+     * @param sm session manager handed to the DAO
+     * @param session session used for the lookup
+     * @param siid scope instance identifier
+     * @return the scope DAO, or <code>null</code> if the session finds no such row
+     */
+    static ScopeDAO _getScope(SessionManager sm, Session session, Long siid) {
+        HScope loaded = (HScope) session.get(HScope.class, siid);
+        if (loaded == null) {
+            return null;
+        }
+        return new ScopeDaoImpl(sm, loaded);
+    }
+
+    public void insertBpelEvent(BpelEvent event, ProcessDAO process, ProcessInstanceDAO instance) {
+        _insertBpelEvent(_sm.getSession(), event, process, instance);
+    }
+
+    /**
+     * Helper method for inserting bpel events into the database.
+     * <p>
+     * A new {@link HBpelEvent} row is filled with the current time, the event name, the
+     * event's string form and (when available) its process, instance and scope id. The event
+     * is also Java-serialized into the row's data field.
+     *
+     * @param sess session used to save the row
+     * @param event event to store
+     * @param process owning process, or <code>null</code> to leave the row's process unset
+     * @param instance owning instance, or <code>null</code> to leave the row's instance unset
+     */
+    static void _insertBpelEvent(Session sess, BpelEvent event, ProcessDAO process, ProcessInstanceDAO instance) {
+        HBpelEvent row = new HBpelEvent();
+        row.setTstamp(new Timestamp(System.currentTimeMillis()));
+        row.setType(BpelEvent.eventName(event));
+        row.setDetail(event.toString());
+        if (process != null) {
+            ProcessDaoImpl processImpl = (ProcessDaoImpl) process;
+            row.setProcess((HProcess) processImpl.getHibernateObj());
+        }
+        if (instance != null) {
+            ProcessInstanceDaoImpl instanceImpl = (ProcessInstanceDaoImpl) instance;
+            row.setInstance((HProcessInstance) instanceImpl.getHibernateObj());
+        }
+        if (event instanceof ScopeEvent) {
+            ScopeEvent scoped = (ScopeEvent) event;
+            row.setScopeId(scoped.getScopeId());
+        }
+        try {
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+            ObjectOutputStream out = new ObjectOutputStream(buffer);
+            out.writeObject(event);
+            out.flush();
+            row.setData(buffer.toByteArray());
+        } catch (Throwable ex) {
+            // this is really unexpected; the failure is only logged and the row is still saved below.
+            __log.fatal("InternalError: BpelEvent serialization failed.", ex);
+        }
+        sess.save(row);
+    }
+
+    /**
+     * Returns the timestamps of the stored BPEL events matching the given filters.
+     *
+     * @param ifilter restricts the events by their owning instance; ignored when <code>null</code>
+     * @param efilter restricts the events themselves; ignored when <code>null</code>
+     * @return the "tstamp" values of all matching events
+     */
+    @SuppressWarnings( { "unchecked", "deprecation" })
+    public List<Date> bpelEventTimelineQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
+        CriteriaBuilder cb = new CriteriaBuilder();
+        Criteria crit = getSession().createCriteria(HBpelEvent.class);
+        // The event filter must be guarded by its own null check (as in bpelEventQuery);
+        // checking ifilter here passed a null efilter to the builder and dropped a non-null
+        // efilter whenever no instance filter was supplied.
+        if (efilter != null)
+            cb.buildCriteria(crit, efilter);
+        if (ifilter != null)
+            cb.buildCriteria(crit.createCriteria("instance"), ifilter);
+        crit.setFetchMode("tstamp", FetchMode.EAGER);
+        // Only the timestamp column is selected.
+        crit.setProjection(Projections.property("tstamp"));
+        return crit.list();
+    }
+
+    /**
+     * Loads the stored BPEL events matching the given filters and deserializes each one
+     * from its stored data.
+     *
+     * @param ifilter restricts the events by their owning instance; ignored when <code>null</code>
+     * @param efilter restricts the events themselves; ignored when <code>null</code>
+     * @return the deserialized events
+     * @throws RuntimeException wrapping any exception raised while transforming the rows
+     */
+    @SuppressWarnings("unchecked")
+    public List<BpelEvent> bpelEventQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
+        CriteriaBuilder builder = new CriteriaBuilder();
+        Criteria eventCriteria = getSession().createCriteria(HBpelEvent.class);
+        if (efilter != null) {
+            builder.buildCriteria(eventCriteria, efilter);
+        }
+        if (ifilter != null) {
+            builder.buildCriteria(eventCriteria.createCriteria("instance"), ifilter);
+        }
+        List<HBpelEvent> stored = eventCriteria.list();
+
+        // Turns the serialized payload of a stored row back into a BpelEvent.
+        UnaryFunctionEx<HBpelEvent, BpelEvent> deserialize = new UnaryFunctionEx<HBpelEvent, BpelEvent>() {
+            public BpelEvent apply(HBpelEvent row) throws Exception {
+                Object restored = SerializableUtils.toObject(row.getData(), BpelEvent.class.getClassLoader());
+                return (BpelEvent) restored;
+            }
+        };
+
+        List<BpelEvent> events = new ArrayList<BpelEvent>(stored.size());
+        try {
+            CollectionsX.transformEx(events, stored, deserialize);
+        } catch (Exception ex) {
+            __log.fatal("Internal error: unable to transform HBpelEvent", ex);
+            throw new RuntimeException(ex);
+        }
+        return events;
+    }
+
+    /**
+     * Evaluates a textual instance query: the expression is parsed into a query tree,
+     * compiled for Hibernate, run against the current session, and every resulting
+     * instance is looked up again by id through {@link #getInstance(Long)}.
+     *
+     * @param expression query expression to parse
+     * @return one DAO per instance returned by the compiled query
+     * @see org.apache.ode.dao.bpel.BpelDAOConnection#instanceQuery(String)
+     */
+    @SuppressWarnings("unchecked")
+    public Collection<ProcessInstanceDAO> instanceQuery(String expression) {
+        Builder<String> parser = BuilderFactory.getInstance().createBuilder();
+        final org.apache.ode.ql.tree.nodes.Node queryTree = parser.build(expression);
+
+        HibernateInstancesQueryCompiler queryCompiler = new HibernateInstancesQueryCompiler();
+        CommandEvaluator<List, Session> evaluator = queryCompiler.compile((Query) queryTree);
+
+        List<HProcessInstance> found = (List<HProcessInstance>) evaluator.evaluate(getSession());
+        Collection<ProcessInstanceDAO> daos = new ArrayList<ProcessInstanceDAO>(found.size());
+        for (HProcessInstance row : found) {
+            Long rowId = row.getId();
+            daos.add(getInstance(rowId));
+        }
+        return daos;
+    }
+
+    /**
+     * Loads the correlation sets of the given instances, grouped by instance id.
+     *
+     * @param instances instances whose correlation sets are wanted
+     * @return map from instance id to that instance's correlation sets; instances for which
+     *         the query returns no correlation set have no entry
+     */
+    @SuppressWarnings("unchecked")
+    public Map<Long, Collection<CorrelationSetDAO>> getCorrelationSets(Collection<ProcessInstanceDAO> instances) {
+        Map<Long, Collection<CorrelationSetDAO>> map = new HashMap<Long, Collection<CorrelationSetDAO>>();
+        if (instances.isEmpty()) {
+            return map;
+        }
+        List<Long> iids = new ArrayList<Long>(instances.size());
+        for (ProcessInstanceDAO dao : instances) {
+            iids.add(dao.getInstanceId());
+        }
+        Collection<HCorrelationSet> csets = new ArrayList<HCorrelationSet>();
+        // some databases don't like long lists of values with IN operator
+        // so we select in batches.  Oracle 9i, for instance, doesn't support
+        // more than 1000 -- we opt to be conservative.
+        final int batchSize = 100;
+        for (int index = 0; index < iids.size(); index += batchSize) {
+            List<Long> subList = iids.subList(index, Math.min(index + batchSize, iids.size()));
+            csets.addAll(getSession().getNamedQuery(HCorrelationSet.SELECT_CORSETS_BY_INSTANCES)
+                    .setParameterList("instances", subList).list());
+        }
+        for (HCorrelationSet cset : csets) {
+            Long id = cset.getInstance().getId();
+            Collection<CorrelationSetDAO> existing = map.get(id);
+            if (existing == null) {
+                existing = new ArrayList<CorrelationSetDAO>();
+                map.put(id, existing);
+            }
+            existing.add(new CorrelationSetDaoImpl(_sm, cset));
+        }
+        return map;
+    }
+
+    /**
+     * Returns the correlation sets selected by the SELECT_CORSETS_BY_PROCESS_STATES named
+     * query with its "states" parameter bound to {@link ProcessState#STATE_ACTIVE}.
+     *
+     * @return DAOs wrapping every correlation set returned by the query
+     */
+    @SuppressWarnings("unchecked")
+    public Collection<CorrelationSetDAO> getActiveCorrelationSets() {
+        ArrayList<CorrelationSetDAO> wrapped = new ArrayList<CorrelationSetDAO>();
+        Collection<HCorrelationSet> active = getSession()
+                .getNamedQuery(HCorrelationSet.SELECT_CORSETS_BY_PROCESS_STATES)
+                .setParameter("states", ProcessState.STATE_ACTIVE)
+                .list();
+        for (HCorrelationSet row : active) {
+            wrapped.add(new CorrelationSetDaoImpl(_sm, row));
+        }
+        return wrapped;
+    }
+
+
+    public ProcessManagementDAO getProcessManagement() {
+        return new ProcessManagementDaoImpl(_sm);
+    }
+
+	/**
+	 * Reports whether the underlying session manager is closed.
+	 *
+	 * @return the value of the session manager's <code>isClosed()</code>
+	 */
+	public boolean isClosed() {
+		final boolean closed = _sm.isClosed();
+		return closed;
+	}
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelationSetDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelationSetDaoImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelationSetDaoImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelationSetDaoImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.dao.bpel.CorrelationSetDAO;
+import org.apache.ode.dao.bpel.ProcessDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+import org.apache.ode.dao.bpel.ScopeDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelationProperty;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelationSet;
+
+import javax.xml.namespace.QName;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Hibernate-based {@link CorrelationSetDAO} implementation.
+ * <p>
+ * Wraps a single {@link HCorrelationSet} row and exposes its value and properties.
+ */
+class CorrelationSetDaoImpl extends HibernateDao
+    implements CorrelationSetDAO {
+
+    /** The wrapped hibernate object. */
+    private HCorrelationSet _correlationSet;
+
+    /**
+     * @param sessionManager session manager used by this DAO
+     * @param correlationSet hibernate object to wrap
+     */
+    public CorrelationSetDaoImpl(SessionManager sessionManager, HCorrelationSet correlationSet) {
+        super(sessionManager, correlationSet);
+        entering("CorrelationSetDaoImpl.CorrelationSetDaoImpl");
+        _correlationSet = correlationSet;
+    }
+
+    /** @return the id of the wrapped correlation set */
+    public Long getCorrelationSetId() {
+        return _correlationSet.getId();
+    }
+
+    /** @return the name of the wrapped correlation set */
+    public String getName() {
+        return _correlationSet.getName();
+    }
+
+    /** @return a new DAO wrapping the scope of the correlation set */
+    public ScopeDAO getScope() {
+        entering("CorrelationSetDaoImpl.getScope");
+        return new ScopeDaoImpl(_sm, _correlationSet.getScope());
+    }
+
+    /**
+     * Stores the key's canonical string as the set's value and, when <code>names</code> is
+     * given, saves one property per name. The value of the property at index <code>m</code>
+     * is taken from <code>values.getValues()[m]</code>.
+     *
+     * @param names property names, or <code>null</code> to update only the value
+     * @param values correlation key holding the new values
+     */
+    public void setValue(QName[] names, CorrelationKey values) {
+        entering("CorrelationSetDaoImpl.setValue");
+        _correlationSet.setValue(values.toCanonicalString());
+        if (names != null) {
+            if (_correlationSet.getProperties() == null || _correlationSet.getProperties().size() == 0) {
+                // No properties yet: create one per name.
+                for (int m = 0; m < names.length; m++) {
+                    HCorrelationProperty prop =
+                            new HCorrelationProperty(names[m], values.getValues()[m], _correlationSet);
+                    getSession().save(prop);
+                }
+            } else {
+                // Update the properties that already exist, create the missing ones.
+                for (int m = 0; m < names.length; m++) {
+                    HCorrelationProperty prop = getProperty(names[m]);
+                    if (prop == null) prop = new HCorrelationProperty(names[m], values.getValues()[m], _correlationSet);
+                    else prop.setValue(values.getValues()[m]);
+                    getSession().save(prop);
+                }
+            }
+        }
+        getSession().update(_correlationSet);
+    }
+
+    /** @return the correlation key built from the stored value, or <code>null</code> if no value is stored */
+    public CorrelationKey getValue() {
+        entering("CorrelationSetDaoImpl.getValue");
+        if (_correlationSet.getValue() != null) return new CorrelationKey(_correlationSet.getValue());
+        else return null;
+    }
+
+    /**
+     * @return a new map from property name to property value; empty when the correlation
+     *         set has no property collection
+     */
+    public Map<QName, String> getProperties() {
+        entering("CorrelationSetDaoImpl.getProperties");
+        HashMap<QName, String> result = new HashMap<QName, String>();
+        // setValue() allows for a null property collection, so guard against it here as well.
+        if (_correlationSet.getProperties() != null) {
+            for (HCorrelationProperty property : _correlationSet.getProperties()) {
+                result.put(property.getQName(), property.getValue());
+            }
+        }
+        return result;
+    }
+
+    /** @return a new DAO wrapping the process of the correlation set */
+    public ProcessDAO getProcess() {
+        return new ProcessDaoImpl(_sm, _correlationSet.getProcess());
+    }
+
+    /** @return a new DAO wrapping the process instance of the correlation set */
+    public ProcessInstanceDAO getInstance() {
+        return new ProcessInstanceDaoImpl(_sm, _correlationSet.getInstance());
+    }
+
+    /**
+     * Finds the stored property whose name and namespace equal those of the given QName.
+     *
+     * @param qName name to look for
+     * @return the matching property, or <code>null</code> if there is none
+     */
+    private HCorrelationProperty getProperty(QName qName) {
+        entering("CorrelationSetDaoImpl.getProperty");
+        if (_correlationSet.getProperties() == null) {
+            return null;
+        }
+        for (HCorrelationProperty property : _correlationSet.getProperties()) {
+            if (qName.getLocalPart().equals(property.getName())
+                    && qName.getNamespaceURI().equals(property.getNamespace()))
+                return property;
+        }
+        return null;
+    }
+
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorDaoImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorDaoImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorDaoImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.dao.bpel.CorrelatorDAO;
+import org.apache.ode.dao.bpel.CorrelatorMessageDAO;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.dao.bpel.MessageRouteDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelator;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelatorMessage;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelatorSelector;
+import org.apache.ode.dao.hib.bpel.hobj.HMessageExchange;
+import org.apache.ode.dao.hib.bpel.hobj.HProcessInstance;
+import org.hibernate.Hibernate;
+import org.hibernate.LockMode;
+import org.hibernate.Query;
+import org.hibernate.Session;
+import org.hibernate.exception.LockAcquisitionException;
+
+/**
+ * Hibernate-based {@link CorrelatorDAO} implementation.
+ */
+public class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
+    static Log __log = LogFactory.getLog(CorrelatorDaoImpl.class);
+
+    /** filter for finding a matching selector. */
+    private static final String LOCK_SELECTORS = "update from HCorrelatorSelector as hs set hs.lock = hs.lock+1 where hs.processType = :processType";
+    private static final String CHECK_SELECTORS = "from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlator.correlatorId = :correlatorId";
+    private static final String FLTR_SELECTORS = "from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlator.correlatorId = :correlatorId";
+    private static final String FLTR_SELECTORS_SUBQUERY = ("from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlatorId = " +
+            "(select hc.id from HCorrelator as hc where hc.correlatorId = :correlatorId )").intern();
+
+    /** Query for removing routes. */
+    private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where groupId = ? and instance = ?";
+
+    private HCorrelator _hobj;
+
+    /**
+     * Wraps the given correlator row.
+     *
+     * @param sm session manager used by this DAO
+     * @param hobj hibernate object to wrap
+     */
+    public CorrelatorDaoImpl(SessionManager sm, HCorrelator hobj) {
+        super(sm, hobj);
+        entering("CorrelatorDaoImpl.CorrelatorDaoImpl");
+        this._hobj = hobj;
+    }
+
+    @SuppressWarnings("unchecked")
+    public MessageExchangeDAO dequeueMessage(CorrelationKeySet keySet) {
+        entering("CorrelatorDaoImpl.dequeueMessage");
+
+        MessageExchangeDAO mex = null;
+
+        String hdr = "dequeueMessage(" + keySet + "): ";
+        __log.debug(hdr);
+
+        List<CorrelationKeySet> subSets = keySet.findSubSets();
+        Query qry = getSession().createFilter(_hobj.getMessageCorrelations(),
+                generateUnmatchedQuery(subSets));
+        for( int i = 0; i < subSets.size(); i++ ) {
+            qry.setString("s" + i, subSets.get(i).toCanonicalString());
+        }
+
+        // We really should consider the possibility of multiple messages matching a criteria.
+        // When the message is handled, its not too convenient to attempt to determine if the
+        // received message conflicts with one already received.
+        Iterator mcors;
+        try {
+            mcors = qry.setLockMode("this", LockMode.UPGRADE).iterate();
+        } catch (LockAcquisitionException e) {
+            throw new Scheduler.JobProcessorException(e, true);
+        }
+        try {
+            if (!mcors.hasNext()) {
+                if (__log.isDebugEnabled())
+                    __log.debug(hdr + "did not find a MESSAGE entry.");
+            } else {
+                HCorrelatorMessage mcor = (HCorrelatorMessage) mcors.next();
+                if (__log.isDebugEnabled())
+                    __log.debug(hdr + "found MESSAGE entry " + mcor.getMessageExchange());
+                mex = new MessageExchangeDaoImpl(_sm, mcor.getMessageExchange());
+            }
+        } finally {
+            Hibernate.close(mcors);
+        }
+
+        return mex;
+    }
+
+    /**
+     * Finds the routes (selectors) of this correlator that match the given key set.
+     * <p>
+     * Selectors are queried by process type, correlator id and the canonical strings of the
+     * key set's sub-sets, with an UPGRADE lock. A selector whose route is "all" is always
+     * returned; one whose route is "one" or unset is returned only if no earlier returned
+     * selector targets the same instance.
+     *
+     * @param keySet correlation key set to match
+     * @return the matching routes, possibly empty
+     * @throws Scheduler.JobProcessorException if the lock cannot be acquired
+     */
+    @SuppressWarnings("unchecked")
+    public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) {
+        List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+
+        entering("CorrelatorDaoImpl.findRoute");
+        final String hdr = "findRoute(keySet=" + keySet + "): ";
+        if (__log.isDebugEnabled()) {
+            __log.debug(hdr);
+        }
+
+        String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString();
+        List<CorrelationKeySet> subSets = keySet.findSubSets();
+
+        Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
+        q.setString("processType", processType);
+        q.setString("correlatorId", _hobj.getCorrelatorId());
+        int position = 0;
+        for (CorrelationKeySet subSet : subSets) {
+            q.setString("s" + position, subSet.toCanonicalString());
+            position++;
+        }
+        // Make sure we obtain a lock for the selector we want to find.
+        q.setLockMode("hs", LockMode.UPGRADE);
+
+        List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
+        List<HCorrelatorSelector> selectors;
+        try {
+            selectors = (List<HCorrelatorSelector>) q.list();
+        } catch (LockAcquisitionException e) {
+            throw new Scheduler.JobProcessorException(e, true);
+        }
+
+        for (HCorrelatorSelector selector : selectors) {
+            if (selector == null) {
+                continue;
+            }
+            boolean routeToAll = "all".equals(selector.getRoute());
+            boolean routeToOne = selector.getRoute() == null || "one".equals(selector.getRoute());
+            boolean accepted = routeToAll || (routeToOne && !targets.contains(selector.getInstance()));
+            if (accepted) {
+                routes.add(new MessageRouteDaoImpl(_sm, selector));
+                targets.add(selector.getInstance());
+            }
+        }
+
+        if (__log.isDebugEnabled()) {
+            __log.debug(hdr + "found " + routes);
+        }
+        return routes;
+    }
+
+    private String generateUnmatchedQuery(List<CorrelationKeySet> subSets) {
+        StringBuffer filterQuery = new StringBuffer();
+
+        if( subSets.size() == 1 ) {
+            filterQuery.append(" where this.correlationKey = :s0");
+        } else if( subSets.size() > 1 ) {
+            filterQuery.append(" where this.correlationKey in(");
+            for( int i = 0; i < subSets.size(); i++ ) {
+                if( i > 0 ) {
+                    filterQuery.append(", ");
+                }
+                filterQuery.append(":s").append(i);
+            }
+            filterQuery.append(")");
+        }
+
+        return filterQuery.toString();
+    }
+
+    private String generateSelectorQuery(String header, List<CorrelationKeySet> subSets) {
+        StringBuffer filterQuery = new StringBuffer(header);
+
+        if( subSets.size() == 1 ) {
+            filterQuery.append(" and hs.correlationKey = :s0");
+        } else if( subSets.size() > 1 ) {
+            filterQuery.append(" and hs.correlationKey in(");
+            for( int i = 0; i < subSets.size(); i++ ) {
+                if( i > 0 ) {
+                    filterQuery.append(", ");
+                }
+                filterQuery.append(":s").append(i);
+            }
+            filterQuery.append(")");
+        }
+
+        return filterQuery.toString();
+    }
+
+    /**
+     * Queues a message exchange on this correlator: one {@link HCorrelatorMessage} row is
+     * saved for every sub-set of the given key set, keyed by that sub-set's canonical string.
+     *
+     * @param mex message exchange to queue; must be a {@link MessageExchangeDaoImpl}
+     * @param correlationKeySet key set whose sub-sets the message is queued under
+     */
+    public void enqueueMessage(MessageExchangeDAO mex, CorrelationKeySet correlationKeySet) {
+        entering("CorrelatorDaoImpl.enqueueMessage");
+        MessageExchangeDaoImpl mexImpl = (MessageExchangeDaoImpl) mex;
+        final String hdr = "enqueueMessage(mex=" + mexImpl._hobj.getId() + " keySet="
+                + correlationKeySet.toCanonicalString() + "): ";
+
+        final boolean debug = __log.isDebugEnabled();
+        if (debug) {
+            __log.debug(hdr);
+        }
+
+        for (CorrelationKeySet subSet : correlationKeySet.findSubSets()) {
+            HCorrelatorMessage queued = new HCorrelatorMessage();
+            queued.setCorrelator(_hobj);
+            queued.setCreated(new Date());
+            queued.setMessageExchange((HMessageExchange) mexImpl._hobj);
+            queued.setCorrelationKey(subSet.toCanonicalString());
+            getSession().save(queued);
+
+            if (__log.isDebugEnabled()) {
+                __log.debug(hdr + "saved " + queued);
+            }
+        }
+    }
+
+    /**
+     * Saves a new selector (route) on this correlator for the given target instance.
+     *
+     * @param routeGroupId group id stored on the selector
+     * @param target instance the route leads to; must be a {@link ProcessInstanceDaoImpl}
+     * @param idx index stored on the selector
+     * @param correlationKeySet key set whose canonical string becomes the selector's key
+     * @param routePolicy route policy stored on the selector
+     * @throws Scheduler.JobProcessorException if saving fails with a lock acquisition error
+     */
+    public void addRoute(String routeGroupId, ProcessInstanceDAO target, int idx, CorrelationKeySet correlationKeySet, String routePolicy) {
+        entering("CorrelatorDaoImpl.addRoute");
+        final String hdr = "addRoute(" + routeGroupId + ", iid=" + target.getInstanceId() + ", idx=" + idx + ", ckeySet="
+                + correlationKeySet + "): ";
+
+        if (__log.isDebugEnabled()) {
+            __log.debug(hdr);
+        }
+
+        HCorrelatorSelector selector = new HCorrelatorSelector();
+        selector.setGroupId(routeGroupId);
+        selector.setIndex(idx);
+        selector.setLock(0);
+        selector.setCorrelationKey(correlationKeySet.toCanonicalString());
+        ProcessInstanceDaoImpl targetImpl = (ProcessInstanceDaoImpl) target;
+        selector.setInstance((HProcessInstance) targetImpl.getHibernateObj());
+        selector.setProcessType(target.getProcess().getType().toString());
+        selector.setCorrelator(_hobj);
+        selector.setCreated(new Date());
+        selector.setRoute(routePolicy);
+
+        try {
+            getSession().save(selector);
+        } catch (LockAcquisitionException e) {
+            throw new Scheduler.JobProcessorException(e, true);
+        }
+
+        if (__log.isDebugEnabled()) {
+            __log.debug(hdr + "saved " + selector);
+        }
+    }
+
+    /**
+     * Runs the SELECT_MESSAGE_ROUTE named query for this correlator and the canonical
+     * string of the given key set.
+     *
+     * @param correlationKeySet key set to check
+     * @return <code>true</code> if the query returns no rows, <code>false</code> otherwise
+     */
+    public boolean checkRoute(CorrelationKeySet correlationKeySet) {
+        entering("CorrelatorDaoImpl.checkRoute");
+        Query routeQuery = getSession().getNamedQuery(HCorrelatorSelector.SELECT_MESSAGE_ROUTE);
+        routeQuery.setEntity("corr", _hobj);
+        routeQuery.setString("ckey", correlationKeySet.toCanonicalString());
+        routeQuery.setReadOnly(true);
+        List existingRoutes = routeQuery.list();
+        return existingRoutes.isEmpty();
+    }
+
+    /** @return the correlator id stored on the wrapped hibernate object */
+    public String getCorrelatorId() {
+        final String id = _hobj.getCorrelatorId();
+        return id;
+    }
+
+    /**
+     * Sets the correlator id on the wrapped hibernate object.
+     *
+     * @param newId the new correlator id
+     */
+    public void setCorrelatorId(String newId) {
+        this._hobj.setCorrelatorId(newId);
+    }
+
+    /**
+     * Deletes all selectors with the given group id that target the given instance, then
+     * flushes the session.
+     *
+     * @param routeGroupId group id of the routes to remove
+     * @param target instance whose routes are removed; must be a {@link ProcessInstanceDaoImpl}
+     */
+    public void removeRoutes(String routeGroupId, ProcessInstanceDAO target) {
+        entering("CorrelatorDaoImpl.removeRoutes");
+        final String hdr = "removeRoutes(" + routeGroupId + ", iid=" + target.getInstanceId() + "): ";
+        __log.debug(hdr);
+
+        Session session = getSession();
+        Query delete = session.createQuery(QRY_DELSELECTORS);
+        delete.setString(0, routeGroupId); // groupId
+        delete.setEntity(1, ((ProcessInstanceDaoImpl) target).getHibernateObj()); // instance
+        int deleted = delete.executeUpdate();
+        session.flush(); // explicit flush to ensure route removed
+
+        if (__log.isDebugEnabled()) {
+            __log.debug(hdr + "deleted " + deleted + " rows");
+        }
+    }
+
+    public Collection<CorrelatorMessageDAO> getAllMessages() {
+        Collection<CorrelatorMessageDAO> msgs = new ArrayList<CorrelatorMessageDAO>();
+        for (HCorrelatorMessage correlatorMessage : _hobj.getMessageCorrelations())
+            msgs.add(new CorrelatorMessageDaoImpl(_sm, correlatorMessage));
+        return msgs;
+    }
+
+    public Collection<MessageRouteDAO> getAllRoutes() {
+        Collection<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+        for (HCorrelatorSelector selector : _hobj.getSelectors())
+            routes.add(new MessageRouteDaoImpl(_sm, selector));
+        return routes;
+    }
+
+}
\ No newline at end of file

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorMessageDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorMessageDaoImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorMessageDaoImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorMessageDaoImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.hib.bpel;
+
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.dao.bpel.CorrelatorMessageDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelatorMessage;
+
+/**
+ * Hibernate-based {@link CorrelatorMessageDAO} implementation wrapping a single
+ * {@link HCorrelatorMessage} row.
+ */
+public class CorrelatorMessageDaoImpl extends HibernateDao implements CorrelatorMessageDAO {
+
+    /** The wrapped hibernate object. */
+    private HCorrelatorMessage _hobj;
+
+    /**
+     * @param sm session manager used by this DAO
+     * @param hobj hibernate object to wrap
+     */
+    public CorrelatorMessageDaoImpl(SessionManager sm, HCorrelatorMessage hobj) {
+        super(sm, hobj);
+        // Label with this class's own name; it was previously copy-pasted from CorrelatorDaoImpl.
+        entering("CorrelatorMessageDaoImpl.CorrelatorMessageDaoImpl");
+        _hobj = hobj;
+    }
+
+    /** @return a correlation key built from the stored key string */
+    public CorrelationKey getCorrelationKey() {
+        return new CorrelationKey(_hobj.getCorrelationKey());
+    }
+
+    /**
+     * Stores the canonical string of the given key on the wrapped object.
+     *
+     * @param ckey the new correlation key
+     */
+    public void setCorrelationKey(CorrelationKey ckey) {
+        _hobj.setCorrelationKey(ckey.toCanonicalString());
+    }
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CriteriaBuilder.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CriteriaBuilder.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CriteriaBuilder.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CriteriaBuilder.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.hib.bpel;
+
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.BpelEventFilter;
+import org.apache.ode.bpel.common.Filter;
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.utils.ISO8601DateParser;
+import org.apache.ode.utils.RelativeDateParser;
+import org.hibernate.Criteria;
+import org.hibernate.Query;
+import org.hibernate.Session;
+import org.hibernate.criterion.Disjunction;
+import org.hibernate.criterion.Property;
+import org.hibernate.criterion.Restrictions;
+
+/**
+ * Class used for converting "filter" objects into Hibernate
+ * {@link org.hibernate.Criteria} objects.
+ */
+class CriteriaBuilder {
+    static final Log __log = LogFactory.getLog(CriteriaBuilder.class);
+
+    /**
+     * Build a HQL query from an instance filter.
+     * <p>
+     * Filter values are bound as named parameters, except for the date restrictions, which
+     * are rendered into the query text by {@link #dateFilter(String)}.
+     *
+     * @param session session used to create the query
+     * @param filter filter; when <code>null</code> the query selects every instance and no
+     *        result limit is applied
+     * @return the query, with all named parameters bound
+     */
+    Query buildHQLQuery(Session session, InstanceFilter filter) {
+        Map<String, Object> parameters = new HashMap<String, Object>();
+
+        StringBuilder query = new StringBuilder();
+
+        query.append("select pi from HProcessInstance as pi left join fetch pi.fault ");
+
+        if (filter != null) {
+            // Building each clause
+            List<String> clauses = new ArrayList<String>();
+
+            // iid filter (an empty list adds no clause; it used to produce an invalid "()")
+            List<String> iids = filter.getIidFilter();
+            if (iids != null && !iids.isEmpty()) {
+                StringBuilder filters = new StringBuilder();
+                for (int m = 0; m < iids.size(); m++) {
+                    if (m > 0) filters.append(" or");
+                    filters.append(" pi.id = :iid").append(m);
+                    parameters.put("iid" + m, Long.parseLong(iids.get(m)));
+                }
+                clauses.add(" (" + filters + ")");
+            }
+
+            // pid filter
+            List<String> pids = filter.getPidFilter();
+            if (pids != null && !pids.isEmpty()) {
+                boolean negative = filter.arePidsNegative();
+                String cmp = negative ? " != " : " = ";
+                // "a != x or a != y" is true for every row, so an exclusion list has to be
+                // joined with "and"; an inclusion list keeps "or".
+                String joiner = negative ? " and" : " or";
+                StringBuilder filters = new StringBuilder();
+                for (int m = 0; m < pids.size(); m++) {
+                    if (m > 0) filters.append(joiner);
+                    filters.append(" pi.process.processId ").append(cmp).append(" :pid").append(m);
+                    parameters.put("pid" + m, pids.get(m));
+                }
+                clauses.add(" (" + filters + ")");
+            }
+
+            // name filter ('*' is the wildcard of the filter syntax)
+            if (filter.getNameFilter() != null) {
+                clauses.add(" pi.process.typeName like :pname");
+                parameters.put("pname", filter.getNameFilter().replaceAll("\\*", "%"));
+            }
+
+            // name space filter
+            if (filter.getNamespaceFilter() != null) {
+                clauses.add(" pi.process.typeNamespace like :pnamespace");
+                parameters.put("pnamespace", filter.getNamespaceFilter().replaceAll("\\*", "%"));
+            }
+
+            // started filter
+            if (filter.getStartedDateFilter() != null) {
+                for (String ds : filter.getStartedDateFilter()) {
+                    // no named parameter here: the date part is parsed and re-rendered by dateFilter
+                    // NOTE(review): the operator prefix is copied from the filter string as is;
+                    // confirm it is validated before it reaches this method
+                    clauses.add(" pi.created " + dateFilter(ds));
+                }
+            }
+
+            // last-active filter
+            if (filter.getLastActiveDateFilter() != null) {
+                for (String ds : filter.getLastActiveDateFilter()) {
+                    // rendered the same way as the started filter above
+                    clauses.add(" pi.lastActiveTime " + dateFilter(ds));
+                }
+            }
+
+            // status filter
+            if (filter.getStatusFilter() != null) {
+                List<Short> states = filter.convertFilterState();
+                if (states != null && !states.isEmpty()) {
+                    StringBuilder filters = new StringBuilder();
+                    for (int m = 0; m < states.size(); m++) {
+                        if (m > 0) filters.append(" or");
+                        filters.append(" pi.state = :pstate").append(m);
+                        parameters.put("pstate" + m, states.get(m));
+                    }
+                    clauses.add(" (" + filters + ")");
+                }
+            }
+
+            // $property filter
+            Map<String, String> props = filter.getPropertyValuesFilter();
+            if (props != null) {
+                // join to correlation sets
+                query.append(" inner join pi.correlationSets as cs");
+                int i = 0;
+                for (Map.Entry<String, String> prop : props.entrySet()) {
+                    i++;
+                    // join to props for each prop
+                    query.append(" inner join cs.properties as csp" + i);
+
+                    String propKey = prop.getKey();
+                    // spaces have to be escaped, might be better handled in InstanceFilter
+                    String value = prop.getValue().replaceAll("&#32;", " ");
+
+                    // a key of the form "{namespace}name" is split; a key without a closing
+                    // brace is matched as a plain name instead of failing on substring()
+                    int closing = propKey.lastIndexOf('}');
+                    if (propKey.startsWith("{") && closing > 0) {
+                        clauses.add(" csp" + i + ".name = :cspname" + i +
+                                " and csp" + i + ".namespace = :cspnamespace" + i +
+                                " and csp" + i + ".value = :cspvalue" + i);
+
+                        parameters.put("cspname" + i, propKey.substring(closing + 1));
+                        parameters.put("cspnamespace" + i, propKey.substring(1, closing));
+                        parameters.put("cspvalue" + i, value);
+                    } else {
+                        clauses.add(" csp" + i + ".name = :cspname" + i +
+                                " and csp" + i + ".value = :cspvalue" + i);
+
+                        parameters.put("cspname" + i, propKey);
+                        parameters.put("cspvalue" + i, value);
+                    }
+                }
+            }
+
+            // order by (an empty list adds nothing; it used to leave a dangling "order by")
+            StringBuilder orderby = new StringBuilder();
+            List<String> orders = filter.getOrders();
+            if (orders != null && !orders.isEmpty()) {
+                orderby.append(" order by");
+                for (int m = 0; m < orders.size(); m++) {
+                    String field = orders.get(m);
+                    if (m > 0) orderby.append(", ");
+                    orderby.append(orderByField(field));
+                    orderby.append(field.startsWith("-") ? " desc" : " asc");
+                }
+            }
+
+            // Preparing the statement
+            if (!clauses.isEmpty()) {
+                query.append(" where");
+                for (int m = 0; m < clauses.size(); m++) {
+                    if (m > 0) query.append(" and");
+                    query.append(clauses.get(m));
+                }
+            }
+
+            query.append(orderby);
+        }
+
+        if (__log.isDebugEnabled()) {
+            __log.debug(query.toString());
+        }
+
+        Query q = session.createQuery(query.toString());
+
+        for (Map.Entry<String, Object> parameter : parameters.entrySet()) {
+            q.setParameter(parameter.getKey(), parameter.getValue());
+        }
+
+        // the filter may be null (see above); only a positive limit restricts the result
+        if (filter != null && filter.getLimit() > 0) {
+            q.setMaxResults(filter.getLimit());
+        }
+
+        return q;
+    }
+
+    /**
+     * Maps an order key of the instance filter to the HQL path used in the "order by" clause.
+     * Only the end of the key is inspected, so a leading "+" or "-" does not matter; keys that
+     * match none of the known suffixes are ordered by instance id.
+     */
+    private static String orderByField(String field) {
+        if (field.endsWith("name")) {
+            return " pi.process.typeName";
+        } else if (field.endsWith("namespace")) {
+            return " pi.process.typeNamespace";
+        } else if (field.endsWith("version")) {
+            return " pi.process.version";
+        } else if (field.endsWith("status")) {
+            return " pi.state";
+        } else if (field.endsWith("started")) {
+            return " pi.created";
+        } else if (field.endsWith("last-active")) {
+            return " pi.lastActiveTime";
+        }
+        return " pi.id";
+    }
+
+    /**
+     * Renders a prefixed date filter as the right-hand side of an HQL comparison: the text
+     * that precedes the date in the filter string, followed by the parsed date as a quoted
+     * {@link Timestamp} literal.
+     *
+     * @param filter filter string made of a comparison operator followed by a date
+     * @return HQL fragment to append after the property name
+     * @throws IllegalArgumentException if the date part cannot be parsed
+     */
+    private static String dateFilter(String filter) {
+        String date = Filter.getDateWithoutOp(filter);
+        // whatever comes before the date in the filter string is used as the operator
+        String op = filter.substring(0, filter.indexOf(date));
+        Date dt;
+        try {
+            dt = ISO8601DateParser.parse(date);
+        } catch (ParseException e) {
+            // The stack trace used to be printed here and the null date then caused a
+            // NullPointerException below; report the offending filter instead.
+            throw new IllegalArgumentException("Invalid date in filter: " + filter, e);
+        }
+        Timestamp ts = new Timestamp(dt.getTime());
+        return op + " '" + ts.toString() + "'";
+    }
+
+
+
+  /**
+   * Build a Hibernate {@link Criteria} from an instance filter.
+   * <p>
+   * Restrictions and orderings on process attributes (pid, name, namespace, version) are added
+   * to a sub-criteria on "process"; everything else is added to the target criteria itself.
+   *
+   * @param crit target (destination) criteria
+   * @param filter filter
+   */
+  void buildCriteria(Criteria crit, InstanceFilter filter) {
+    Criteria processCrit = crit.createCriteria("process");
+
+    // Filtering on PID
+    List<String> pids = filter.getPidFilter();
+    if (pids != null && pids.size() > 0) {
+      if (filter.arePidsNegative()) {
+        // Exclusion list: every pid has to be ruled out. The restrictions are added one by
+        // one so that they are combined with AND; OR-ing "not equal" tests, as was done
+        // before, matches every row as soon as two pids are listed.
+        for (String pid : pids) {
+          processCrit.add(Restrictions.ne("processId", pid));
+        }
+      } else {
+        Disjunction disj = Restrictions.disjunction();
+        for (String pid : pids) {
+          disj.add(Restrictions.eq("processId", pid));
+        }
+        processCrit.add(disj);
+      }
+    }
+
+    // Filtering on instance id
+    List<String> iids = filter.getIidFilter();
+    if (iids != null && iids.size() > 0) {
+      Disjunction disj = Restrictions.disjunction();
+      for (String iid : iids) {
+        disj.add(Restrictions.eq("id", Long.valueOf(iid)));
+      }
+      crit.add(disj);
+    }
+
+    // Filtering on name and namespace ('*' is the wildcard of the filter syntax)
+    if (filter.getNameFilter() != null) {
+      processCrit.add(Restrictions.like("typeName", filter.getNameFilter().replaceAll("\\*", "%")));
+    }
+    if (filter.getNamespaceFilter() != null) {
+      processCrit.add(Restrictions.like("typeNamespace", filter.getNamespaceFilter().replaceAll("\\*", "%")));
+    }
+
+    // Specific filter for status (using a disjunction between possible statuses)
+    if (filter.getStatusFilter() != null) {
+      List<Short> statuses = filter.convertFilterState();
+      Disjunction disj = Restrictions.disjunction();
+      for (short status : statuses) {
+        disj.add(Restrictions.eq("state", status));
+      }
+      crit.add(disj);
+    }
+
+    // Specific filter for started and last active dates.
+    if (filter.getStartedDateFilter() != null) {
+      for (String sdf : filter.getStartedDateFilter()) {
+        addFilterOnPrefixedDate(crit, sdf, "created");
+      }
+    }
+    if (filter.getLastActiveDateFilter() != null) {
+      for (String ladf : filter.getLastActiveDateFilter()) {
+        addFilterOnPrefixedDate(crit, ladf, "lastActiveTime");
+      }
+    }
+
+    // Specific filter for correlation properties
+    // NOTE(review): all entries are added to the same "properties" sub-criteria, so filtering
+    // on more than one property presumably requires a single row to match all of them --
+    // confirm whether multi-property filters are expected to work on this code path.
+    if (filter.getPropertyValuesFilter() != null) {
+      Criteria propCrit = crit.createCriteria("correlationSets").createCriteria("properties");
+      for (Map.Entry<String, String> corValue : filter.getPropertyValuesFilter().entrySet()) {
+        String propName = corValue.getKey();
+        if (propName.startsWith("{")) {
+          // key of the form "{namespace}name"
+          String namespace = propName.substring(1, propName.lastIndexOf("}"));
+          propName = propName.substring(propName.lastIndexOf("}") + 1, propName.length());
+          propCrit.add(Restrictions.eq("name", propName))
+                  .add(Restrictions.eq("namespace", namespace))
+                  .add(Restrictions.eq("value", corValue.getValue()));
+        } else {
+          propCrit.add(Restrictions.eq("name", corValue.getKey()))
+                  .add(Restrictions.eq("value", corValue.getValue()));
+        }
+      }
+    }
+
+    // Ordering: a key may start with "+" (ascending, the default) or "-" (descending)
+    if (filter.orders != null) {
+      for (String key : filter.orders) {
+        boolean ascending = !key.startsWith("-");
+        String orderKey = key;
+        if (key.startsWith("+") || key.startsWith("-")) {
+          orderKey = key.substring(1, key.length());
+        }
+
+        if ("name".equals(orderKey)) {
+          addOrder(processCrit, "typeName", ascending);
+        } else if ("namespace".equals(orderKey)) {
+          addOrder(processCrit, "typeNamespace", ascending);
+        } else if ("pid".equals(orderKey)) {
+          addOrder(processCrit, "processId", ascending);
+        } else if ("version".equals(orderKey)) {
+          addOrder(processCrit, "version", ascending);
+        } else if ("status".equals(orderKey)) {
+          addOrder(crit, "state", ascending);
+        } else if ("started".equals(orderKey)) {
+          addOrder(crit, "created", ascending);
+        } else if ("last-active".equals(orderKey)) {
+          addOrder(crit, "lastActiveTime", ascending);
+        }
+        // unknown keys are ignored
+      }
+    }
+
+    if (filter.getLimit() > 0) crit.setMaxResults(filter.getLimit());
+  }
+
+  /**
+   * Adds an ascending or descending ordering on the given property to the target criteria.
+   */
+  private static void addOrder(Criteria target, String property, boolean ascending) {
+    Property prop = Property.forName(property);
+    target.addOrder(ascending ? prop.asc() : prop.desc());
+  }
+
+  /**
+   * Translate an event filter into restrictions on the given criteria.
+   * <p>
+   * Adds a "like" restriction on <code>type</code> when a type filter is set, one restriction
+   * on <code>tstamp</code> per timestamp condition, and a maximum result count when the
+   * filter's limit is positive.
+   *
+   * @param crit target criteria
+   * @param efilter event filter
+   */
+  void buildCriteria(Criteria crit, BpelEventFilter efilter) {
+    // event type: '*' in the filter becomes the SQL wildcard '%'
+    if (efilter.getTypeFilter() != null) {
+      crit.add(Restrictions.like("type", efilter.getTypeFilter().replace('*','%')));
+    }
+
+    // event timestamp conditions, each carrying its own operator and date
+    if (efilter.getTimestampFilter() != null) {
+      for (Filter.Restriction<Date> restriction : efilter.getTimestampFilter()) {
+        addFilterOnPrefixedDate(crit, restriction.op, restriction.value, "tstamp");
+      }
+    }
+
+    // a limit of zero or less leaves the result size unrestricted
+    if (efilter.limit > 0) {
+      crit.setMaxResults(efilter.limit);
+    }
+  }
+
+  /**
+   * Adds an equality restriction for the given scope id to the criteria.
+   * <p>
+   * NOTE(review): the restriction is created with an empty property name (""), which does not
+   * look like a usable property path -- confirm which property the scope id is meant to be
+   * compared against before relying on this method.
+   *
+   * @param crit target criteria
+   * @param scopeId scope id to compare with
+   */
+  void addScopeFilter(Criteria crit, String scopeId) {
+    crit.add(Restrictions.eq("",scopeId));
+  }
+
+  /**
+   * Adds a date restriction described by a prefixed date string (a comparison operator
+   * followed by a date expression) to the criteria.
+   * <p>
+   * The date part is extracted with {@link #getDateWithoutOp(String)} and parsed; the complete
+   * prefixed string is then passed on as the operator, which works because only its beginning
+   * is inspected there.
+   *
+   * @param crit target criteria
+   * @param prefixedDate operator followed by a date expression
+   * @param dateAttribute name of the date property to restrict
+   * @throws IllegalArgumentException if the date expression cannot be parsed
+   */
+  static void addFilterOnPrefixedDate(Criteria crit, String prefixedDate, String dateAttribute) {
+    Date realDate;
+    try {
+      realDate = parseDateExpression(getDateWithoutOp(prefixedDate));
+    } catch (ParseException e) {
+      // The date format is expected to be pre-validated by the filter, so this should not
+      // happen. If it does, fail explicitly instead of silently building a restriction
+      // against a null date.
+      throw new IllegalArgumentException("Invalid date in filter: " + prefixedDate, e);
+    }
+    addFilterOnPrefixedDate(crit, prefixedDate, realDate, dateAttribute);
+  }
+
+  /**
+   * Parse the date part of a filter expression.
+   * <p>
+   * A value that starts with "-" and has at least one more character is handed, without the
+   * leading "-", to {@link RelativeDateParser}; anything else is handed to
+   * {@link ISO8601DateParser}.
+   *
+   * @param date date expression without its comparison operator
+   * @return the parsed date
+   * @throws ParseException if the parser reports a malformed value
+   */
+  private static Date parseDateExpression(String date) throws ParseException {
+    boolean relative = date.length() > 1 && date.startsWith("-");
+    return relative
+        ? RelativeDateParser.parseRelativeDate(date.substring(1))
+        : ISO8601DateParser.parse(date);
+  }
+
+  /**
+   * Adds a comparison between a date property and the given date to the criteria.
+   * <p>
+   * Only the beginning of <code>op</code> is inspected: "=", "&lt;=", "&gt;=", "&lt;" and
+   * "&gt;" are recognised. Any other beginning adds no restriction at all.
+   *
+   * @param crit target criteria
+   * @param op string starting with the comparison operator
+   * @param date date to compare with
+   * @param dateAttribute name of the date property to restrict
+   */
+  static void addFilterOnPrefixedDate(Criteria crit, String op, Date date, String dateAttribute) {
+    // true when the character right after the first one is '=' (i.e. "<=" or ">=")
+    boolean inclusive = op.startsWith("=", 1);
+
+    if (op.startsWith("=")) {
+      crit.add(Restrictions.eq(dateAttribute, date));
+    } else if (op.startsWith("<")) {
+      crit.add(inclusive ? Restrictions.le(dateAttribute, date) : Restrictions.lt(dateAttribute, date));
+    } else if (op.startsWith(">")) {
+      crit.add(inclusive ? Restrictions.ge(dateAttribute, date) : Restrictions.gt(dateAttribute, date));
+    }
+  }
+
+  /**
+   * Shorthand for {@link Filter#getDateWithoutOp(String)}.
+   *
+   * @param ddf prefixed date string
+   * @return whatever {@link Filter#getDateWithoutOp(String)} returns for the argument
+   */
+  private static String getDateWithoutOp(String ddf) {
+    return Filter.getDateWithoutOp(ddf);
+  }
+
+
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/FaultDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/FaultDAOImpl.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/FaultDAOImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/FaultDAOImpl.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.hib.bpel;
+
+import java.io.IOException;
+
+import javax.xml.namespace.QName;
+
+import org.apache.ode.dao.bpel.FaultDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HFaultData;
+import org.apache.ode.utils.DOMUtils;
+import org.apache.ode.utils.QNameUtils;
+import org.w3c.dom.Element;
+import org.xml.sax.SAXException;
+
+/**
+ * {@link FaultDAO} implementation whose values are read from a Hibernate
+ * {@link HFaultData} object.
+ */
+public class FaultDAOImpl extends HibernateDao implements FaultDAO {
+
+    /** Backing Hibernate object every getter delegates to. */
+    HFaultData _self;
+
+    /**
+     * @param sm session manager passed on to the base class
+     * @param fault backing Hibernate object
+     */
+    public FaultDAOImpl(SessionManager sm, HFaultData fault) {
+        super(sm, fault);
+        entering("FaultDAOImpl.FaultDAOImpl");
+        _self = fault;
+    }
+
+    /**
+     * @return the stored fault name converted with {@link QNameUtils#toQName}
+     */
+    public QName getName() {
+        return QNameUtils.toQName(_self.getName());
+    }
+
+    /**
+     * @return the stored fault data converted with {@link DOMUtils#stringToDOM}, or
+     *         <code>null</code> when no data is stored
+     * @throws RuntimeException wrapping the {@link IOException} or {@link SAXException}
+     *         raised by the conversion
+     */
+    public Element getData() {
+        entering("FaultDAOImpl.getData");
+        if (_self.getData() != null) {
+            try {
+                return DOMUtils.stringToDOM(_self.getData());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } catch (SAXException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return null;
+    }
+
+    /** @return the explanation stored on the backing object */
+    public String getExplanation() {
+        return _self.getExplanation();
+    }
+
+    /** @return the line number stored on the backing object */
+    public int getLineNo() {
+        return _self.getLineNo();
+    }
+
+    /** @return the activity id stored on the backing object */
+    public int getActivityId() {
+        return _self.getActivityId();
+    }
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateDao.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateDao.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateDao.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateDao.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HObject;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.hibernate.Query;
+import org.hibernate.Session;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Base class for our DAO objects.
+ * <p>
+ * All subclass methods that might trigger SQL queries should log a message in the log category 'org.apache.ode.bpel.DAO' when entered.
+ * A typical message could be "className.methodName". <br/>
+ * Typical candidates are setters, finders and getters of entities. Getters of simple properties won't provide relevant information.
+ */
+public abstract class HibernateDao {
+
+    // logger used by subclasses to track entered methods that may trigger sql query
+    // we don't use the package name to avoid interferences with other logging info.
+    static final Log logDao = LogFactory.getLog("org.apache.ode.bpel.DAO");
+
+    protected final SessionManager _sm;
+    protected final HObject _hobj;
+
+    /**
+     * @param sessionManager provider of the Hibernate session used by this DAO
+     * @param hobj Hibernate object this DAO wraps
+     */
+    protected HibernateDao(SessionManager sessionManager, HObject hobj) {
+        _sm = sessionManager;
+        _hobj = hobj;
+    }
+
+    /** Logs, at debug level, that the named method was entered. */
+    void entering(String msg){
+        // add a prefix to be parser friendly
+        if(logDao.isDebugEnabled()) logDao.debug("entering "+msg);
+    }
+
+    /** Logs, at debug level, that the named method was left. */
+    void leaving(String msg){
+        if(logDao.isDebugEnabled()) logDao.debug("leaving "+msg);
+    }
+
+    /**
+     * @see org.apache.ode.utils.dao.DAO#getDHandle()
+     */
+    public Serializable getDHandle() {
+        return new HibernateHandle(getClass(), _hobj.getClass(), getSession().getIdentifier(_hobj));
+    }
+
+    /** @return the session supplied by the session manager */
+    protected Session getSession() {
+        return _sm.getSession();
+    }
+
+    /** @return the wrapped Hibernate object */
+    public HObject getHibernateObj() {
+        return _hobj;
+    }
+
+    /** @return the id of the wrapped object, or <code>null</code> when there is no wrapped object */
+    public Serializable getId() {
+        if( _hobj != null ) {
+            return _hobj.getId();
+        }
+        return null;
+    }
+
+    /**
+     * Two DAOs are equal when they wrap objects with equal, non-null ids. A DAO whose wrapped
+     * object has no id is equal only to itself. Arguments that are <code>null</code> or not a
+     * {@link HibernateDao} are never equal; the type used to be checked with an
+     * <code>assert</code> only, so such arguments ended in a ClassCastException or
+     * NullPointerException.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof HibernateDao)) {
+            return false;
+        }
+        Serializable id = wrappedId();
+        return id != null && id.equals(((HibernateDao) obj).wrappedId());
+    }
+
+    /**
+     * @return the hash code of the wrapped object's id, or 0 when there is no id yet
+     */
+    @Override
+    public int hashCode() {
+        Serializable id = wrappedId();
+        return id == null ? 0 : id.hashCode();
+    }
+
+    /** Null-safe access to the id of the wrapped object, independent of getId() overrides. */
+    private Serializable wrappedId() {
+        return _hobj == null ? null : _hobj.getId();
+    }
+
+    /** Passes the wrapped object to the session's update operation. */
+    protected void update() {
+        _sm.getSession().update(_hobj);
+    }
+
+    /**
+     * Deletes the entities of the given class whose "id" is in the list.
+     *
+     * @param entity entity class to delete from
+     * @param ids ids to delete; nothing happens for <code>null</code> or an empty list
+     */
+    @SuppressWarnings("unchecked")
+    protected void deleteByIds(Class entity, List<Long> ids) {
+        deleteByColumn(entity, "id", ids);
+    }
+
+    /**
+     * Deletes the entities of the given class whose column value is in the list, issuing one
+     * delete statement per batch of values.
+     *
+     * @param entity entity class to delete from
+     * @param column name of the property compared against the values
+     * @param values values to match; nothing happens for <code>null</code> or an empty list
+     */
+    @SuppressWarnings("unchecked")
+    protected void deleteByColumn(Class entity, String column, List<Long> values) {
+        if( values != null && values.size() > 0 ) {
+            final String delete = "delete from "+entity.getName()+" as e where e."+column+" in (:values)";
+
+            // some databases don't like long lists of values with IN operator
+            // so we delete in batches.  Oracle 9i, for instance, doesn't support
+            // more than 1000 -- we opt to be conservative.
+            final int batchSize = 100;
+
+            int index = 0;
+            while (index < values.size()) {
+                List<Long> subList = values.subList(index, Math.min(index+batchSize, values.size()));
+                Query query = getSession().createQuery(delete);
+                query.setParameterList("values", subList);
+                query.executeUpdate();
+                index += batchSize;
+            }
+        }
+    }
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateHandle.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateHandle.java?rev=986561&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateHandle.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/HibernateHandle.java Wed Aug 18 04:12:49 2010
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import java.io.Serializable;
+
+/**
+ * Serializable handle used to obtain a reference to a Hibernate POJO. It simply
+ * carries the DAO class, the Hibernate class and the identifier it was created with.
+ */
+class HibernateHandle implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private Class _daoCls;
+    private Class _hibCls;
+    private Serializable _id;
+
+    /**
+     * @param daoCls DAO class to remember
+     * @param hibCls Hibernate class to remember
+     * @param id identifier to remember
+     */
+    public HibernateHandle(Class daoCls, Class hibCls, Serializable id) {
+        this._daoCls = daoCls;
+        this._hibCls = hibCls;
+        this._id = id;
+    }
+
+    /** @return the DAO class given at construction */
+    public Class getDAOClass() {
+        return this._daoCls;
+    }
+
+    /** @return the Hibernate class given at construction */
+    public Class getHibernateClass() {
+        return this._hibCls;
+    }
+
+    /** @return the identifier given at construction */
+    public Serializable getId() {
+        return this._id;
+    }
+}