You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ofbiz.apache.org by jo...@apache.org on 2007/12/01 17:50:01 UTC

svn commit: r600164 - in /ofbiz/trunk/framework: entity/src/org/ofbiz/entity/jdbc/ entity/src/org/ofbiz/entity/transaction/ entity/src/org/ofbiz/entity/util/ geronimo/src/org/ofbiz/geronimo/ service/src/org/ofbiz/service/ service/src/org/ofbiz/service/...

Author: jonesde
Date: Sat Dec  1 08:50:00 2007
New Revision: 600164

URL: http://svn.apache.org/viewvc?rev=600164&view=rev
Log:
This is a fix for the lock wait timeout problem that was happening for large sets of orders and other high volume operations, and long-lived operations with lots of asynchronous service calls; this includes some diagnostic code to show the status of transactions in other threads when a lock wait timeout error is detected; the main fix here is a big cleanup of the concurrency handling in the JobManager and JobPoller objects, including the use of sleep instead of wait for more strict and consistent sync locking, and the use of different objects for locking different parts of that class since some are meant to be used by various other threads (like the queueNow and next methods) and others are only for the poller thread, like the run method; this also includes certain other small cleanups and improvements, like using FastList/FastMap more

Modified:
    ofbiz/trunk/framework/entity/src/org/ofbiz/entity/jdbc/SQLProcessor.java
    ofbiz/trunk/framework/entity/src/org/ofbiz/entity/transaction/TransactionUtil.java
    ofbiz/trunk/framework/entity/src/org/ofbiz/entity/util/SequenceUtil.java
    ofbiz/trunk/framework/geronimo/src/org/ofbiz/geronimo/GeronimoTransactionFactory.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobInvoker.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
    ofbiz/trunk/framework/webtools/webapp/webtools/WEB-INF/actions/service/threads.bsh
    ofbiz/trunk/framework/webtools/webapp/webtools/service/threads.ftl

Modified: ofbiz/trunk/framework/entity/src/org/ofbiz/entity/jdbc/SQLProcessor.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/entity/src/org/ofbiz/entity/jdbc/SQLProcessor.java?rev=600164&r1=600163&r2=600164&view=diff
==============================================================================
--- ofbiz/trunk/framework/entity/src/org/ofbiz/entity/jdbc/SQLProcessor.java (original)
+++ ofbiz/trunk/framework/entity/src/org/ofbiz/entity/jdbc/SQLProcessor.java Sat Dec  1 08:50:00 2007
@@ -396,6 +396,7 @@
             // if (Debug.verboseOn()) Debug.logVerbose("[SQLProcessor.executeQuery] ps=" + _ps.toString(), module);
             _rs = _ps.executeQuery();
         } catch (SQLException sqle) {
+        	this.checkLockWaitInfo(sqle);
             throw new GenericDataSourceException("SQL Exception while executing the following:" + _sql, sqle);
         }
 
@@ -424,8 +425,10 @@
     public int executeUpdate() throws GenericDataSourceException {
         try {
             // if (Debug.verboseOn()) Debug.logVerbose("[SQLProcessor.executeUpdate] ps=" + _ps.toString(), module);
+        	//TransactionUtil.printAllThreadsTransactionBeginStacks();
             return _ps.executeUpdate();
         } catch (SQLException sqle) {
+        	this.checkLockWaitInfo(sqle);
             // don't display this here, may not be critical, allow handling further up... Debug.logError(sqle, "SQLProcessor.executeUpdate() : ERROR : ", module);
             throw new GenericDataSourceException("SQL Exception while executing the following:" + _sql, sqle);
         }
@@ -860,5 +863,18 @@
         if (fetchSize > -1) {
             stmt.setFetchSize(fetchSize);
         }
+    }
+    
+    private void checkLockWaitInfo(Exception sqle) {
+    	String eMsg = sqle.getMessage();
+    	
+    	// see if there is a lock wait timeout error, if so try to get and print more info about it
+    	//   the string for Derby is "A lock could not be obtained within the time requested"
+    	//   the string for MySQL is "Lock wait timeout exceeded; try restarting transaction"
+    	if (eMsg.indexOf("A lock could not be obtained within the time requested") >= 0 ||
+    			eMsg.indexOf("Lock wait timeout exceeded") >= 0) {
+    		Debug.logWarning(sqle, "Lock wait timeout error found in thread [" + Thread.currentThread().getId() + "]: (" + eMsg + ") when executing the SQL [" + _sql + "]", module);
+    		TransactionUtil.printAllThreadsTransactionBeginStacks();
+    	}
     }
 }

Modified: ofbiz/trunk/framework/entity/src/org/ofbiz/entity/transaction/TransactionUtil.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/entity/src/org/ofbiz/entity/transaction/TransactionUtil.java?rev=600164&r1=600163&r2=600164&view=diff
==============================================================================
--- ofbiz/trunk/framework/entity/src/org/ofbiz/entity/transaction/TransactionUtil.java (original)
+++ ofbiz/trunk/framework/entity/src/org/ofbiz/entity/transaction/TransactionUtil.java Sat Dec  1 08:50:00 2007
@@ -33,6 +33,9 @@
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
+import javolution.util.FastList;
+import javolution.util.FastMap;
+
 import org.apache.commons.collections.map.ListOrderedMap;
 import org.ofbiz.base.util.Debug;
 import org.ofbiz.base.util.UtilDateTime;
@@ -49,6 +52,16 @@
     public static Map<Xid, DebugXaResource> debugResMap = new HashMap<Xid, DebugXaResource>();
     public static boolean debugResources = true;
 
+    private static ThreadLocal<List<Transaction>> suspendedTxStack = new ThreadLocal<List<Transaction>>();
+    private static ThreadLocal<Exception> transactionBeginStack = new ThreadLocal<Exception>();
+    private static ThreadLocal<List<Exception>> transactionBeginStackSave = new ThreadLocal<List<Exception>>();
+    private static Map<Long, Exception> allThreadsTransactionBeginStack = FastMap.newInstance();
+    private static Map<Long, List<Exception>> allThreadsTransactionBeginStackSave = FastMap.newInstance();
+    private static ThreadLocal<RollbackOnlyCause> setRollbackOnlyCause = new ThreadLocal<RollbackOnlyCause>();
+    private static ThreadLocal<List<RollbackOnlyCause>> setRollbackOnlyCauseSave = new ThreadLocal<List<RollbackOnlyCause>>();
+    private static ThreadLocal<Timestamp> transactionStartStamp = new ThreadLocal<Timestamp>();
+    private static ThreadLocal<Timestamp> transactionLastNowStamp = new ThreadLocal<Timestamp>();
+
     /** Begins a transaction in the current thread IF transactions are available; only
      * tries if the current transaction status is ACTIVE, if not active it returns false.
      * If and on only if it begins a transaction it will return true. In other words, if
@@ -488,9 +501,8 @@
     }
 
     // =======================================
+    // SUSPENDED TRANSACTIONS
     // =======================================
-    private static ThreadLocal<List<Transaction>> suspendedTxStack = new ThreadLocal<List<Transaction>>();
-
     /** BE VERY CARFUL WHERE YOU CALL THIS!! */
     public static int cleanSuspendedTransactions() throws GenericTransactionException {
         Transaction trans = null;
@@ -541,19 +553,34 @@
     }
 
     // =======================================
+    // TRANSACTION BEGIN STACK
     // =======================================
-    private static ThreadLocal<Exception> transactionBeginStack = new ThreadLocal<Exception>();
-    private static ThreadLocal<List<Exception>> transactionBeginStackSave = new ThreadLocal<List<Exception>>();
-
     private static void pushTransactionBeginStackSave(Exception e) {
+    	// use the ThreadLocal one because it is more reliable than the all threads Map
         List<Exception> el = transactionBeginStackSave.get();
         if (el == null) {
-            el = new LinkedList<Exception>();
+            el = FastList.newInstance();
             transactionBeginStackSave.set(el);
         }
         el.add(0, e);
+        
+        Long curThreadId = Thread.currentThread().getId();
+        List<Exception> ctEl = allThreadsTransactionBeginStackSave.get(curThreadId);
+        if (ctEl == null) {
+        	ctEl = FastList.newInstance();
+        	allThreadsTransactionBeginStackSave.put(curThreadId, ctEl);
+        }
+        ctEl.add(0, e);
     }
     private static Exception popTransactionBeginStackSave() {
+    	// do the unofficial all threads Map one first, and don't do a real return
+        Long curThreadId = Thread.currentThread().getId();
+        List<Exception> ctEl = allThreadsTransactionBeginStackSave.get(curThreadId);
+        if (ctEl != null && ctEl.size() > 0) {
+            ctEl.remove(0);
+        }
+    	
+    	// then do the more reliable ThreadLocal one
         List<Exception> el = transactionBeginStackSave.get();
         if (el != null && el.size() > 0) {
             return el.remove(0);
@@ -561,7 +588,50 @@
             return null;
         }
     }
-
+    public static int getTransactionBeginStackSaveSize() {
+        List<Exception> el = transactionBeginStackSave.get();
+        if (el != null) {
+            return el.size();
+        } else {
+        	return 0;
+        }
+    }
+    public static List<Exception> getTransactionBeginStackSave() {
+        List<Exception> el = transactionBeginStackSave.get();
+        List<Exception> elClone = FastList.newInstance();
+        elClone.addAll(el);
+        return elClone;
+    }
+    public static Map<Long, List<Exception>> getAllThreadsTransactionBeginStackSave() {
+    	Map<Long, List<Exception>> attbssMap = allThreadsTransactionBeginStackSave;
+    	Map<Long, List<Exception>> attbssMapClone = FastMap.newInstance();
+    	attbssMapClone.putAll(attbssMap);
+        return attbssMapClone;
+    }
+    public static void printAllThreadsTransactionBeginStacks() {
+    	if (!Debug.infoOn()) {
+    		return;
+    	}
+    	
+		for (Map.Entry<Long, Exception> attbsMapEntry : allThreadsTransactionBeginStack.entrySet()) {
+			Long curThreadId = (Long) attbsMapEntry.getKey();
+			Exception transactionBeginStack = attbsMapEntry.getValue();
+    		List<Exception> txBeginStackList = allThreadsTransactionBeginStackSave.get(curThreadId);
+    		
+			Debug.logInfo(transactionBeginStack, "===================================================\n===================================================\n Current tx begin stack for thread [" + curThreadId + "]:", module);
+    		
+    		if (txBeginStackList != null && txBeginStackList.size() > 0) {
+        		int stackLevel = 0;
+        		for (Exception stack : txBeginStackList) {
+        			Debug.logInfo(stack, "===================================================\n===================================================\n Tx begin stack history for thread [" + curThreadId + "] history number [" + stackLevel + "]:", module);
+        			stackLevel++;
+        		}
+    		} else {
+    			Debug.logInfo("========================================== No tx begin stack history found for thread [" + curThreadId + "]", module);
+    		}
+		}
+    }
+    
     private static void setTransactionBeginStack() {
         Exception e = new Exception("Tx Stack Placeholder");
         setTransactionBeginStack(e);
@@ -574,8 +644,13 @@
             Debug.logWarning(e2, "WARNING: In setTransactionBeginStack a stack placeholder was already in place, here is the current location: ", module);
         }
         transactionBeginStack.set(newExc);
+        Long curThreadId = Thread.currentThread().getId();
+        allThreadsTransactionBeginStack.put(curThreadId, newExc);
     }
     private static Exception clearTransactionBeginStack() {
+        Long curThreadId = Thread.currentThread().getId();
+        allThreadsTransactionBeginStack.remove(curThreadId);
+        
         Exception e = transactionBeginStack.get();
         if (e == null) {
             Exception e2 = new Exception("Current Stack Trace");
@@ -596,6 +671,7 @@
     }
 
     // =======================================
+    // ROLLBACK ONLY CAUSE
     // =======================================
     private static class RollbackOnlyCause {
         protected String causeMessage;
@@ -610,9 +686,6 @@
         public boolean isEmpty() { return (UtilValidate.isEmpty(this.getCauseMessage()) && this.getCauseThrowable() == null); }
     }
     
-    private static ThreadLocal<RollbackOnlyCause> setRollbackOnlyCause = new ThreadLocal<RollbackOnlyCause>();
-    private static ThreadLocal<List<RollbackOnlyCause>> setRollbackOnlyCauseSave = new ThreadLocal<List<RollbackOnlyCause>>();
-
     private static void pushSetRollbackOnlyCauseSave(RollbackOnlyCause e) {
         List<RollbackOnlyCause> el = setRollbackOnlyCauseSave.get();
         if (el == null) {
@@ -667,6 +740,7 @@
     }
 
     // =======================================
+    // SUSPENDED TRANSACTIONS START TIMESTAMPS
     // =======================================
     
     /**
@@ -730,9 +804,6 @@
             transactionStartStamp.set(UtilDateTime.nowTimestamp());
         }
     }
-
-    private static ThreadLocal<Timestamp> transactionStartStamp = new ThreadLocal<Timestamp>();
-    private static ThreadLocal<Timestamp> transactionLastNowStamp = new ThreadLocal<Timestamp>();
 
     public static Timestamp getTransactionStartStamp() {
         Timestamp curStamp = transactionStartStamp.get();

Modified: ofbiz/trunk/framework/entity/src/org/ofbiz/entity/util/SequenceUtil.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/entity/src/org/ofbiz/entity/util/SequenceUtil.java?rev=600164&r1=600163&r2=600164&view=diff
==============================================================================
--- ofbiz/trunk/framework/entity/src/org/ofbiz/entity/util/SequenceUtil.java (original)
+++ ofbiz/trunk/framework/entity/src/org/ofbiz/entity/util/SequenceUtil.java Sat Dec  1 08:50:00 2007
@@ -185,7 +185,7 @@
                 if (Debug.verboseOn()) Debug.logVerbose("[SequenceUtil.SequenceBank.fillBank] Trying to get a bank of sequenced ids for " +
                         this.seqName + "; start of loop val1=" + val1 + ", val2=" + val2 + ", bankSize=" + bankSize, module);
                 
-                // not sure if this syncrhonized block is totally necessary, the method is syncrhonized but it does do a wait/sleep 
+                // not sure if this synchronized block is totally necessary, the method is synchronized but it does do a wait/sleep 
                 //outside of this block, and this is the really sensitive block, so making sure it is isolated; there is some overhead 
                 //to this, but very bad things can happen if we try to do too many of these at once for a single sequencer
                 synchronized (this) {

Modified: ofbiz/trunk/framework/geronimo/src/org/ofbiz/geronimo/GeronimoTransactionFactory.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/geronimo/src/org/ofbiz/geronimo/GeronimoTransactionFactory.java?rev=600164&r1=600163&r2=600164&view=diff
==============================================================================
--- ofbiz/trunk/framework/geronimo/src/org/ofbiz/geronimo/GeronimoTransactionFactory.java (original)
+++ ofbiz/trunk/framework/geronimo/src/org/ofbiz/geronimo/GeronimoTransactionFactory.java Sat Dec  1 08:50:00 2007
@@ -79,7 +79,7 @@
      */
     public UserTransaction getUserTransaction() {  
         return geronimoTransactionManager;
-    }                
+    }
     
     public String getTxMgrName() {
         return "geronimo";

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java?rev=600164&r1=600163&r2=600164&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java Sat Dec  1 08:50:00 2007
@@ -281,6 +281,7 @@
         // start the transaction
         boolean beganTrans = false;
         try {
+            //Debug.logInfo("=========================== " + modelService.name + " 1 tx status =" + TransactionUtil.getStatusString() + ", modelService.requireNewTransaction=" + modelService.requireNewTransaction + ", modelService.useTransaction=" + modelService.useTransaction + ", TransactionUtil.isTransactionInPlace()=" + TransactionUtil.isTransactionInPlace(), module);
             if (modelService.useTransaction) {
                 if (TransactionUtil.isTransactionInPlace()) {
                     // if a new transaction is needed, do it here; if not do nothing, just use current tx
@@ -482,7 +483,6 @@
                 if (isFailure) {
                     Debug.logWarning("Service Failure [" + modelService.name + "]: " + ServiceUtil.getErrorMessage(result), module);
                 }
-
             } catch (Throwable t) {
                 if (Debug.timingOn()) {
                     UtilTimer.closeTimer(localName + " / " + modelService.name, "Sync service failed...", module);

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobInvoker.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobInvoker.java?rev=600164&r1=600163&r2=600164&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobInvoker.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobInvoker.java Sat Dec  1 08:50:00 2007
@@ -132,6 +132,14 @@
             return 0;
         }
     }
+    
+    public Long getThreadId() {
+    	if (this.thread != null) {
+    		return this.thread.getId();
+    	} else {
+    		return null;
+    	}
+    }
 
     /**
      * Get the current running job's ID.

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=600164&r1=600163&r2=600164&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Sat Dec  1 08:50:00 2007
@@ -104,9 +104,8 @@
         return this.delegator;
     }
 
-    public synchronized Iterator<Job> poll() {
+    public synchronized List<Job> poll() {
         List<Job> poll = FastList.newInstance();
-        Collection<GenericValue> jobEnt = null;
 
         // sort the results by time
         List<String> order = UtilMisc.toList("runTime");
@@ -135,67 +134,70 @@
         boolean pollDone = false;
 
         while (!pollDone) {
-            boolean beganTransaction = false;
+        	// an extra protection for synchronization, help make sure we don't get in here more than once
+        	synchronized (this) {
+                boolean beganTransaction = false;
 
-            try {
-                beganTransaction = TransactionUtil.begin();
-                if (!beganTransaction) {
-                    Debug.logError("Unable to poll for jobs; transaction was not started by this process", module);
-                    return null;
-                }
-                
-                List<Job> localPoll = FastList.newInstance();
-                
-                // first update the jobs w/ this instance running information
-                delegator.storeByCondition("JobSandbox", updateFields, mainCondition);
-
-                // now query all the 'queued' jobs for this instance
-                jobEnt = delegator.findByAnd("JobSandbox", updateFields, order);
-                //jobEnt = delegator.findByCondition("JobSandbox", mainCondition, null, order);
-
-                if (jobEnt != null && jobEnt.size() > 0) {
-                    for (GenericValue v: jobEnt) {
-                        DispatchContext dctx = getDispatcher().getDispatchContext();
-                        if (dctx == null) {
-                            Debug.logError("Unable to locate DispatchContext object; not running job!", module);
-                            continue;
-                        }
-                        Job job = new PersistedServiceJob(dctx, v, null); // TODO fix the requester
-                        try {
-                            job.queue();
-                            localPoll.add(job);
-                        } catch (InvalidJobException e) {
-                            Debug.logError(e, module);
+                try {
+                    beganTransaction = TransactionUtil.begin();
+                    if (!beganTransaction) {
+                        Debug.logError("Unable to poll for jobs; transaction was not started by this process", module);
+                        return null;
+                    }
+                    
+                    List<Job> localPoll = FastList.newInstance();
+                    
+                    // first update the jobs w/ this instance running information
+                    delegator.storeByCondition("JobSandbox", updateFields, mainCondition);
+
+                    // now query all the 'queued' jobs for this instance
+                    List<GenericValue> jobEnt = delegator.findByAnd("JobSandbox", updateFields, order);
+                    //jobEnt = delegator.findByCondition("JobSandbox", mainCondition, null, order);
+
+                    if (jobEnt != null && jobEnt.size() > 0) {
+                        for (GenericValue v: jobEnt) {
+                            DispatchContext dctx = getDispatcher().getDispatchContext();
+                            if (dctx == null) {
+                                Debug.logError("Unable to locate DispatchContext object; not running job!", module);
+                                continue;
+                            }
+                            Job job = new PersistedServiceJob(dctx, v, null); // TODO fix the requester
+                            try {
+                                job.queue();
+                                localPoll.add(job);
+                            } catch (InvalidJobException e) {
+                                Debug.logError(e, module);
+                            }
                         }
+                    } else {
+                        pollDone = true;
+                    }
+                    
+                    // nothing should go wrong at this point, so add to the general list
+                    poll.addAll(localPoll);
+                } catch (Throwable t) {
+                    // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
+                    String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
+                    Debug.logError(t, errMsg, module);
+                    try {
+                        // only rollback the transaction if we started one...
+                        TransactionUtil.rollback(beganTransaction, errMsg, t);
+                    } catch (GenericEntityException e2) {
+                        Debug.logError(e2, "[GenericDelegator] Could not rollback transaction: " + e2.toString(), module);
+                    }
+                } finally {
+                    try {
+                        // only commit the transaction if we started one... but make sure we try
+                        TransactionUtil.commit(beganTransaction);
+                    } catch (GenericTransactionException e) {
+                        String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString();
+                        // we don't really want to do anything different, so just log and move on
+                        Debug.logError(e, errMsg, module);
                     }
-                } else {
-                    pollDone = true;
-                }
-                
-                // nothing should go wrong at this point, so add to the general list
-                poll.addAll(localPoll);
-            } catch (Throwable t) {
-                // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
-                String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
-                Debug.logError(t, errMsg, module);
-                try {
-                    // only rollback the transaction if we started one...
-                    TransactionUtil.rollback(beganTransaction, errMsg, t);
-                } catch (GenericEntityException e2) {
-                    Debug.logError(e2, "[GenericDelegator] Could not rollback transaction: " + e2.toString(), module);
-                }
-            } finally {
-                try {
-                    // only commit the transaction if we started one... but make sure we try
-                    TransactionUtil.commit(beganTransaction);
-                } catch (GenericTransactionException e) {
-                    String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString();
-                    // we don't really want to do anything different, so just log and move on
-                    Debug.logError(e, errMsg, module);
                 }
-            }
+        	}
         }
-        return poll.iterator();
+        return poll;
     }
 
     public synchronized void reloadCrashedJobs() {

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=600164&r1=600163&r2=600164&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Sat Dec  1 08:50:00 2007
@@ -18,12 +18,14 @@
  *******************************************************************************/
 package org.ofbiz.service.job;
 
-import java.util.*;
+import java.util.List;
+import java.util.Map;
 
+import javolution.util.FastList;
 import javolution.util.FastMap;
 
-import org.ofbiz.service.config.ServiceConfigUtil;
 import org.ofbiz.base.util.Debug;
+import org.ofbiz.service.config.ServiceConfigUtil;
 
 /**
  * JobPoller - Polls for persisted jobs to run.
@@ -39,8 +41,8 @@
     //public static final long MAX_TTL = 18000000;
 
     protected Thread thread = null;
-    protected LinkedList<JobInvoker> pool = null;
-    protected LinkedList<Job> run = null;
+    protected List<JobInvoker> pool = null;
+    protected List<Job> run = null;
     protected JobManager jm = null;
 
     protected volatile boolean isRunning = false;
@@ -51,7 +53,7 @@
      */
     public JobPoller(JobManager jm, boolean enabled) {
         this.jm = jm;
-        this.run = new LinkedList<Job>();
+        this.run = FastList.newInstance();
 
         // create the thread pool
         this.pool = createThreadPool();
@@ -77,30 +79,28 @@
     protected JobPoller() {}
 
     public synchronized void run() {
-        if (Debug.infoOn()) Debug.logInfo("JobPoller: (" + thread.getName() + ") Thread Running...", module);
         try {
             // wait 30 seconds before the first poll
-            wait(30000);
+            java.lang.Thread.sleep(30000);
         } catch (InterruptedException e) {
         }
         while (isRunning) {
             try {
                 // grab a list of jobs to run.
-                Iterator poll = jm.poll();
+                List<Job> pollList = jm.poll();
 
-                while (poll.hasNext()) {
-                    Job job = (Job) poll.next();
-
-                    if (job.isValid())
+                for (Job job : pollList) {
+                    if (job.isValid()) {
                         queueNow(job);
+                    }
                 }
-                wait(pollWaitTime());
+                // NOTE: using sleep instead of wait for stricter locking
+                java.lang.Thread.sleep(pollWaitTime());
             } catch (InterruptedException e) {
                 Debug.logError(e, module);
                 stop();
             }
         }
-        if (Debug.infoOn()) Debug.logInfo("JobPoller: (" + thread.getName() + ") Thread ending...", module);
     }
 
     /**
@@ -119,10 +119,11 @@
     }
 
     public List<Map<String, Object>> getPoolState() {
-        List<Map<String, Object>> stateList = new ArrayList<Map<String, Object>>();
+        List<Map<String, Object>> stateList = FastList.newInstance();
         for (JobInvoker invoker: this.pool) {
             Map<String, Object> stateMap = FastMap.newInstance();
             stateMap.put("threadName", invoker.getName());
+            stateMap.put("threadId", invoker.getThreadId());
             stateMap.put("jobName", invoker.getJobName());
             stateMap.put("serviceName", invoker.getServiceName());
             stateMap.put("runTime", Long.valueOf(invoker.getCurrentRuntime()));
@@ -164,26 +165,38 @@
     /**
      * Returns the next job to run
      */
-    public synchronized Job next() {
-        if (run.size() > 0)
-            return run.removeFirst();
+    public Job next() {
+        if (run.size() > 0) {
+        	synchronized (run) {
+        		return run.remove(0);
+        	}
+        }
         return null;
     }
 
     /**
      * Adds a job to the RUN queue
      */
-    public synchronized void queueNow(Job job) {
-        run.add(job);
+    public void queueNow(Job job) {
+    	//Debug.logInfo("[" + Thread.currentThread().getId() + "] Begin queueNow; holds run lock? " + Thread.holdsLock(run), module);
+    	
+    	// NOTE DEJ20071201 MUST use a different object for the lock here because the "this" object is always held by the poller thread in the run method above (which sleeps and runs)
+    	synchronized (run) {
+            run.add(job);
+    	}
         if (Debug.verboseOn()) Debug.logVerbose("New run queue size: " + run.size(), module);
         if (run.size() > pool.size() && pool.size() < maxThreads()) {
-            int calcSize = (run.size() / jobsPerThread()) - (pool.size());
-            int addSize = calcSize > maxThreads() ? maxThreads() : calcSize;
-
-            for (int i = 0; i < addSize; i++) {
-                JobInvoker iv = new JobInvoker(this, invokerWaitTime());
-                pool.add(iv);
-            }
+	    	synchronized (pool) {
+	            if (run.size() > pool.size() && pool.size() < maxThreads()) {
+	                int calcSize = (run.size() / jobsPerThread()) - (pool.size());
+	                int addSize = calcSize > maxThreads() ? maxThreads() : calcSize;
+	
+	                for (int i = 0; i < addSize; i++) {
+	                    JobInvoker iv = new JobInvoker(this, invokerWaitTime());
+	                    pool.add(iv);
+	                }
+	            }
+	    	}
         }
     }
 
@@ -203,8 +216,8 @@
     }
 
     // Creates the invoker pool
-    private LinkedList<JobInvoker> createThreadPool() {
-        LinkedList<JobInvoker> threadPool = new LinkedList<JobInvoker>();
+    private List<JobInvoker> createThreadPool() {
+        List<JobInvoker> threadPool = FastList.newInstance();
 
         while (threadPool.size() < minThreads()) {
             JobInvoker iv = new JobInvoker(this, invokerWaitTime());

Modified: ofbiz/trunk/framework/webtools/webapp/webtools/WEB-INF/actions/service/threads.bsh
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/webtools/webapp/webtools/WEB-INF/actions/service/threads.bsh?rev=600164&r1=600163&r2=600164&view=diff
==============================================================================
--- ofbiz/trunk/framework/webtools/webapp/webtools/WEB-INF/actions/service/threads.bsh (original)
+++ ofbiz/trunk/framework/webtools/webapp/webtools/WEB-INF/actions/service/threads.bsh Sat Dec  1 08:50:00 2007
@@ -50,7 +50,7 @@
             case -1: status = uiLabelMap.get("WebtoolsStatusShuttingDown"); break;
             default: status = uiLabelMap.get("WebtoolsStatusInvalid"); break;
         }
-        threads.add(UtilMisc.toMap("serviceName", job.get("serviceName"), "threadName", job.get("threadName"), "jobName", job.get("jobName"), "status", status));
+        threads.add(UtilMisc.toMap("serviceName", job.get("serviceName"), "threadName", job.get("threadName"), "threadId", job.get("threadId"), "jobName", job.get("jobName"), "status", status));
     }
 }
 context.put("threads", threads);

Modified: ofbiz/trunk/framework/webtools/webapp/webtools/service/threads.ftl
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/webtools/webapp/webtools/service/threads.ftl?rev=600164&r1=600163&r2=600164&view=diff
==============================================================================
--- ofbiz/trunk/framework/webtools/webapp/webtools/service/threads.ftl (original)
+++ ofbiz/trunk/framework/webtools/webapp/webtools/service/threads.ftl Sat Dec  1 08:50:00 2007
@@ -18,7 +18,7 @@
 -->
 <#assign javaVer = Static["java.lang.System"].getProperty("java.vm.version")/>
 <#assign isJava5 = javaVer.startsWith("1.5")/>
-<#if parameters.maxElements?has_content><#assign maxElements = parameters.maxElements?number/><#else><#assign maxElements = 5/></#if>
+<#if parameters.maxElements?has_content><#assign maxElements = parameters.maxElements?number/><#else><#assign maxElements = 10/></#if>
 
 <div class="screenlet">
   <div class="screenlet-title-bar">
@@ -34,7 +34,7 @@
   </tr>
   <#list threads as thread>
   <tr>
-    <td>${thread.threadName?if_exists}</td>
+    <td>[${thread.threadId?if_exists}] ${thread.threadName?if_exists}</td>
     <td>${thread.status?if_exists}</td>
     <td>${thread.jobName?default("[${uiLabelMap.CommonNone}]")}</td>
     <td>${thread.serviceName?default("[${uiLabelMap.CommonNone}]")}</td>
@@ -65,20 +65,20 @@
     <#if javaThread?exists>
       <#if isJava5><#assign stackTraceArray = javaThread.getStackTrace()/></#if>
       <tr>
-        <td>${(javaThread.getThreadGroup().getName())?if_exists}</td>
-        <td><#if isJava5>${javaThread.getId()?string}</#if></td>
-        <td>
+        <td valign="top">${(javaThread.getThreadGroup().getName())?if_exists}</td>
+        <td valign="top"><#if isJava5>${javaThread.getId()?string}</#if></td>
+        <td valign="top">
           <b>${javaThread.getName()?if_exists}</b>
           <#if isJava5>
             <#list 1..maxElements as stackIdx>
               <#assign stackElement = stackTraceArray[stackIdx]?if_exists/>
-              <#if (stackElement.toString())?has_content>${stackElement.toString()}</#if>
+              <#if (stackElement.toString())?has_content><div>${stackElement.toString()}</div></#if>
             </#list>
           </#if>
         </td>
-        <td><#if isJava5>${javaThread.getState().name()?if_exists}</#if>&nbsp;</td>
-        <td>${javaThread.getPriority()}</td>
-        <td>${javaThread.isDaemon()?string}<#-- /${javaThread.isAlive()?string}/${javaThread.isInterrupted()?string} --></td>
+        <td valign="top"><#if isJava5>${javaThread.getState().name()?if_exists}</#if>&nbsp;</td>
+        <td valign="top">${javaThread.getPriority()}</td>
+        <td valign="top">${javaThread.isDaemon()?string}<#-- /${javaThread.isAlive()?string}/${javaThread.isInterrupted()?string} --></td>
       </tr>
     </#if>
   </#list>