You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by je...@apache.org on 2010/05/21 17:41:07 UTC

svn commit: r947046 [5/12] - in /ode/trunk: ./ axis2/src/main/java/org/apache/ode/axis2/ bpel-dao/ bpel-dao/src/main/java/org/apache/ode/dao/ bpel-dao/src/main/java/org/apache/ode/dao/bpel/ bpel-dao/src/main/java/org/apache/ode/dao/store/ bpel-epr/ bpe...

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/DataSourceConnectionProvider.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/DataSourceConnectionProvider.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/DataSourceConnectionProvider.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/DataSourceConnectionProvider.java Fri May 21 15:40:59 2010
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.hib;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.ode.utils.DbIsolation;
+
+import org.hibernate.HibernateException;
+import org.hibernate.connection.ConnectionProvider;
+
+
+public class DataSourceConnectionProvider implements ConnectionProvider {
+
+  private Properties _props;
+  
+  public DataSourceConnectionProvider() {
+  }
+  
+  public void configure(Properties props) throws HibernateException {
+    _props = props;
+  }
+
+  public Connection getConnection() throws SQLException {
+    Connection c = SessionManager.getConnection(_props);
+    DbIsolation.setIsolationLevel(c);
+    return c;
+  }
+
+  public void closeConnection(Connection con) throws SQLException {
+    con.close();
+  }
+
+  public void close() throws HibernateException {
+
+  }
+
+  public boolean supportsAggressiveRelease() {
+    return true;
+  }
+
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/HibernateTransactionManagerLookup.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/HibernateTransactionManagerLookup.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/HibernateTransactionManagerLookup.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/HibernateTransactionManagerLookup.java Fri May 21 15:40:59 2010
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib;
+
+import java.util.Properties;
+
+import javax.transaction.TransactionManager;
+import javax.transaction.Transaction;
+
+import org.hibernate.HibernateException;
+import org.hibernate.transaction.TransactionManagerLookup;
+
+/**
+ * Implementation of the {@link org.hibernate.transaction.TransactionManagerLookup} interface that
+ * uses {@link SessionManager} to obtain the JTA {@link TransactionManager} object.
+ */
+public class HibernateTransactionManagerLookup implements TransactionManagerLookup {
+
+	/** Constructor. */
+	public HibernateTransactionManagerLookup() {
+		super();
+	}
+
+	public TransactionManager getTransactionManager(Properties props)
+			throws HibernateException {
+		return SessionManager.getTransactionManager(props);
+	}
+
+	public String getUserTransactionName() {
+		return null;
+	}
+
+    public Object getTransactionIdentifier(Transaction transaction) {
+        return transaction;
+    }
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/JotmTransaction.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/JotmTransaction.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/JotmTransaction.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/JotmTransaction.java Fri May 21 15:40:59 2010
@@ -0,0 +1,360 @@
+/*
+ * Hibernate, Relational Persistence for Idiomatic Java
+ *
+ * Copyright (c) 2008, Red Hat Middleware LLC or third-party contributors as
+ * indicated by the @author tags or express copyright attribution
+ * statements applied by the authors.  All third-party contributions are
+ * distributed under license by Red Hat Middleware LLC.
+ *
+ * This copyrighted material is made available to anyone wishing to use, modify,
+ * copy, or redistribute it subject to the terms and conditions of the GNU
+ * Lesser General Public License, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this distribution; if not, write to:
+ * Free Software Foundation, Inc.
+ * 51 Franklin Street, Fifth Floor
+ * Boston, MA  02110-1301  USA
+ *
+ */
+package org.apache.ode.dao.hib;
+
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import javax.transaction.UserTransaction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.hibernate.HibernateException;
+import org.hibernate.Transaction;
+import org.hibernate.TransactionException;
+import org.hibernate.jdbc.JDBCContext;
+import org.hibernate.transaction.TransactionFactory;
+import org.hibernate.util.JTAHelper;
+
+/**
+ * {@link Transaction} implementation based on transaction management through
+ * a JTA {@link UserTransaction}.  Similar to {@link CMTTransaction}, except
+ * here we are actually managing the transactions through the Hibernate
+ * transaction mechanism.
+ *
+ * @author Gavin King
+ * @author Steve Ebersole
+ * @author Les Hazlewood
+ * 
+ * Scraped from org.hibernate
+ */
+public class JotmTransaction implements Transaction {
+    private static final Log log = LogFactory.getLog( JotmTransaction.class );
+
+    private final JDBCContext jdbcContext;
+    private final TransactionFactory.Context transactionContext;
+
+    private UserTransaction userTransaction;
+    private boolean newTransaction;
+    private boolean begun;
+    private boolean commitFailed;
+    private boolean commitSucceeded;
+    private boolean callback;
+
+    public JotmTransaction(
+            UserTransaction userTransaction,
+            JDBCContext jdbcContext,
+            TransactionFactory.Context transactionContext) {
+        this.jdbcContext = jdbcContext;
+        this.transactionContext = transactionContext;
+        this.userTransaction = userTransaction;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void begin() throws HibernateException {
+        if ( begun ) {
+            return;
+        }
+        if ( commitFailed ) {
+            throw new TransactionException( "cannot re-start transaction after failed commit" );
+        }
+
+        log.debug( "begin" );
+
+        try {
+            newTransaction = userTransaction.getStatus() == Status.STATUS_NO_TRANSACTION;
+            if ( newTransaction ) {
+                userTransaction.begin();
+                log.debug( "Began a new JTA transaction" );
+            }
+        }
+        catch ( Exception e ) {
+            log.error( "JTA transaction begin failed", e );
+            throw new TransactionException( "JTA transaction begin failed", e );
+        }
+
+        /*if (newTransaction) {
+            // don't need a synchronization since we are committing
+            // or rolling back the transaction ourselves - assuming
+            // that we do no work in beforeTransactionCompletion()
+            synchronization = false;
+        }*/
+
+        boolean synchronization = jdbcContext.registerSynchronizationIfPossible();
+
+        if ( !newTransaction && !synchronization ) {
+            log.warn( "You should set hibernate.transaction.manager_lookup_class if cache is enabled" );
+        }
+
+        if ( !synchronization ) {
+            //if we could not register a synchronization,
+            //do the before/after completion callbacks
+            //ourself (but we need to let jdbcContext
+            //know that this is what we are going to
+            //do, so it doesn't keep trying to register
+            //synchronizations)
+            callback = jdbcContext.registerCallbackIfNecessary();
+        }
+
+        begun = true;
+        commitSucceeded = false;
+
+        jdbcContext.afterTransactionBegin( this );
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void commit() throws HibernateException {
+        if ( !begun ) {
+            throw new TransactionException( "Transaction not successfully started" );
+        }
+
+        log.debug( "commit" );
+
+        boolean flush = !transactionContext.isFlushModeNever()
+                && ( callback || !transactionContext.isFlushBeforeCompletionEnabled() );
+
+        if ( flush ) {
+            transactionContext.managedFlush(); //if an exception occurs during flush, user must call rollback()
+        }
+
+        if ( callback && newTransaction ) {
+            jdbcContext.beforeTransactionCompletion( this );
+        }
+
+        closeIfRequired();
+
+        if ( newTransaction ) {
+            try {
+                userTransaction.commit();
+                commitSucceeded = true;
+                log.debug( "Committed JTA UserTransaction" );
+            }
+            catch ( Exception e ) {
+                commitFailed = true; // so the transaction is already rolled back, by JTA spec
+                log.error( "JTA commit failed", e );
+                throw new TransactionException( "JTA commit failed: ", e );
+            }
+            finally {
+                afterCommitRollback();
+            }
+        }
+        else {
+            // this one only really needed for badly-behaved applications!
+            // (if the TransactionManager has a Sychronization registered,
+            // its a noop)
+            // (actually we do need it for downgrading locks)
+            afterCommitRollback();
+        }
+
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void rollback() throws HibernateException {
+        if ( !begun && !commitFailed ) {
+            throw new TransactionException( "Transaction not successfully started" );
+        }
+
+        log.debug( "rollback" );
+
+        try {
+            closeIfRequired();
+        }
+        catch ( Exception e ) {
+            // swallow it, and continue to roll back JTA transaction
+            log.error( "could not close session during rollback", e );
+        }
+
+        try {
+            if ( newTransaction ) {
+                if ( !commitFailed ) {
+                    userTransaction.rollback();
+                    log.debug( "Rolled back JTA UserTransaction" );
+                }
+            }
+            else {
+                userTransaction.setRollbackOnly();
+                log.debug( "set JTA UserTransaction to rollback only" );
+            }
+        }
+        catch ( Exception e ) {
+            log.error( "JTA rollback failed", e );
+            throw new TransactionException( "JTA rollback failed", e );
+        }
+        finally {
+            afterCommitRollback();
+        }
+    }
+
+    private static final int NULL = Integer.MIN_VALUE;
+
+    private void afterCommitRollback() throws TransactionException {
+
+        begun = false;
+        // this method is a noop if there is a Synchronization!
+        if ( callback ) {
+            if ( !newTransaction ) {
+                log.warn( "You should set hibernate.transaction.manager_lookup_class if cache is enabled" );
+            }
+            int status = NULL;
+            try {
+                status = userTransaction.getStatus();
+            }
+            catch ( Exception e ) {
+                log.error( "Could not determine transaction status after commit", e );
+                throw new TransactionException( "Could not determine transaction status after commit", e );
+            }
+            finally {
+                jdbcContext.afterTransactionCompletion( status == Status.STATUS_COMMITTED, this );
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean wasRolledBack() throws TransactionException {
+        final int status;
+        try {
+            status = userTransaction.getStatus();
+        }
+        catch ( SystemException se ) {
+            log.error( "Could not determine transaction status", se );
+            throw new TransactionException( "Could not determine transaction status", se );
+        }
+        if ( status == Status.STATUS_UNKNOWN ) {
+            throw new TransactionException( "Could not determine transaction status" );
+        }
+        else {
+            return JTAHelper.isRollback( status );
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean wasCommitted() throws TransactionException {
+        final int status;
+        try {
+            status = userTransaction.getStatus();
+        }
+        catch ( SystemException se ) {
+            log.error( "Could not determine transaction status", se );
+            throw new TransactionException( "Could not determine transaction status: ", se );
+        }
+        if ( status == Status.STATUS_UNKNOWN ) {
+            throw new TransactionException( "Could not determine transaction status" );
+        }
+        else {
+            return status == Status.STATUS_COMMITTED;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean isActive() throws TransactionException {
+        if ( !begun || commitFailed || commitSucceeded ) {
+            return false;
+        }
+
+        final int status;
+        try {
+            status = userTransaction.getStatus();
+        }
+        catch ( SystemException se ) {
+            log.error( "Could not determine transaction status", se );
+            throw new TransactionException( "Could not determine transaction status: ", se );
+        }
+        if ( status == Status.STATUS_UNKNOWN ) {
+            throw new TransactionException( "Could not determine transaction status" );
+        }
+        else {
+            return status == Status.STATUS_ACTIVE;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void registerSynchronization(Synchronization sync) throws HibernateException {
+        if ( getTransactionManager() == null ) {
+            throw new IllegalStateException( "JTA TransactionManager not available" );
+        }
+        else {
+            try {
+                getTransactionManager().getTransaction().registerSynchronization( sync );
+            }
+            catch ( Exception e ) {
+                throw new TransactionException( "could not register synchronization", e );
+            }
+        }
+    }
+
+    /**
+     * Getter for property 'transactionManager'.
+     *
+     * @return Value for property 'transactionManager'.
+     */
+    private TransactionManager getTransactionManager() {
+        return transactionContext.getFactory().getTransactionManager();
+    }
+
+    private void closeIfRequired() throws HibernateException {
+        boolean close = callback &&
+                transactionContext.shouldAutoClose() &&
+                !transactionContext.isClosed();
+        if ( close ) {
+            transactionContext.managedClose();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void setTimeout(int seconds) {
+        try {
+            userTransaction.setTransactionTimeout( seconds );
+        }
+        catch ( SystemException se ) {
+            throw new TransactionException( "could not set transaction timeout", se );
+        }
+    }
+
+    /**
+     * Getter for property 'userTransaction'.
+     *
+     * @return Value for property 'userTransaction'.
+     */
+    protected UserTransaction getUserTransaction() {
+        return userTransaction;
+    }
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/JotmTransactionFactory.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/JotmTransactionFactory.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/JotmTransactionFactory.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/JotmTransactionFactory.java Fri May 21 15:40:59 2010
@@ -0,0 +1,309 @@
+/*
+ * Hibernate, Relational Persistence for Idiomatic Java
+ *
+ * Copyright (c) 2008, Red Hat Middleware LLC or third-party contributors as
+ * indicated by the @author tags or express copyright attribution
+ * statements applied by the authors.  All third-party contributions are
+ * distributed under license by Red Hat Middleware LLC.
+ *
+ * This copyrighted material is made available to anyone wishing to use, modify,
+ * copy, or redistribute it subject to the terms and conditions of the GNU
+ * Lesser General Public License, as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this distribution; if not, write to:
+ * Free Software Foundation, Inc.
+ * 51 Franklin Street, Fifth Floor
+ * Boston, MA  02110-1301  USA
+ *
+ */
+package org.apache.ode.dao.hib;
+
+import java.util.Properties;
+
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import javax.transaction.UserTransaction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.hibernate.ConnectionReleaseMode;
+import org.hibernate.HibernateException;
+import org.hibernate.Transaction;
+import org.hibernate.TransactionException;
+import org.hibernate.jdbc.JDBCContext;
+import org.hibernate.cfg.Environment;
+import org.hibernate.transaction.TransactionFactory;
+import org.hibernate.transaction.TransactionManagerLookup;
+import org.hibernate.transaction.TransactionManagerLookupFactory;
+import org.hibernate.util.NamingHelper;
+import org.hibernate.util.JTAHelper;
+
+/**
+ * Factory for {@link JotmTransaction} instances.
+ * <p/>
+ * To be completely accurate to the JTA spec, JTA implementations should
+ * publish their contextual {@link UserTransaction} reference into JNDI.
+ * However, in practice there are quite a few <tt>stand-alone</tt>
+ * implementations intended for use outside of J2EE/JEE containers and
+ * which therefore do not publish their {@link UserTransaction} references
+ * into JNDI but which otherwise follow the aspects of the JTA specification.
+ * This {@link TransactionFactory} implementation can support both models.
+ * <p/>
+ * For complete JTA implementations (including dependence on JNDI), the
+ * {@link UserTransaction} reference is obtained by a call to
+ * {@link #resolveInitialContext}.  Hibernate will then attempt to locate the
+ * {@link UserTransaction} within this resolved
+ * {@link InitialContext} based on the namespace returned by
+ * {@link #resolveUserTransactionName}.
+ * <p/>
+ * For the so-called <tt>stand-alone</tt> implementations, we do not care at
+ * all about the JNDI aspects just described.  Here, the implementation would
+ * have a specific manner to obtain a reference to its contextual
+ * {@link UserTransaction}; usually this would be a static code reference, but
+ * again it varies.  Anyway, for each implementation the integration would need
+ * to override the {@link #getUserTransaction} method and return the appropriate
+ * thing.
+ *
+ * @author Gavin King
+ * @author Steve Ebersole
+ * @author Les Hazlewood
+ * 
+ * Scraped from org.hibernate
+ */
+public class JotmTransactionFactory implements TransactionFactory {
+    public static final String DEFAULT_USER_TRANSACTION_NAME = "java:comp/UserTransaction";
+    private static final Log log = LogFactory.getLog( JotmTransactionFactory.class );
+
+    protected InitialContext initialContext;
+    protected String userTransactionName;
+
+    private TransactionManager txManager;
+    
+    /**
+     * Configure this transaction factory.  Specifically here we are attempting to
+     * resolve both an {@link #getInitialContext InitialContext} as well as the
+     * {@link #getUserTransactionName() JNDI namespace} for the {@link UserTransaction}.
+     *
+     * @param props The configuration properties
+     *
+     * @exception HibernateException
+     */
+    public void configure(Properties props) throws HibernateException {
+        this.initialContext = resolveInitialContext( props );
+        this.userTransactionName = resolveUserTransactionName( props );
+        log.debug( "Configured JTATransactionFactory to use [" + userTransactionName + "] for UserTransaction JDNI namespace" );
+        txManager = new HibernateTransactionManagerLookup().getTransactionManager(props);
+    }
+
+    /**
+     * Given the lot of Hibernate configuration properties, resolve appropriate
+     * reference to JNDI {@link InitialContext}.
+     * <p/>
+     * In general, the properties in which we are interested here all begin with
+     * <tt>hibernate.jndi</tt>.  Especially important depending on your
+     * environment are {@link Environment#JNDI_URL hibernate.jndi.url} and
+     *  {@link Environment#JNDI_CLASS hibernate.jndi.class}
+     *
+     * @param properties The Hibernate config properties.
+     * @return The resolved InitialContext.
+     */
+    protected final InitialContext resolveInitialContext(Properties properties) {
+        try {
+            return NamingHelper.getInitialContext( properties );
+        }
+        catch ( NamingException ne ) {
+            throw new HibernateException( "Could not obtain initial context", ne );
+        }
+    }
+
+    /**
+     * Given the lot of Hibernate configuration properties, resolve appropriate
+     * JNDI namespace to use for {@link UserTransaction} resolution.
+     * <p/>
+     * We determine the namespace to use by<ol>
+     * <li>Any specified {@link Environment#USER_TRANSACTION jta.UserTransaction} config property</li>
+     * <li>If a {@link TransactionManagerLookup} was indicated, use its
+     * {@link TransactionManagerLookup#getUserTransactionName}</li>
+     * <li>finally, as a last resort, we use {@link #DEFAULT_USER_TRANSACTION_NAME}</li>
+     * </ol>
+     *
+     * @param properties The Hibernate config properties.
+     * @return The resolved {@link UserTransaction} namespace
+     */
+    protected final String resolveUserTransactionName(Properties properties) {
+        String utName = properties.getProperty( Environment.USER_TRANSACTION );
+        if ( utName == null ) {
+            TransactionManagerLookup lookup = TransactionManagerLookupFactory.getTransactionManagerLookup( properties );
+            if ( lookup != null ) {
+                utName = lookup.getUserTransactionName();
+            }
+        }
+        return utName == null ? DEFAULT_USER_TRANSACTION_NAME : utName;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public Transaction createTransaction(JDBCContext jdbcContext, Context transactionContext)
+            throws HibernateException {
+        /*
+         * JTA TransactionManager returns a JTA transaction. We need a user transaction to use
+         * hibernate's JTATransactionFactory.
+         */
+        // UserTransaction ut = getUserTransaction();
+        UserTransaction ut;
+        try {
+            ut = new UserTransaction() {
+                javax.transaction.Transaction transaction = txManager.getTransaction();
+                public void begin() throws NotSupportedException, SystemException {
+                    // TODO Auto-generated method stub
+                }
+
+                public void commit() throws HeuristicMixedException,
+                        HeuristicRollbackException, IllegalStateException,
+                        RollbackException, SecurityException, SystemException {
+                    transaction.commit();
+                }
+
+                public int getStatus() throws SystemException {
+                    // TODO Auto-generated method stub
+                    return transaction.getStatus();
+                }
+
+                public void rollback() throws IllegalStateException,
+                        SecurityException, SystemException {
+                    // TODO Auto-generated method stub
+                    transaction.rollback();
+                }
+
+                public void setRollbackOnly() throws IllegalStateException,
+                        SystemException {
+                    // TODO Auto-generated method stub
+                    transaction.setRollbackOnly();
+                }
+
+                public void setTransactionTimeout(int i) throws SystemException {
+                    // TODO Auto-generated method stub
+                }
+            };
+        } catch (SystemException e) {
+            throw new HibernateException(e);
+        }
+        
+        return new JotmTransaction( ut, jdbcContext, transactionContext );
+    }
+
+    /**
+     * Get the {@link UserTransaction} reference.
+     *
+     * @return The appropriate {@link UserTransaction} reference.
+     */
+    protected UserTransaction getUserTransaction() {
+        final String utName = getUserTransactionName();
+        log.debug( "Attempting to locate UserTransaction via JNDI [" + utName + "]");
+
+        try {
+            UserTransaction ut = ( UserTransaction ) getInitialContext().lookup( utName );
+            if ( ut == null ) {
+                throw new TransactionException( "Naming service lookup for UserTransaction returned null [" + utName +"]" );
+            }
+
+            log.trace( "Obtained UserTransaction" );
+
+            return ut;
+        }
+        catch ( NamingException ne ) {
+            throw new TransactionException( "Could not find UserTransaction in JNDI [" + utName + "]", ne );
+        }
+    }
+
+    /**
+     * Getter for property 'initialContext'.
+     *
+     * @return Value for property 'initialContext'.
+     */
+    protected InitialContext getInitialContext() {
+        return initialContext;
+    }
+
+    /**
+     * Getter for property 'userTransactionName'.
+     * The algorithm here is
+     *
+     * @return Value for property 'userTransactionName'.
+     */
+    protected String getUserTransactionName() {
+        return userTransactionName;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public ConnectionReleaseMode getDefaultReleaseMode() {
+        return ConnectionReleaseMode.AFTER_STATEMENT;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean isTransactionManagerRequired() {
+        return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean areCallbacksLocalToHibernateTransactions() {
+        return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean isTransactionInProgress(
+            JDBCContext jdbcContext,
+            Context transactionContext,
+            Transaction transaction) {
+        try {
+            // Essentially:
+            // 1) If we have a local (Hibernate) transaction in progress
+            //      and it already has the UserTransaction cached, use that
+            //      UserTransaction to determine the status.
+            // 2) If a transaction manager has been located, use
+            //      that transaction manager to determine the status.
+            // 3) Finally, as the last resort, try to lookup the
+            //      UserTransaction via JNDI and use that to determine the
+            //      status.
+            if ( transaction != null ) {
+                UserTransaction ut = ( ( JotmTransaction ) transaction ).getUserTransaction();
+                if ( ut != null ) {
+                    return JTAHelper.isInProgress( ut.getStatus() );
+                }
+            }
+
+            if ( jdbcContext.getFactory().getTransactionManager() != null ) {
+                return JTAHelper.isInProgress( jdbcContext.getFactory().getTransactionManager().getStatus() );
+            }
+            else {
+                UserTransaction ut = getUserTransaction();
+                return ut != null && JTAHelper.isInProgress( ut.getStatus() );
+            }
+        }
+        catch ( SystemException se ) {
+            throw new TransactionException( "Unable to check transaction status", se );
+        }
+    }
+
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/NativeHiLoGenerator.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/NativeHiLoGenerator.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/NativeHiLoGenerator.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/NativeHiLoGenerator.java Fri May 21 15:40:59 2010
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.hib;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+import org.hibernate.HibernateException;
+import org.hibernate.MappingException;
+import org.hibernate.dialect.Dialect;
+import org.hibernate.engine.SessionImplementor;
+import org.hibernate.id.Configurable;
+import org.hibernate.id.IdentifierGenerator;
+import org.hibernate.id.PersistentIdentifierGenerator;
+import org.hibernate.id.SequenceHiLoGenerator;
+import org.hibernate.id.TableHiLoGenerator;
+import org.hibernate.type.Type;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class NativeHiLoGenerator implements IdentifierGenerator, PersistentIdentifierGenerator, Configurable {
+    private static final Log __log = LogFactory.getLog(NativeHiLoGenerator.class);
+    private IdentifierGenerator _proxy;
+
+    public NativeHiLoGenerator() {
+        super();
+    }
+
+    public Serializable generate(SessionImplementor session, Object object) throws HibernateException {
+        return _proxy.generate(session, object);
+    }
+
+    public Object generatorKey() {
+        if (_proxy instanceof PersistentIdentifierGenerator)
+            return ((PersistentIdentifierGenerator) _proxy).generatorKey();
+        else
+            return this;
+    }
+
+    public String[] sqlCreateStrings(Dialect dialect) throws HibernateException {
+        if (_proxy instanceof PersistentIdentifierGenerator)
+            return ((PersistentIdentifierGenerator) _proxy).sqlCreateStrings(dialect);
+        else
+            return new String[] {};
+    }
+
+    public String[] sqlDropStrings(Dialect dialect) throws HibernateException {
+        if (_proxy instanceof PersistentIdentifierGenerator)
+            return ((PersistentIdentifierGenerator) _proxy).sqlDropStrings(dialect);
+        else
+            return null;
+    }
+
+    public void configure(Type type, Properties params, Dialect dialect) throws MappingException {
+        Class generatorClass = null;
+        if (dialect.supportsSequences()) {
+            __log.debug("Using SequenceHiLoGenerator");
+            generatorClass = SequenceHiLoGenerator.class;
+        } else {
+            generatorClass = TableHiLoGenerator.class;
+            __log.debug("Using native dialect generator " + generatorClass);
+        }
+
+        IdentifierGenerator g = null;
+        try {
+            g = (IdentifierGenerator) generatorClass.newInstance();
+        } catch (Exception e) {
+            throw new MappingException("", e);
+        }
+
+        if (g instanceof Configurable)
+            ((Configurable) g).configure(type, params, dialect);
+
+        this._proxy = g;
+    }
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/SessionManager.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/SessionManager.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/SessionManager.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/SessionManager.java Fri May 21 15:40:59 2010
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.dao.hib.bpel.hobj.HActivityRecovery;
+import org.apache.ode.dao.hib.bpel.hobj.HBpelEvent;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelationProperty;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelationSet;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelator;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelatorMessage;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelatorSelector;
+import org.apache.ode.dao.hib.bpel.hobj.HFaultData;
+import org.apache.ode.dao.hib.bpel.hobj.HMessage;
+import org.apache.ode.dao.hib.bpel.hobj.HMessageExchange;
+import org.apache.ode.dao.hib.bpel.hobj.HMessageExchangeProperty;
+import org.apache.ode.dao.hib.bpel.hobj.HPartnerLink;
+import org.apache.ode.dao.hib.bpel.hobj.HProcess;
+import org.apache.ode.dao.hib.bpel.hobj.HProcessInstance;
+import org.apache.ode.dao.hib.bpel.hobj.HScope;
+import org.apache.ode.dao.hib.bpel.hobj.HVariableProperty;
+import org.apache.ode.dao.hib.bpel.hobj.HXmlData;
+import org.apache.ode.dao.hib.store.hobj.DeploymentUnitDaoImpl;
+import org.apache.ode.dao.hib.store.hobj.ProcessConfDaoImpl;
+import org.apache.ode.dao.hib.store.hobj.VersionTrackerDAOImpl;
+import org.apache.ode.utils.uuid.UUID;
+import org.hibernate.HibernateException;
+import org.hibernate.MappingException;
+import org.hibernate.Session;
+import org.hibernate.SessionFactory;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.cfg.Environment;
+
+/**
+ * Manages hibernate sessions, and their association with 
+ * a transaction thread.  Uses a ThreadLocal strategy for 
+ * managing sessions.
+ */
+public class SessionManager {
+    private static final Log __log = LogFactory.getLog(SessionManager.class);
+
+    public static final String PROP_GUID = "ode.hibernate.guid";
+
+    private static final Map<String, TransactionManager> _txManagers =
+            Collections.synchronizedMap(new HashMap<String, TransactionManager>());
+    private static final Map<String, DataSource> _dataSources =
+            Collections.synchronizedMap(new HashMap<String,DataSource>());
+
+    private static final String[] CANNOT_JOIN_FOR_UPDATE_DIALECTS =
+            {"org.hibernate.dialect.IngresDialect"};
+
+    private final String _uuid = new UUID().toString();
+    private final TransactionManager _txManager;
+    private final SessionFactory _sessionFactory;
+    private boolean _canJoinForUpdate = true;
+    
+    private final TxContext _ctx;
+    
+   public SessionManager(Properties env, DataSource ds, TransactionManager tx) throws HibernateException {
+        this(getDefaultConfiguration(), env, ds, tx);
+    }
+
+    /** Inaccessible constructor. */
+    public SessionManager(Configuration conf, Properties env, DataSource ds, TransactionManager tx) throws HibernateException {
+        
+    	if (tx!=null){
+            _ctx = new HibernateJtaTxContext();
+        }else{
+            _ctx = new HibernateNonTxContext();
+        }
+
+        _txManager = tx;
+        _txManagers.put(_uuid,tx);
+        _dataSources.put(_uuid,ds);
+
+        _sessionFactory = conf.setProperties(env).setProperty(PROP_GUID, _uuid).buildSessionFactory();
+
+        /*
+        Some Hibernate dialects (like IngresDialect) do not support update for join.
+        We need to distinguish them and explicitly define subqueries, otherwise Hibernate
+        implicitly generates joins which causes problems during update for such DBMS.
+        See org.apache.ode.daohib.bpel.CorrelatorDaoImpl for instance.
+        */
+        String currentHibDialect = env.getProperty(Environment.DIALECT);
+        for (String dialect : CANNOT_JOIN_FOR_UPDATE_DIALECTS) {
+            if (dialect.equals(currentHibDialect)) {
+                _canJoinForUpdate = false;
+            }
+        }
+    }
+
+    TransactionManager getTransactionManager() {
+        return _txManager;
+    }
+
+    public static void registerTransactionManager(String uuid, TransactionManager txm) {
+        _txManagers.put(uuid, txm);
+    }
+
+    /**
+     * Get the current Hibernate Session.
+     */
+    public Session getSession() {
+        return _sessionFactory.getCurrentSession();
+    }
+    
+    public void shutdown() {
+        _sessionFactory.close();
+    }
+
+    /**
+     * Returns flag which shows whether " where .. join ... for update" kind of queries can be used (supported
+     * by currently effective {@link org.hibernate.dialect.Dialect}. If it's {@code false} than sub-query fallback
+     * should be invoked instead.
+     *
+     * @return currently returns false only for {@link org.hibernate.dialect.IngresDialect}
+     */
+    public boolean canJoinForUpdate() {
+        return _canJoinForUpdate;
+    }
+
+
+    /**
+     * Returns a hibernate configuration with hibernate DAO objects added as resources.
+     * @return
+     * @throws MappingException
+     */
+    public static Configuration getDefaultConfiguration() throws MappingException {
+        return new Configuration()//bpel classes
+                .addClass(HProcess.class)
+                .addClass(HProcessInstance.class)
+                .addClass(HCorrelator.class)
+                .addClass(HCorrelatorMessage.class)
+                .addClass(HCorrelationProperty.class)
+                .addClass(HCorrelatorSelector.class)
+                .addClass(HMessageExchange.class)
+                .addClass(HMessage.class)
+                .addClass(HPartnerLink.class)
+                .addClass(HScope.class)
+                .addClass(HCorrelationSet.class)
+                .addClass(HXmlData.class)
+                .addClass(HVariableProperty.class)
+                .addClass(HBpelEvent.class)
+                .addClass(HFaultData.class)
+                .addClass(HActivityRecovery.class)
+                .addClass(HMessageExchangeProperty.class)//store classes
+                .addClass(ProcessConfDaoImpl.class)
+                .addClass(DeploymentUnitDaoImpl.class)
+                .addClass(VersionTrackerDAOImpl.class);
+    }
+
+    public static TransactionManager getTransactionManager(Properties props) {
+        String guid = props.getProperty(PROP_GUID);
+        return _txManagers.get(guid);
+    }
+
+    public static Connection getConnection(Properties props) throws SQLException {
+        String guid = props.getProperty(PROP_GUID);
+        return _dataSources.get(guid).getConnection();
+    }
+    
+    public boolean isClosed() {
+        return _sessionFactory.isClosed();
+    }
+    
+    
+    public void begin(){
+        _ctx.begin();
+     }
+
+     public void commit(){
+       _ctx.commit();
+     }
+
+     public void rollback(){
+       _ctx.rollback();
+     }
+
+     public interface TxContext {
+
+         public void begin();
+
+         public void commit();
+
+         public void rollback();
+     }
+
+     public class HibernateNonTxContext implements TxContext {
+
+         public void begin() {
+             getSession().beginTransaction();
+         }
+
+         public void commit() {
+             getSession().getTransaction().commit();
+         }
+
+         public void rollback() {
+             getSession().getTransaction().rollback();
+         }
+     }
+
+     public class HibernateJtaTxContext implements TxContext {
+
+         public void begin() {
+         }
+
+         public void commit() {
+         }
+
+         public void rollback() {
+         }
+     }
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/ActivityRecoveryDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/ActivityRecoveryDaoImpl.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/ActivityRecoveryDaoImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/ActivityRecoveryDaoImpl.java Fri May 21 15:40:59 2010
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.dao.hib.bpel;
+
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.ode.dao.bpel.ActivityRecoveryDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HActivityRecovery;
+import org.apache.ode.utils.DOMUtils;
+import org.w3c.dom.Element;
+import org.xml.sax.SAXException;
+
+/**
+ * Hibernate based {@link ActivityRecoveryDao} implementation
+ */
+public class ActivityRecoveryDaoImpl extends HibernateDao implements ActivityRecoveryDAO {
+    HActivityRecovery _self;
+
+    public ActivityRecoveryDaoImpl(SessionManager sm, HActivityRecovery recovery) {
+        super(sm, recovery);
+        entering("ActivityRecoveryDaoImpl.ActivityRecoveryDaoImpl");
+        _self = recovery;
+    }
+
+    public long getActivityId() {
+        return _self.getActivityId();
+    }
+
+    public String getChannel() {
+        return _self.getChannel();
+    }
+
+    public String getReason() {
+        return _self.getReason();
+    }
+
+    public Date getDateTime() {
+        return _self.getDateTime();
+    }
+    
+    public Element getDetails() {
+        entering("ActivityRecoveryDaoImpl.getDetails");
+        if (_self.getDetails() == null) {
+            return null;
+        }
+        try {
+            return DOMUtils.stringToDOM(_self.getDetails());
+        } catch (SAXException e) {
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String getActions() {
+        return _self.getActions();
+    }
+
+    public String[] getActionsList() {
+        return _self.getActions().split(" ");
+    }
+
+    public int getRetries() {
+        return _self.getRetries();
+    }
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionFactoryImpl.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionFactoryImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionFactoryImpl.java Fri May 21 15:40:59 2010
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import java.sql.Connection;
+import java.util.Enumeration;
+import java.util.Properties;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.dao.bpel.BpelDAOConnection;
+import org.apache.ode.dao.bpel.BpelDAOConnectionFactory;
+import org.apache.ode.dao.hib.DataSourceConnectionProvider;
+import org.apache.ode.dao.hib.HibernateTransactionManagerLookup;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.il.config.OdeConfigProperties;
+import org.hibernate.HibernateException;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.cfg.Environment;
+import org.hibernate.dialect.Dialect;
+import org.hibernate.dialect.resolver.DialectFactory;
+
+/**
+ * Hibernate-based {@link org.apache.ode.dao.bpel.BpelDAOConnectionFactory}
+ * implementation.
+ */
+public class BpelDAOConnectionFactoryImpl implements BpelDAOConnectionFactory {
+    private static final Log __log = LogFactory.getLog(BpelDAOConnectionFactoryImpl.class);
+
+    protected SessionManager _sessionManager;
+
+    private DataSource _ds;
+
+    private TransactionManager _tm;
+
+    /**
+     * Constructor.
+     */
+    public BpelDAOConnectionFactoryImpl() {
+    }
+
+    public BpelDAOConnection getConnection() {
+        try {
+            return new BpelDAOConnectionImpl(_sessionManager);
+        } catch (HibernateException e) {
+            __log.error("DbError", e);
+            throw e;
+        }
+    }
+    
+    /**
+     * @see org.apache.ode.dao.bpel.BpelDAOConnectionFactory#init(java.util.Properties)
+     */
+    @SuppressWarnings("unchecked")
+    public void init(Properties initialProps, TransactionManager mgr, Object env) {
+      _tm=mgr;
+      _ds=(DataSource)env;
+      if (_tm == null){
+          __log.error("Hibernate BpelDAOConnectionFactoryImpl requires a JTA Transaction Manager to be set.");
+      }
+      _sessionManager = setupSessionManager(initialProps, _tm, _ds);
+
+
+    }
+
+    public static SessionManager setupSessionManager(Properties initialProps, TransactionManager mgr, DataSource ds){
+        return setupSessionManager(SessionManager.getDefaultConfiguration(), initialProps, mgr, ds);
+     }
+
+    public static SessionManager setupSessionManager(Configuration conf, Properties initialConfig, TransactionManager mgr, DataSource ds){
+        // Don't want to pollute original properties
+        Properties properties = new Properties();
+        if (initialConfig != null) {
+            for (Object prop : initialConfig.keySet()) {
+                properties.put(prop, initialConfig.get(prop));
+            }
+        }
+
+        // Note that we don't allow the following properties to be overriden by
+        // the client.
+        /*
+        if (properties.containsKey(Environment.CONNECTION_PROVIDER))
+            __log.warn("Ignoring user-specified Hibernate property: " + Environment.CONNECTION_PROVIDER);
+        if (properties.containsKey(Environment.TRANSACTION_MANAGER_STRATEGY))
+            __log.warn("Ignoring user-specified Hibernate property: " + Environment.TRANSACTION_MANAGER_STRATEGY);
+        if (properties.containsKey(Environment.SESSION_FACTORY_NAME))
+            __log.warn("Ignoring user-specified Hibernate property: " + Environment.SESSION_FACTORY_NAME);
+        */
+        if (ds!=null){
+          properties.put(Environment.CONNECTION_PROVIDER, DataSourceConnectionProvider.class.getName());
+          // Guess Hibernate dialect if not specified in hibernate.properties
+          if (properties.get(Environment.DIALECT) == null) {
+              try {
+                  properties.put(Environment.DIALECT, guessDialect(ds));
+              } catch (Exception ex) {
+                  String errmsg = "Unable to detect Hibernate dialect!";
+
+                  if (__log.isDebugEnabled())
+                      __log.debug(errmsg, ex);
+
+                  __log.error(errmsg);
+              }
+          }
+        }
+
+        if (mgr!=null){
+          properties.put(Environment.TRANSACTION_MANAGER_STRATEGY, HibernateTransactionManagerLookup.class.getName());
+           /*
+          * Since Hibernate 3.2.6, Hibernate JTATransaction requires User Transaction bound on JNDI. Let's work around
+          * by implementing Hibernate JTATransactionFactory that hooks up to the JTATransactionManager(ODE uses geronimo
+          * by default).
+          */
+          //properties.put(Environment.TRANSACTION_STRATEGY, "org.hibernate.transaction.JTATransactionFactory");
+          properties.put(Environment.TRANSACTION_STRATEGY, "org.apache.ode.dao.hib.JotmTransactionFactory");
+          properties.put(Environment.CURRENT_SESSION_CONTEXT_CLASS, "jta");
+        }else{
+          properties.put(Environment.TRANSACTION_STRATEGY,"org.hibernate.transaction.JDBCTransactionFactory");
+          properties.put(Environment.CURRENT_SESSION_CONTEXT_CLASS,"thread");
+        }
+
+        // Isolation levels override
+        if (System.getProperty("ode.connection.isolation") != null) {
+            String level = System.getProperty("ode.connection.isolation", "2");
+            properties.put(Environment.ISOLATION, level);
+        }
+
+        if (Boolean.valueOf(initialConfig.getProperty(OdeConfigProperties.PROP_DB_EMBEDDED_CREATE, "true"))) {
+            properties.put(Environment.HBM2DDL_AUTO, "create-drop");
+        }
+
+        if (__log.isDebugEnabled()) {
+            Enumeration names = properties.propertyNames();
+            __log.debug("Properties passed to Hibernate:");
+            while (names.hasMoreElements()) {
+                String name = (String) names.nextElement();
+                __log.debug(name + "=" + properties.getProperty(name));
+            }
+        }
+
+        return new SessionManager(conf, properties, ds,mgr);
+
+    }
+
+    private static final String DEFAULT_HIBERNATE_DIALECT = "org.hibernate.dialect.DerbyDialect";
+
+    public void shutdown() {
+        // Not too much to do for hibernate.
+    }
+
+    public static String guessDialect(DataSource dataSource) throws Exception {
+
+        String dialect = null;
+        // Open a connection and use that connection to figure out database
+        // product name/version number in order to decide which Hibernate
+        // dialect to use.
+        Connection conn = dataSource.getConnection();
+        try {
+            Dialect d = DialectFactory.buildDialect(new Properties(), conn);
+            dialect=d.getClass().getName();
+        } finally {
+            conn.close();
+        }
+
+        if (dialect == null) {
+            __log.info("Cannot determine hibernate dialect for this database: using the default one.");
+            dialect = DEFAULT_HIBERNATE_DIALECT;
+        }
+
+        return dialect;
+
+    }
+
+    public void setDataSource(DataSource ds) {
+        _ds = ds;
+    }
+
+    public DataSource getDataSource() {
+        return _ds;
+    }
+
+    public void setTransactionManager(Object tm) {
+        _tm = (TransactionManager) tm;
+    }
+
+    public void setUnmanagedDataSource(DataSource ds) {
+        // Hibernate doesn't use this.
+    }
+
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionImpl.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/BpelDAOConnectionImpl.java Fri May 21 15:40:59 2010
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.BpelEventFilter;
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.bpel.common.ProcessState;
+import org.apache.ode.dao.bpel.BpelDAOConnection;
+import org.apache.ode.dao.bpel.CorrelationSetDAO;
+import org.apache.ode.dao.bpel.FilteredInstanceDeletable;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.dao.bpel.ProcessDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+import org.apache.ode.dao.bpel.ProcessManagementDAO;
+import org.apache.ode.dao.bpel.ScopeDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HBpelEvent;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelationSet;
+import org.apache.ode.dao.hib.bpel.hobj.HMessageExchange;
+import org.apache.ode.dao.hib.bpel.hobj.HProcess;
+import org.apache.ode.dao.hib.bpel.hobj.HProcessInstance;
+import org.apache.ode.dao.hib.bpel.hobj.HScope;
+import org.apache.ode.dao.hib.bpel.ql.HibernateInstancesQueryCompiler;
+import org.apache.ode.bpel.evt.BpelEvent;
+import org.apache.ode.bpel.evt.ScopeEvent;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
+import org.apache.ode.ql.eval.skel.CommandEvaluator;
+import org.apache.ode.ql.tree.Builder;
+import org.apache.ode.ql.tree.BuilderFactory;
+import org.apache.ode.ql.tree.nodes.Query;
+import org.apache.ode.utils.SerializableUtils;
+import org.apache.ode.utils.stl.CollectionsX;
+import org.apache.ode.utils.stl.UnaryFunctionEx;
+import org.hibernate.Criteria;
+import org.hibernate.FetchMode;
+import org.hibernate.HibernateException;
+import org.hibernate.Session;
+import org.hibernate.criterion.DetachedCriteria;
+import org.hibernate.criterion.Expression;
+import org.hibernate.criterion.Projections;
+
+/**
+ * Hibernate-based {@link BpelDAOConnection} implementation.
+ */
+public class BpelDAOConnectionImpl implements BpelDAOConnection, FilteredInstanceDeletable {
+    private static final Log __log = LogFactory.getLog(BpelDAOConnectionImpl.class);
+
+    protected SessionManager _sm;
+
+    public BpelDAOConnectionImpl(SessionManager sm) {
+        _sm = sm;
+    }
+
+    protected Session getSession(){
+    	return _sm.getSession();
+    }
+
+    public MessageExchangeDAO createMessageExchange(char dir) {
+        HMessageExchange mex = new HMessageExchange();
+        mex.setDirection(dir);
+        mex.setInsertTime(new Date(System.currentTimeMillis()));
+        getSession().save(mex);
+        return new MessageExchangeDaoImpl(_sm, mex);
+    }
+
+    public MessageExchangeDAO getMessageExchange(String mexid) {
+        HMessageExchange mex = (HMessageExchange) getSession().get(HMessageExchange.class, new Long(mexid));
+        return mex == null ? null : new MessageExchangeDaoImpl(_sm, mex);
+    }
+
+    public ProcessDAO createProcess(QName pid, QName type, String guid, long version) {
+        HProcess process = new HProcess();
+        process.setProcessId(pid.toString());
+        process.setTypeName(type.getLocalPart());
+        process.setTypeNamespace(type.getNamespaceURI());
+        process.setDeployDate(new Date());
+        process.setGuid(guid);
+        process.setVersion(version);
+        getSession().save(process);
+        return new ProcessDaoImpl(_sm, process);
+    }
+
+    public ProcessDAO createTransientProcess(Serializable id) {
+        HProcess process = new HProcess();
+        process.setId((Long)id);
+
+        return new ProcessDaoImpl(_sm, process);
+    }
+    
+    public ProcessDAO getProcess(QName processId) {
+        try {
+            Criteria criteria = getSession().createCriteria(HProcess.class);
+            criteria.add(Expression.eq("processId", processId.toString()));
+            // For the moment we are expecting only one result.
+            HProcess hprocess = (HProcess) criteria.uniqueResult();
+            return hprocess == null ? null : new ProcessDaoImpl(_sm, hprocess);
+        } catch (HibernateException e) {
+            __log.error("DbError", e);
+            throw e;
+        }
+
+    }
+
+    public void close() {
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.ProcessDAO#getInstance(java.lang.Long)
+     */
+    public ProcessInstanceDAO getInstance(Long instanceId) {
+        return _getInstance(_sm, getSession(), instanceId);
+    }
+
+    public int getNumInstances(QName processId) {
+        ProcessDAO process = getProcess(processId);
+        if (process != null)
+            return process.getNumInstances();
+        else return -1;
+    }
+
+    public ScopeDAO getScope(Long siidl) {
+        return _getScope(_sm, getSession(), siidl);
+    }
+
+    public Collection<ProcessInstanceDAO> instanceQuery(InstanceFilter criteria) {
+        if (criteria.getLimit() == 0) {
+            return Collections.emptyList();
+        }
+        List<ProcessInstanceDAO> daos = new ArrayList<ProcessInstanceDAO>();
+
+        Iterator<HProcessInstance> iter = _instanceQuery(getSession(), false, criteria);
+        while (iter.hasNext()) {
+            daos.add(new ProcessInstanceDaoImpl(_sm, iter.next()));
+        }
+
+        return daos;
+    }
+
+    public int deleteInstances(InstanceFilter criteria, Set<CLEANUP_CATEGORY> categories) {
+        if (criteria.getLimit() == 0) {
+            return 0;
+        }
+
+        List<HProcessInstance> instances = _instanceQueryForList(getSession(), false, criteria);
+        if( __log.isDebugEnabled() ) __log.debug("Collected " + instances.size() + " instances to delete.");
+        
+        if( !instances.isEmpty() ) {
+            ProcessDaoImpl process = (ProcessDaoImpl)createTransientProcess(instances.get(0).getProcessId());
+            return process.deleteInstances(instances, categories);
+        }
+        
+        return 0;
+    }
+
+    static Iterator<HProcessInstance> _instanceQuery(Session session, boolean countOnly, InstanceFilter filter) {
+        return _instanceQueryForList(session, countOnly, filter).iterator();
+    }
+
+    @SuppressWarnings("unchecked")
+    private static List<HProcessInstance> _instanceQueryForList(Session session, boolean countOnly, InstanceFilter filter) {
+        CriteriaBuilder cb = new CriteriaBuilder();
+        
+        return cb.buildHQLQuery(session, filter).list();
+    }
+
+    static ProcessInstanceDAO _getInstance(SessionManager sm, Session session, Long iid) {
+        HProcessInstance instance = (HProcessInstance) session.get(HProcessInstance.class, iid);
+        return instance != null ? new ProcessInstanceDaoImpl(sm, instance) : null;
+    }
+
+    static ScopeDAO _getScope(SessionManager sm, Session session, Long siid) {
+        HScope scope = (HScope) session.get(HScope.class, siid);
+        return scope != null ? new ScopeDaoImpl(sm, scope) : null;
+    }
+
+    public void insertBpelEvent(BpelEvent event, ProcessDAO process, ProcessInstanceDAO instance) {
+        _insertBpelEvent(_sm.getSession(), event, process, instance);
+    }
+
+    /**
+     * Helper method for inserting bpel events into the database.
+     *
+     * @param sess
+     * @param event
+     * @param process
+     * @param instance
+     */
+    static void _insertBpelEvent(Session sess, BpelEvent event, ProcessDAO process, ProcessInstanceDAO instance) {
+        HBpelEvent hevent = new HBpelEvent();
+        hevent.setTstamp(new Timestamp(System.currentTimeMillis()));
+        hevent.setType(BpelEvent.eventName(event));
+        hevent.setDetail(event.toString());
+        if (process != null)
+            hevent.setProcess((HProcess) ((ProcessDaoImpl) process).getHibernateObj());
+        if (instance != null)
+            hevent.setInstance((HProcessInstance) ((ProcessInstanceDaoImpl) instance).getHibernateObj());
+        if (event instanceof ScopeEvent)
+            hevent.setScopeId(((ScopeEvent) event).getScopeId());
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(bos);
+            oos.writeObject(event);
+            oos.flush();
+            hevent.setData(bos.toByteArray());
+        } catch (Throwable ex) {
+            // this is really unexpected.
+            __log.fatal("InternalError: BpelEvent serialization failed.", ex);
+        }
+        sess.save(hevent);
+    }
+
+    @SuppressWarnings( { "unchecked", "deprecation" })
+    public List<Date> bpelEventTimelineQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
+        CriteriaBuilder cb = new CriteriaBuilder();
+        Criteria crit = getSession().createCriteria(HBpelEvent.class);
+        if (ifilter != null)
+            cb.buildCriteria(crit, efilter);
+        if (ifilter != null)
+            cb.buildCriteria(crit.createCriteria("instance"), ifilter);
+        crit.setFetchMode("tstamp", FetchMode.EAGER);
+        crit.setProjection(Projections.property("tstamp"));
+        return crit.list();
+    }
+
+    @SuppressWarnings("unchecked")
+    public List<BpelEvent> bpelEventQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
+        CriteriaBuilder cb = new CriteriaBuilder();
+        Criteria crit = getSession().createCriteria(HBpelEvent.class);
+        if (efilter != null)
+            cb.buildCriteria(crit, efilter);
+        if (ifilter != null)
+            cb.buildCriteria(crit.createCriteria("instance"), ifilter);
+        List<HBpelEvent> hevents = crit.list();
+        List<BpelEvent> ret = new ArrayList<BpelEvent>(hevents.size());
+        try {
+            CollectionsX.transformEx(ret, hevents, new UnaryFunctionEx<HBpelEvent, BpelEvent>() {
+                public BpelEvent apply(HBpelEvent x) throws Exception {
+                    return (BpelEvent) SerializableUtils.toObject(x.getData(), BpelEvent.class
+                            .getClassLoader());
+                }
+
+            });
+        } catch (Exception ex) {
+            __log.fatal("Internal error: unable to transform HBpelEvent", ex);
+            throw new RuntimeException(ex);
+        }
+        return ret;
+    }
+
+    /**
+     * @see org.apache.ode.bpel.dao.BpelDAOConnection#instanceQuery(String)
+     */
+    @SuppressWarnings("unchecked")
+    public Collection<ProcessInstanceDAO> instanceQuery(String expression) {
+        Builder<String> builder = BuilderFactory.getInstance().createBuilder();
+        final org.apache.ode.ql.tree.nodes.Node rootNode = builder.build(expression);
+
+        HibernateInstancesQueryCompiler compiler = new HibernateInstancesQueryCompiler();
+
+        CommandEvaluator<List, Session> eval = compiler.compile((Query) rootNode);
+        List<HProcessInstance> instancesList = (List<HProcessInstance>) eval.evaluate(getSession());
+
+        Collection<ProcessInstanceDAO> result = new ArrayList<ProcessInstanceDAO>(instancesList.size());
+        for (HProcessInstance instance : instancesList) {
+            result.add(getInstance(instance.getId()));
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Map<Long, Collection<CorrelationSetDAO>> getCorrelationSets(Collection<ProcessInstanceDAO> instances) {
+        if (instances.size() == 0) {
+            return new HashMap<Long, Collection<CorrelationSetDAO>>();
+        }
+        ArrayList<Long> iids = new ArrayList<Long>(instances.size());
+        int i=0;
+        for (ProcessInstanceDAO dao: instances) {
+            iids.add(dao.getInstanceId());
+            i++;
+        }
+        Collection<HCorrelationSet> csets = new ArrayList<HCorrelationSet>();
+        // some databases don't like long lists of values with IN operator
+        // so we select in batches.  Oracle 9i, for instance, doesn't support
+        // more than 1000 -- we opt to be conservative.
+        final int batchSize = 100;
+        int index = 0;
+        while (index < iids.size()) {
+            List<Long> subList = iids.subList(index, Math.min(index+batchSize, iids.size()));
+            csets.addAll(getSession().getNamedQuery(HCorrelationSet.SELECT_CORSETS_BY_INSTANCES).setParameterList("instances", subList).list());
+            index += batchSize;
+        }
+        Map<Long, Collection<CorrelationSetDAO>> map = new HashMap<Long, Collection<CorrelationSetDAO>>();
+        for (HCorrelationSet cset: csets) {
+            Long id = cset.getInstance().getId();
+            Collection<CorrelationSetDAO> existing = map.get(id);
+            if (existing == null) {
+                existing = new ArrayList<CorrelationSetDAO>();
+                map.put(id, existing);
+            }
+            existing.add(new CorrelationSetDaoImpl(_sm, cset));
+        }
+        return map;
+    }
+
+    @SuppressWarnings("unchecked")
+	public Collection<CorrelationSetDAO> getActiveCorrelationSets() {
+        ArrayList<CorrelationSetDAO> csetDaos = new ArrayList<CorrelationSetDAO>();
+        Collection<HCorrelationSet> csets = getSession().getNamedQuery(HCorrelationSet.SELECT_CORSETS_BY_PROCESS_STATES).setParameter("states", ProcessState.STATE_ACTIVE).list();
+        for (HCorrelationSet cset : csets)
+            csetDaos.add(new CorrelationSetDaoImpl(_sm, cset));
+        return csetDaos;
+    }
+
+
+    public ProcessManagementDAO getProcessManagement() {
+        return new ProcessManagementDaoImpl(_sm);
+    }
+    
+	public boolean isClosed() {
+		return _sm.isClosed();
+	}
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelationSetDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelationSetDaoImpl.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelationSetDaoImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelationSetDaoImpl.java Fri May 21 15:40:59 2010
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import org.apache.ode.dao.bpel.CorrelationSetDAO;
+import org.apache.ode.dao.bpel.ScopeDAO;
+import org.apache.ode.dao.bpel.ProcessDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelationProperty;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelationSet;
+import org.apache.ode.bpel.common.CorrelationKey;
+
+import javax.xml.namespace.QName;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Hibernate-based {@link CorrelationSetDAO} implementation.
+ */
+class CorrelationSetDaoImpl extends HibernateDao
+    implements CorrelationSetDAO {
+
+  private HCorrelationSet _correlationSet;
+
+  public CorrelationSetDaoImpl(SessionManager sessionManager, HCorrelationSet correlationSet) {
+    super(sessionManager, correlationSet);
+      entering("CorrelationSetDaoImpl.CorrelationSetDaoImpl");
+    _correlationSet = correlationSet;
+  }
+
+  public Long getCorrelationSetId() {
+    return _correlationSet.getId();
+  }
+
+  public String getName() {
+    return _correlationSet.getName();
+  }
+
+  public ScopeDAO getScope() {
+      entering("CorrelationSetDaoImpl.getScope");
+    return new ScopeDaoImpl(_sm, _correlationSet.getScope());
+  }
+
+    public void setValue(QName[] names, CorrelationKey values) {
+        entering("CorrelationSetDaoImpl.setValue");
+        _correlationSet.setValue(values.toCanonicalString());
+        if (names != null) {
+            if (_correlationSet.getProperties() == null || _correlationSet.getProperties().size() == 0) {
+                for (int m = 0; m < names.length; m++) {
+                    HCorrelationProperty prop =
+                            new HCorrelationProperty(names[m], values.getValues()[m], _correlationSet);
+                    getSession().save(prop);
+                }
+            } else {
+                for (int m = 0; m < names.length; m++) {
+                    HCorrelationProperty prop = getProperty(names[m]);
+                    if (prop == null) prop = new HCorrelationProperty(names[m], values.getValues()[m], _correlationSet);
+                    else prop.setValue(values.getValues()[m]);
+                    getSession().save(prop);
+                }
+            }
+        }
+        getSession().update(_correlationSet);
+    }
+
+  public CorrelationKey getValue() {
+      entering("CorrelationSetDaoImpl.getValue");
+    if (_correlationSet.getValue() != null) return new CorrelationKey(_correlationSet.getValue());
+    else return null;
+  }
+
+  public Map<QName, String> getProperties() {
+      entering("CorrelationSetDaoImpl.getProperties");
+    HashMap<QName, String> result = new HashMap<QName, String>();
+    for (HCorrelationProperty property : _correlationSet.getProperties()) {
+      result.put(property.getQName(), property.getValue());
+    }
+    return result;
+  }
+
+    public ProcessDAO getProcess() {
+        return new ProcessDaoImpl(_sm, _correlationSet.getProcess());
+    }
+
+    public ProcessInstanceDAO getInstance() {
+        return new ProcessInstanceDaoImpl(_sm, _correlationSet.getInstance());
+    }
+
+    private HCorrelationProperty getProperty(QName qName) {
+      entering("CorrelationSetDaoImpl.getProperty");
+    for (HCorrelationProperty property : _correlationSet.getProperties()) {
+      if (qName.getLocalPart().equals(property.getName())
+              && qName.getNamespaceURI().equals(property.getNamespace()))
+        return property;
+    }
+    return null;
+  }
+
+}

Added: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorDaoImpl.java?rev=947046&view=auto
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorDaoImpl.java (added)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/dao/hib/bpel/CorrelatorDaoImpl.java Fri May 21 15:40:59 2010
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.dao.hib.bpel;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.dao.bpel.CorrelatorDAO;
+import org.apache.ode.dao.bpel.CorrelatorMessageDAO;
+import org.apache.ode.dao.bpel.MessageExchangeDAO;
+import org.apache.ode.dao.bpel.MessageRouteDAO;
+import org.apache.ode.dao.bpel.ProcessInstanceDAO;
+import org.apache.ode.dao.hib.SessionManager;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelator;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelatorMessage;
+import org.apache.ode.dao.hib.bpel.hobj.HCorrelatorSelector;
+import org.apache.ode.dao.hib.bpel.hobj.HMessageExchange;
+import org.apache.ode.dao.hib.bpel.hobj.HProcess;
+import org.apache.ode.dao.hib.bpel.hobj.HProcessInstance;
+import org.hibernate.Hibernate;
+import org.hibernate.LockMode;
+import org.hibernate.Query;
+import org.hibernate.Session;
+
+import javax.xml.namespace.QName;
+
+/**
+ * Hibernate-based {@link CorrelatorDAO} implementation.
+ */
+class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
+    static Log __log = LogFactory.getLog(CorrelatorDaoImpl.class);
+
+    /** filter for finding a matching selector. */
+    private static final String LOCK_SELECTORS = "update from HCorrelatorSelector as hs set hs.lock = hs.lock+1 where hs.processType = :processType";
+    private static final String CHECK_SELECTORS = "from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlator.correlatorId = :correlatorId";
+    private static final String FLTR_SELECTORS = "from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlator.correlatorId = :correlatorId";
+    private static final String FLTR_SELECTORS_SUBQUERY = ("from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlatorId = " +
+            "(select hc.id from HCorrelator as hc where hc.correlatorId = :correlatorId )").intern();
+
+    /** Query for removing routes. */
+    private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where groupId = ? and instance = ?";
+
+    private HCorrelator _hobj;
+
+    public CorrelatorDaoImpl(SessionManager sm, HCorrelator hobj) {
+        super(sm, hobj);
+        entering("CorrelatorDaoImpl.CorrelatorDaoImpl");
+        _hobj = hobj;
+    }
+
+    @SuppressWarnings("unchecked")
+    public MessageExchangeDAO dequeueMessage(CorrelationKeySet keySet) {
+        entering("CorrelatorDaoImpl.dequeueMessage");
+
+        MessageExchangeDAO mex = null;
+
+        String hdr = "dequeueMessage(" + keySet + "): ";
+        __log.debug(hdr);
+
+        List<CorrelationKeySet> subSets = keySet.findSubSets();
+        Query qry = getSession().createFilter(_hobj.getMessageCorrelations(),
+                generateUnmatchedQuery(subSets));
+        for( int i = 0; i < subSets.size(); i++ ) {
+            qry.setString("s" + i, subSets.get(i).toCanonicalString());
+        }
+
+        // We really should consider the possibility of multiple messages matching a criteria.
+        // When the message is handled, its not too convenient to attempt to determine if the
+        // received message conflicts with one already received.
+        Iterator mcors = qry.iterate();
+        try {
+            if (!mcors.hasNext()) {
+                if (__log.isDebugEnabled())
+                    __log.debug(hdr + "did not find a MESSAGE entry.");
+            } else {
+                HCorrelatorMessage mcor = (HCorrelatorMessage) mcors.next();
+                if (__log.isDebugEnabled())
+                    __log.debug(hdr + "found MESSAGE entry " + mcor.getMessageExchange());
+                mex = new MessageExchangeDaoImpl(_sm, mcor.getMessageExchange());
+            }
+        } finally {
+            Hibernate.close(mcors);
+        }
+
+        return mex;
+    }
+
+    @SuppressWarnings("unchecked")
+    public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) {
+        List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+
+        entering("CorrelatorDaoImpl.findRoute");
+        String hdr = "findRoute(keySet=" + keySet + "): ";
+        if (__log.isDebugEnabled()) __log.debug(hdr);
+
+        String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString();
+        List<CorrelationKeySet> subSets = keySet.findSubSets();
+
+        Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
+        q.setString("processType", processType);
+        q.setString("correlatorId", _hobj.getCorrelatorId());
+
+        for( int i = 0; i < subSets.size(); i++ ) {
+            q.setString("s" + i, subSets.get(i).toCanonicalString());
+        }
+        // Make sure we obtain a lock for the selector we want to find.
+        q.setLockMode("hs", LockMode.UPGRADE);
+
+        List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
+        for (HCorrelatorSelector selector : (List<HCorrelatorSelector>)q.list()) {
+            if (selector != null) {
+                boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute());
+                if ("all".equals(selector.getRoute()) ||
+                        (isRoutePolicyOne && !targets.contains(selector.getInstance()))) {
+                    routes.add(new MessageRouteDaoImpl(_sm, selector));
+                    targets.add(selector.getInstance());
+                }
+            }
+        }
+
+        if(__log.isDebugEnabled()) __log.debug(hdr + "found " + routes);
+
+        // obtain a lock on the correlator to eliminate potential race condition.
+        if(__log.isDebugEnabled()) __log.debug("Obtain record lock on " + _hobj);
+        Query correlatorLockQuery = getSession().createQuery("from HCorrelator as hc where id = :id");
+        correlatorLockQuery.setLong("id", _hobj.getId());
+        correlatorLockQuery.setLockMode("hc", LockMode.UPGRADE);
+        correlatorLockQuery.list();
+
+        return routes;
+    }
+
+    private String generateUnmatchedQuery(List<CorrelationKeySet> subSets) {
+        StringBuffer filterQuery = new StringBuffer();
+
+        if( subSets.size() == 1 ) {
+            filterQuery.append(" where this.correlationKey = :s0");
+        } else if( subSets.size() > 1 ) {
+            filterQuery.append(" where this.correlationKey in(");
+            for( int i = 0; i < subSets.size(); i++ ) {
+                if( i > 0 ) {
+                    filterQuery.append(", ");
+                }
+                filterQuery.append(":s").append(i);
+            }
+            filterQuery.append(")");
+        }
+
+        return filterQuery.toString();
+    }
+
+    private String generateSelectorQuery(String header, List<CorrelationKeySet> subSets) {
+        StringBuffer filterQuery = new StringBuffer(header);
+
+        if( subSets.size() == 1 ) {
+            filterQuery.append(" and hs.correlationKey = :s0");
+        } else if( subSets.size() > 1 ) {
+            filterQuery.append(" and hs.correlationKey in(");
+            for( int i = 0; i < subSets.size(); i++ ) {
+                if( i > 0 ) {
+                    filterQuery.append(", ");
+                }
+                filterQuery.append(":s").append(i);
+            }
+            filterQuery.append(")");
+        }
+
+        return filterQuery.toString();
+    }
+
+    public void enqueueMessage(MessageExchangeDAO mex, CorrelationKeySet correlationKeySet) {
+        entering("CorrelatorDaoImpl.enqueueMessage");
+        String hdr = "enqueueMessage(mex=" + ((MessageExchangeDaoImpl) mex)._hobj.getId() + " keySet="
+                + correlationKeySet.toCanonicalString() + "): ";
+
+        if (__log.isDebugEnabled())
+            __log.debug(hdr);
+
+        for( CorrelationKeySet aSubSet : correlationKeySet.findSubSets() ) {
+            HCorrelatorMessage mcor = new HCorrelatorMessage();
+            mcor.setCorrelator(_hobj);
+            mcor.setCreated(new Date());
+            mcor.setMessageExchange((HMessageExchange) ((MessageExchangeDaoImpl) mex)._hobj);
+            mcor.setCorrelationKey(aSubSet.toCanonicalString());
+            getSession().save(mcor);
+
+            if (__log.isDebugEnabled())
+                __log.debug(hdr + "saved " + mcor);
+        }
+    }
+
+    public void addRoute(String routeGroupId, ProcessInstanceDAO target, int idx, CorrelationKeySet correlationKeySet, String routePolicy) {
+        entering("CorrelatorDaoImpl.addRoute");
+        String hdr = "addRoute(" + routeGroupId + ", iid=" + target.getInstanceId() + ", idx=" + idx + ", ckeySet="
+                + correlationKeySet + "): ";
+
+        if (__log.isDebugEnabled())
+            __log.debug(hdr);
+        HCorrelatorSelector hsel = new HCorrelatorSelector();
+        hsel.setGroupId(routeGroupId);
+        hsel.setIndex(idx);
+        hsel.setLock(0);
+        hsel.setCorrelationKey(correlationKeySet.toCanonicalString());
+        hsel.setInstance((HProcessInstance) ((ProcessInstanceDaoImpl) target).getHibernateObj());
+        hsel.setProcessType(target.getProcess().getType().toString());
+        hsel.setCorrelator(_hobj);
+        hsel.setCreated(new Date());
+        hsel.setRoute(routePolicy);
+        getSession().save(hsel);
+
+        if (__log.isDebugEnabled())
+            __log.debug(hdr + "saved " + hsel);
+    }
+
+    public boolean checkRoute(CorrelationKeySet correlationKeySet) {
+        entering("CorrelatorDaoImpl.checkRoute");
+        Query q = getSession().getNamedQuery(HCorrelatorSelector.SELECT_MESSAGE_ROUTE);
+        q.setEntity("corr",_hobj);
+        q.setString("ckey", correlationKeySet.toCanonicalString());
+        q.setReadOnly(true);
+        return q.list().isEmpty();
+    }
+
+    public String getCorrelatorId() {
+        return _hobj.getCorrelatorId();
+    }
+
+    public void setCorrelatorId(String newId) {
+        _hobj.setCorrelatorId(newId);
+    }
+
+    public void removeRoutes(String routeGroupId, ProcessInstanceDAO target) {
+        entering("CorrelatorDaoImpl.removeRoutes");
+        String hdr = "removeRoutes(" + routeGroupId + ", iid=" + target.getInstanceId() + "): ";
+        __log.debug(hdr);
+        Session session = getSession();
+        Query q = session.createQuery(QRY_DELSELECTORS);
+        q.setString(0, routeGroupId); // groupId
+        q.setEntity(1, ((ProcessInstanceDaoImpl) target).getHibernateObj()); // instance
+        int updates = q.executeUpdate();
+        session.flush(); // explicit flush to ensure route removed
+        if (__log.isDebugEnabled())
+            __log.debug(hdr + "deleted " + updates + " rows");
+    }
+
+    public Collection<CorrelatorMessageDAO> getAllMessages() {
+        Collection<CorrelatorMessageDAO> msgs = new ArrayList<CorrelatorMessageDAO>();
+        for (HCorrelatorMessage correlatorMessage : _hobj.getMessageCorrelations())
+            msgs.add(new CorrelatorMessageDaoImpl(_sm, correlatorMessage));
+        return msgs;
+    }
+
+    public Collection<MessageRouteDAO> getAllRoutes() {
+        Collection<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+        for (HCorrelatorSelector selector : _hobj.getSelectors())
+            routes.add(new MessageRouteDaoImpl(_sm, selector));
+        return routes;
+    }
+
+}
\ No newline at end of file