You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2007/01/26 19:07:01 UTC
svn commit: r500293 - in
/incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz:
JobStoreJTA.java NotSoSimpleSemaphore.java QuartzSchedulerImpl.java
Author: mriou
Date: Fri Jan 26 10:07:00 2007
New Revision: 500293
URL: http://svn.apache.org/viewvc?view=rev&rev=500293
Log:
Fixing Quartz in-memory semaphore.
Added:
incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/NotSoSimpleSemaphore.java
Modified:
incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/JobStoreJTA.java
incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/QuartzSchedulerImpl.java
Modified: incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/JobStoreJTA.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/JobStoreJTA.java?view=diff&rev=500293&r1=500292&r2=500293
==============================================================================
--- incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/JobStoreJTA.java (original)
+++ incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/JobStoreJTA.java Fri Jan 26 10:07:00 2007
@@ -26,6 +26,7 @@
import org.quartz.core.SchedulingContext;
import org.quartz.impl.jdbcjobstore.Constants;
import org.quartz.impl.jdbcjobstore.JobStoreSupport;
+import org.quartz.impl.jdbcjobstore.Semaphore;
import org.quartz.spi.ClassLoadHelper;
import org.quartz.spi.JobStore;
import org.quartz.spi.SchedulerSignaler;
@@ -51,6 +52,9 @@
private TransactionManager _txm;
+ // Quartz in-mem semaphore has a bug, ours is identical but fixes it
+ private Semaphore _customerLockHandler = null;
+
protected boolean setTxIsolationLevelReadCommitted = true;
/** Thread-local for holding the transaction that was suspended if any */
@@ -76,8 +80,7 @@
public void initialize(ClassLoadHelper loadHelper, SchedulerSignaler signaler)
throws SchedulerConfigException {
- setUseDBLocks(true); // *must* use DB locks with CMT...
-
+ _customerLockHandler = new NotSoSimpleSemaphore();
super.initialize(loadHelper, signaler);
}
@@ -1378,4 +1381,8 @@
return false;
}
+
+ protected Semaphore getLockHandler() {
+ return _customerLockHandler;
+ }
}
Added: incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/NotSoSimpleSemaphore.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/NotSoSimpleSemaphore.java?view=auto&rev=500293
==============================================================================
--- incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/NotSoSimpleSemaphore.java (added)
+++ incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/NotSoSimpleSemaphore.java Fri Jan 26 10:07:00 2007
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2004-2005 OpenSymphony
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
+ * of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * Previously Copyright (c) 2001-2004 James House
+ */
+package org.apache.ode.bpel.scheduler.quartz;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.quartz.impl.jdbcjobstore.Semaphore;
+
+import java.sql.Connection;
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * An interface for providing thread/resource locking in order to protect
+ * resources from being altered by multiple threads at the same time.
+ *
+ * @author jhouse
+ */
+public class NotSoSimpleSemaphore implements Semaphore {
+
+ /*
+ * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ *
+ * Data members.
+ *
+ * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ */
+
+ ThreadLocal lockOwners = new ThreadLocal();
+
+ HashSet locks = new HashSet();
+
+ /*
+ * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ *
+ * Interface.
+ *
+ * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ */
+
+ Log getLog() {
+ return LogFactory.getLog(getClass());
+ //return LogFactory.getLog("LOCK:"+Thread.currentThread().getName());
+ }
+
+ private HashSet getThreadLocks() {
+ HashSet threadLocks = (HashSet) lockOwners.get();
+ if (threadLocks == null) {
+ threadLocks = new HashSet();
+ lockOwners.set(threadLocks);
+ }
+ return threadLocks;
+ }
+
+ /**
+ * Grants a lock on the identified resource to the calling thread (blocking
+ * until it is available).
+ *
+ * @return true if the lock was obtained.
+ */
+ public synchronized boolean obtainLock(Connection conn, String lockName) {
+
+ lockName = lockName.intern();
+
+ Log log = getLog();
+
+ if(log.isDebugEnabled())
+ log.debug(
+ "Lock '" + lockName + "' is desired by: "
+ + Thread.currentThread().getName());
+
+ if (!isLockOwner(conn, lockName)) {
+ if(log.isDebugEnabled())
+ log.debug(
+ "Lock '" + lockName + "' is being obtained: "
+ + Thread.currentThread().getName());
+ while (locks.contains(lockName)) {
+ try {
+ this.wait();
+ } catch (InterruptedException ie) {
+ if(log.isDebugEnabled())
+ log.debug(
+ "Lock '" + lockName + "' was not obtained by: "
+ + Thread.currentThread().getName());
+ }
+ }
+
+ if(log.isDebugEnabled())
+ log.debug(
+ "Lock '" + lockName + "' given to: "
+ + Thread.currentThread().getName());
+ getThreadLocks().add(lockName);
+ locks.add(lockName);
+ } else
+ if(log.isDebugEnabled())
+ log.debug(
+ "Lock '" + lockName + "' already owned by: "
+ + Thread.currentThread().getName()
+ + " -- but not owner!",
+ new Exception("stack-trace of wrongful returner"));
+
+ return true;
+ }
+
+ /**
+ * Release the lock on the identified resource if it is held by the calling
+ * thread.
+ */
+ public synchronized void releaseLock(Connection conn, String lockName) {
+
+ lockName = lockName.intern();
+
+ if (isLockOwner(conn, lockName)) {
+ if(getLog().isDebugEnabled())
+ getLog().debug(
+ "Lock '" + lockName + "' retuned by: "
+ + Thread.currentThread().getName());
+ getThreadLocks().remove(lockName);
+ locks.remove(lockName);
+ this.notifyAll();
+ } else
+ if(getLog().isDebugEnabled())
+ getLog().debug(
+ "Lock '" + lockName + "' attempt to retun by: "
+ + Thread.currentThread().getName()
+ + " -- but not owner!",
+ new Exception("stack-trace of wrongful returner"));
+ }
+
+ /**
+ * Determine whether the calling thread owns a lock on the identified
+ * resource.
+ */
+ public synchronized boolean isLockOwner(Connection conn, String lockName) {
+
+ lockName = lockName.intern();
+
+ return getThreadLocks().contains(lockName);
+ }
+
+ public void init(Connection conn, List listOfLocks) {
+ // nothing to do...
+ }
+
+}
Modified: incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/QuartzSchedulerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/QuartzSchedulerImpl.java?view=diff&rev=500293&r1=500292&r2=500293
==============================================================================
--- incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/QuartzSchedulerImpl.java (original)
+++ incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/QuartzSchedulerImpl.java Fri Jan 26 10:07:00 2007
@@ -19,34 +19,12 @@
package org.apache.ode.bpel.scheduler.quartz;
-import java.sql.Connection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-import javax.sql.DataSource;
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.TransactionManager;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.iapi.BpelServer;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.utils.GUID;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.SchedulerException;
-import org.quartz.SimpleTrigger;
-import org.quartz.Trigger;
+import org.quartz.*;
import org.quartz.core.QuartzScheduler;
import org.quartz.core.QuartzSchedulerResources;
import org.quartz.core.SchedulingContext;
@@ -57,6 +35,19 @@
import org.quartz.spi.ThreadPool;
import org.quartz.utils.DBConnectionManager;
+import javax.sql.DataSource;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.TransactionManager;
+import java.sql.Connection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
/**
* Quartz-based scheduler.
*
@@ -324,16 +315,6 @@
/**
* Create a QUARTZ scheduler using JTA Job Shell. Unfortunately there is no "easy" way to do this using the standard scheduler
* factory.
- *
- * @param schedulerName
- * @param schedulerInstanceId
- * @param threadPool
- * @param jobStore
- * @param rmiRegistryHost
- * @param rmiRegistryPort
- * @param idleWaitTime
- * @param dbFailureRetryInterval
- * @throws SchedulerException
*/
private org.quartz.Scheduler createScheduler(String schedulerName, String schedulerInstanceId, ThreadPool threadPool,
JobStoreJTA jobStore) throws SchedulerException {