You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2007/01/26 19:07:01 UTC

svn commit: r500293 - in /incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz: JobStoreJTA.java NotSoSimpleSemaphore.java QuartzSchedulerImpl.java

Author: mriou
Date: Fri Jan 26 10:07:00 2007
New Revision: 500293

URL: http://svn.apache.org/viewvc?view=rev&rev=500293
Log:
Fixing Quartz in-memory semaphore.

Added:
    incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/NotSoSimpleSemaphore.java
Modified:
    incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/JobStoreJTA.java
    incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/QuartzSchedulerImpl.java

Modified: incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/JobStoreJTA.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/JobStoreJTA.java?view=diff&rev=500293&r1=500292&r2=500293
==============================================================================
--- incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/JobStoreJTA.java (original)
+++ incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/JobStoreJTA.java Fri Jan 26 10:07:00 2007
@@ -26,6 +26,7 @@
 import org.quartz.core.SchedulingContext;
 import org.quartz.impl.jdbcjobstore.Constants;
 import org.quartz.impl.jdbcjobstore.JobStoreSupport;
+import org.quartz.impl.jdbcjobstore.Semaphore;
 import org.quartz.spi.ClassLoadHelper;
 import org.quartz.spi.JobStore;
 import org.quartz.spi.SchedulerSignaler;
@@ -51,6 +52,9 @@
 
     private TransactionManager _txm;
 
+    // Quartz in-mem semaphore has a bug, ours is identical but fixes it
+    private Semaphore _customerLockHandler = null;
+
     protected boolean setTxIsolationLevelReadCommitted = true;
 
     /** Thread-local for holding the transaction that was suspended if any */
@@ -76,8 +80,7 @@
     public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler)
             throws SchedulerConfigException {
 
-        setUseDBLocks(true); // *must* use DB locks with CMT...
-
+        _customerLockHandler = new NotSoSimpleSemaphore();
         super.initialize(loadHelper, signaler);
     }
 
@@ -1378,4 +1381,8 @@
         return false;
     }
 
+
+    protected Semaphore getLockHandler() {
+        return _customerLockHandler;
+    }
 }

Added: incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/NotSoSimpleSemaphore.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/NotSoSimpleSemaphore.java?view=auto&rev=500293
==============================================================================
--- incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/NotSoSimpleSemaphore.java (added)
+++ incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/NotSoSimpleSemaphore.java Fri Jan 26 10:07:00 2007
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2004-2005 OpenSymphony
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
+ * of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * Previously Copyright (c) 2001-2004 James House
+ */
+package org.apache.ode.bpel.scheduler.quartz;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.quartz.impl.jdbcjobstore.Semaphore;
+
+import java.sql.Connection;
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * An interface for providing thread/resource locking in order to protect
+ * resources from being altered by multiple threads at the same time.
+ *
+ * @author jhouse
+ */
+public class NotSoSimpleSemaphore implements Semaphore {
+
+    /*
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     *
+     * Data members.
+     *
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     */
+
+    ThreadLocal lockOwners = new ThreadLocal();
+
+    HashSet locks = new HashSet();
+
+    /*
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     *
+     * Interface.
+     *
+     * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+     */
+
+    Log getLog() {
+        return LogFactory.getLog(getClass());
+        //return LogFactory.getLog("LOCK:"+Thread.currentThread().getName());
+    }
+
+    private HashSet getThreadLocks() {
+        HashSet threadLocks = (HashSet) lockOwners.get();
+        if (threadLocks == null) {
+            threadLocks = new HashSet();
+            lockOwners.set(threadLocks);
+        }
+        return threadLocks;
+    }
+
+    /**
+     * Grants a lock on the identified resource to the calling thread (blocking
+     * until it is available).
+     *
+     * @return true if the lock was obtained.
+     */
+    public synchronized boolean obtainLock(Connection conn, String lockName) {
+
+        lockName = lockName.intern();
+
+        Log log = getLog();
+
+        if(log.isDebugEnabled())
+            log.debug(
+                "Lock '" + lockName + "' is desired by: "
+                        + Thread.currentThread().getName());
+
+        if (!isLockOwner(conn, lockName)) {
+            if(log.isDebugEnabled())
+                log.debug(
+                    "Lock '" + lockName + "' is being obtained: "
+                            + Thread.currentThread().getName());
+            while (locks.contains(lockName)) {
+                try {
+                    this.wait();
+                } catch (InterruptedException ie) {
+                    if(log.isDebugEnabled())
+                        log.debug(
+                            "Lock '" + lockName + "' was not obtained by: "
+                                    + Thread.currentThread().getName());
+                }
+            }
+
+            if(log.isDebugEnabled())
+                log.debug(
+                    "Lock '" + lockName + "' given to: "
+                            + Thread.currentThread().getName());
+            getThreadLocks().add(lockName);
+            locks.add(lockName);
+        } else
+            if(log.isDebugEnabled())
+                log.debug(
+                    "Lock '" + lockName + "' already owned by: "
+                            + Thread.currentThread().getName()
+                            + " -- but not owner!",
+                    new Exception("stack-trace of wrongful returner"));
+
+        return true;
+    }
+
+    /**
+     * Release the lock on the identified resource if it is held by the calling
+     * thread.
+     */
+    public synchronized void releaseLock(Connection conn, String lockName) {
+
+        lockName = lockName.intern();
+
+        if (isLockOwner(conn, lockName)) {
+            if(getLog().isDebugEnabled())
+                getLog().debug(
+                    "Lock '" + lockName + "' retuned by: "
+                            + Thread.currentThread().getName());
+            getThreadLocks().remove(lockName);
+            locks.remove(lockName);
+            this.notifyAll();
+        } else
+            if(getLog().isDebugEnabled())
+                getLog().debug(
+                    "Lock '" + lockName + "' attempt to retun by: "
+                            + Thread.currentThread().getName()
+                            + " -- but not owner!",
+                    new Exception("stack-trace of wrongful returner"));
+    }
+
+    /**
+     * Determine whether the calling thread owns a lock on the identified
+     * resource.
+     */
+    public synchronized boolean isLockOwner(Connection conn, String lockName) {
+
+        lockName = lockName.intern();
+
+        return getThreadLocks().contains(lockName);
+    }
+
+    public void init(Connection conn, List listOfLocks) {
+        // nothing to do...
+    }
+
+}

Modified: incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/QuartzSchedulerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/QuartzSchedulerImpl.java?view=diff&rev=500293&r1=500292&r2=500293
==============================================================================
--- incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/QuartzSchedulerImpl.java (original)
+++ incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/QuartzSchedulerImpl.java Fri Jan 26 10:07:00 2007
@@ -19,34 +19,12 @@
 
 package org.apache.ode.bpel.scheduler.quartz;
 
-import java.sql.Connection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-import javax.sql.DataSource;
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.TransactionManager;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.iapi.BpelServer;
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.utils.GUID;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.SchedulerException;
-import org.quartz.SimpleTrigger;
-import org.quartz.Trigger;
+import org.quartz.*;
 import org.quartz.core.QuartzScheduler;
 import org.quartz.core.QuartzSchedulerResources;
 import org.quartz.core.SchedulingContext;
@@ -57,6 +35,19 @@
 import org.quartz.spi.ThreadPool;
 import org.quartz.utils.DBConnectionManager;
 
+import javax.sql.DataSource;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.TransactionManager;
+import java.sql.Connection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
 /**
  * Quartz-based scheduler.
  * 
@@ -324,16 +315,6 @@
     /**
      * Create a QUARTZ scheduler using JTA Job Shell. Unfortunately there is no "easy" way to do this using the standard scheduler
      * factory.
-     * 
-     * @param schedulerName
-     * @param schedulerInstanceId
-     * @param threadPool
-     * @param jobStore
-     * @param rmiRegistryHost
-     * @param rmiRegistryPort
-     * @param idleWaitTime
-     * @param dbFailureRetryInterval
-     * @throws SchedulerException
      */
     private org.quartz.Scheduler createScheduler(String schedulerName, String schedulerInstanceId, ThreadPool threadPool,
             JobStoreJTA jobStore) throws SchedulerException {