You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/01/10 03:35:42 UTC

svn commit: r1057076 [2/2] - in /incubator/lcf/trunk: ./ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ framework/pull-agent/src/main/java/org/apache/m...

Added: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java?rev=1057076&view=auto
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java (added)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java Mon Jan 10 02:35:42 2011
@@ -0,0 +1,430 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import org.apache.manifoldcf.crawler.system.Logging;
+import java.util.*;
+import java.lang.reflect.*;
+
+/** This class represents a document cleanup thread.  This thread's job is to pull document sets to be cleaned up off of
+* a queue, and kill them.  It finishes a cleanup set by getting rid of the corresponding rows in the job queue, and from
+* carrydown, and from hopcount.
+*
+* There are very few decisions that this thread needs to make; essentially all the hard thought went into deciding
+* what documents to queue in the first place.
+*
+* The only caveat is that the ingestion API may not be accepting delete requests at the time that this thread wants it
+* to be able to accept them.  In that case, it's acceptable for the thread to block until the ingestion service is
+* functioning again.
+*
+* Transactions are not much needed for this class; it simply needs to not fail to remove the appropriate jobqueue
+* table rows at the end of the delete.
+*/
+public class DocumentCleanupThread extends Thread
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+
+  // Local data
+  protected String id;
+  // This is a reference to the static main document queue
+  protected DocumentCleanupQueue documentCleanupQueue;
+  /** Delete thread pool reset manager */
+  protected DocCleanupResetManager resetManager;
+  /** Queue tracker */
+  protected QueueTracker queueTracker;
+
+  /** Constructor.
+  *@param id is the worker thread id.
+  */
+  public DocumentCleanupThread(String id, DocumentCleanupQueue documentCleanupQueue, QueueTracker queueTracker, DocCleanupResetManager resetManager)
+    throws ManifoldCFException
+  {
+    super();
+    this.id = id;
+    this.documentCleanupQueue = documentCleanupQueue;
+    this.queueTracker = queueTracker;
+    this.resetManager = resetManager;
+    setName("Document cleanup thread '"+id+"'");
+    setDaemon(true);
+  }
+
+  public void run()
+  {
+    resetManager.registerMe();
+
+    try
+    {
+      // Create a thread context object.
+      IThreadContext threadContext = ThreadContextFactory.make();
+      IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
+      IJobManager jobManager = JobManagerFactory.make(threadContext);
+      IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
+
+      // Loop
+      while (true)
+      {
+        // Do another try/catch around everything in the loop
+        try
+        {
+          if (Thread.currentThread().isInterrupted())
+            throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+
+          // Before we begin, conditionally reset
+          resetManager.waitForReset(threadContext);
+
+          // Once we pull something off the queue, we MUST make sure that
+          // we update its status, even if there is an exception!!!
+
+          // See if there is anything on the queue for me
+          DocumentCleanupSet dds = documentCleanupQueue.getDocuments();
+          if (dds == null)
+            // It's a reset, so recycle
+            continue;
+
+          if (Thread.currentThread().isInterrupted())
+            throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+
+          try
+          {
+            long currentTime = System.currentTimeMillis();
+
+            // We need to segregate all the documents by connection, in order to be able to form a decent activities object
+            // to pass into the incremental ingester.  So, first pass through the document descriptions will build that.
+            Map mappedDocs = new HashMap();
+            int j = 0;
+            while (j < dds.getCount())
+            {
+              CleanupQueuedDocument dqd = dds.getDocument(j++);
+              DocumentDescription ddd = dqd.getDocumentDescription();
+              Long jobID = ddd.getJobID();
+              IJobDescription job = jobManager.load(jobID,true);
+              String connectionName = job.getConnectionName();
+              ArrayList list = (ArrayList)mappedDocs.get(connectionName);
+              if (list == null)
+              {
+                list = new ArrayList();
+                mappedDocs.put(connectionName,list);
+              }
+              list.add(dqd);
+            }
+
+            // Now, cycle through all represented connections.
+            // For each connection, construct the necessary pieces to do the deletion.
+            Iterator iter = mappedDocs.keySet().iterator();
+            while (iter.hasNext())
+            {
+              String connectionName = (String)iter.next();
+              ArrayList list = (ArrayList)mappedDocs.get(connectionName);
+
+              // Produce a map of connection name->connection object.  We will use this to perform a request for multiple connector objects
+              IRepositoryConnection connection = connMgr.load(connectionName);
+              ArrayList arrayOutputConnectionNames = new ArrayList();
+              ArrayList arrayDocHashes = new ArrayList();
+              ArrayList arrayDocClasses = new ArrayList();
+              ArrayList arrayDocsToDelete = new ArrayList();
+              ArrayList arrayRelationshipTypes = new ArrayList();
+              ArrayList hopcountMethods = new ArrayList();
+              ArrayList connections = new ArrayList();
+              j = 0;
+              while (j < list.size())
+              {
+                CleanupQueuedDocument dqd = (CleanupQueuedDocument)list.get(j);
+                DocumentDescription ddd = dqd.getDocumentDescription();
+
+                Long jobID = ddd.getJobID();
+                IJobDescription job = jobManager.load(jobID,true);
+                if (job != null && connection != null)
+                {
+                  // We'll need the legal link types; grab those before we proceed
+                  String[] legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
+                  if (legalLinkTypes != null)
+                  {
+                    arrayOutputConnectionNames.add(job.getOutputConnectionName());
+                    arrayDocClasses.add(connectionName);
+                    arrayDocHashes.add(ddd.getDocumentIdentifierHash());
+                    arrayDocsToDelete.add(dqd);
+                    arrayRelationshipTypes.add(legalLinkTypes);
+                    hopcountMethods.add(new Integer(job.getHopcountMode()));
+                  }
+                }
+                j++;
+              }
+
+              // Next, segregate the documents by output connection name.  This will permit logging to know what actual activity type to use.
+              HashMap outputMap = new HashMap();
+              j = 0;
+              while (j < arrayDocHashes.size())
+              {
+                String outputConnectionName = (String)arrayOutputConnectionNames.get(j);
+                ArrayList subList = (ArrayList)outputMap.get(outputConnectionName);
+                if (subList == null)
+                {
+                  subList = new ArrayList();
+                  outputMap.put(outputConnectionName,subList);
+                }
+                subList.add(new Integer(j));
+                j++;
+              }
+
+              // Grab one connection for each connectionName.  If we fail, nothing is lost and retries are possible.
+              try
+              {
+                IRepositoryConnector connector = RepositoryConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
+                try
+                {
+
+                  // Iterate over the outputs
+                  Iterator outputIterator = outputMap.keySet().iterator();
+                  while (outputIterator.hasNext())
+                  {
+                    String outputConnectionName = (String)outputIterator.next();
+                    ArrayList indexList = (ArrayList)outputMap.get(outputConnectionName);
+                    // Count the number of docs to actually delete.  This will be a subset of the documents in the list.
+                    int k = 0;
+                    int removeCount = 0;
+                    while (k < indexList.size())
+                    {
+                      int index = ((Integer)indexList.get(k++)).intValue();
+                      if (((CleanupQueuedDocument)arrayDocsToDelete.get(index)).shouldBeRemovedFromIndex())
+                        removeCount++;
+                    }
+                    
+                    // Allocate removal arrays
+                    String[] docClassesToRemove = new String[removeCount];
+                    String[] hashedDocsToRemove = new String[removeCount];
+
+                    // Now, iterate over the index list
+                    k = 0;
+                    removeCount = 0;
+                    while (k < indexList.size())
+                    {
+                      int index = ((Integer)indexList.get(k)).intValue();
+                      if (((CleanupQueuedDocument)arrayDocsToDelete.get(index)).shouldBeRemovedFromIndex())
+                      {
+                        docClassesToRemove[removeCount] = (String)arrayDocClasses.get(index);
+                        hashedDocsToRemove[removeCount] = (String)arrayDocHashes.get(index);
+                        removeCount++;
+                      }
+                      k++;
+                    }
+
+                    OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
+
+                    // Finally, go ahead and delete the documents from the ingestion system.
+
+                    while (true)
+                    {
+                      try
+                      {
+                        ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
+                        break;
+                      }
+                      catch (ServiceInterruption e)
+                      {
+                        // If we get a service interruption here, it means that the ingestion API is down.
+                        // There is no point, therefore, in freeing up this thread to go do something else;
+                        // might as well just wait here for our retries.
+                        // Wait for the prescribed time
+                        long amt = e.getRetryTime();
+                        long now = System.currentTimeMillis();
+                        long waittime = amt-now;
+                        if (waittime <= 0L)
+                          waittime = 300000L;
+                        ManifoldCF.sleep(waittime);
+                      }
+                    }
+
+                    // Successfully deleted some documents from ingestion system.  Now, remove them from job queue.  This
+                    // must currently happen one document at a time, because the jobs and connectors for each document
+                    // potentially differ.
+                    k = 0;
+                    while (k < indexList.size())
+                    {
+                      int index = ((Integer)indexList.get(k)).intValue();
+
+                      DeleteQueuedDocument dqd = (DeleteQueuedDocument)arrayDocsToDelete.get(index);
+                      DocumentDescription ddd = dqd.getDocumentDescription();
+                      Long jobID = ddd.getJobID();
+                      int hopcountMethod = ((Integer)hopcountMethods.get(index)).intValue();
+                      String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(index);
+                      DocumentDescription[] requeueCandidates = jobManager.markDocumentDeleted(jobID,legalLinkTypes,ddd,hopcountMethod);
+                      // Use the common method for doing the requeuing
+                      ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
+                        connector,connection,queueTracker,currentTime);
+                      // Finally, completed expiration of the document.
+                      dqd.setProcessed();
+                      k++;
+                    }
+                  }
+                }
+                finally
+                {
+                  // Free up the reserved connector instance
+                  RepositoryConnectorFactory.release(connector);
+                }
+              }
+              catch (ManifoldCFException e)
+              {
+                if (e.getErrorCode() == ManifoldCFException.REPOSITORY_CONNECTION_ERROR)
+                {
+                  // This error can only come from grabbing the connections.  So, if this occurs it means that
+                  // all the documents we've been handed have to be stuffed back onto the queue for processing at a later time.
+                  Logging.threads.warn("Document cleanup thread couldn't establish necessary connections, retrying later: "+e.getMessage(),e);
+
+                  // Let the unprocessed documents get requeued!  This is handled at the end of the loop...
+                }
+                else
+                  throw e;
+              }
+            }
+          }
+          catch (ManifoldCFException e)
+          {
+            if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+              break;
+
+            if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+              throw e;
+
+            Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+          }
+          finally
+          {
+            // Insure that the documents that were not deleted get restored to the proper state.
+            int j = 0;
+            while (j < dds.getCount())
+            {
+              DeleteQueuedDocument dqd = dds.getDocument(j);
+              if (dqd.wasProcessed() == false)
+              {
+                DocumentDescription ddd = dqd.getDocumentDescription();
+                // Requeue this document!
+                jobManager.resetCleaningDocument(ddd);
+                dqd.setProcessed();
+              }
+              j++;
+            }
+          }
+        }
+        catch (ManifoldCFException e)
+        {
+          if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+            break;
+
+          if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+          {
+            // Note the failure, which will cause a reset to occur
+            resetManager.noteEvent();
+            // Wake up all sleeping worker threads
+            documentCleanupQueue.reset();
+
+            Logging.threads.error("Document cleanup thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
+            try
+            {
+              // Give the database a chance to catch up/wake up
+              ManifoldCF.sleep(10000L);
+            }
+            catch (InterruptedException se)
+            {
+              break;
+            }
+            continue;
+          }
+
+          // An exception occurred in the cleanup from another error.
+          // Log the error (but that's all we can do)
+          Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+
+        }
+        catch (InterruptedException e)
+        {
+          // We're supposed to quit
+          break;
+        }
+        catch (OutOfMemoryError e)
+        {
+          System.err.println("agents process ran out of memory - shutting down");
+          e.printStackTrace(System.err);
+          System.exit(-200);
+        }
+        catch (Throwable e)
+        {
+          // A more severe error - but stay alive
+          Logging.threads.fatal("Error tossed: "+e.getMessage(),e);
+        }
+      }
+    }
+    catch (Throwable e)
+    {
+      // Severe error on initialization
+      System.err.println("agents process could not start - shutting down");
+      Logging.threads.fatal("DocumentCleanupThread "+id+" initialization error tossed: "+e.getMessage(),e);
+      System.exit(-300);
+    }
+  }
+
+
+  /** The ingest logger class */
+  protected static class OutputRemoveActivity implements IOutputRemoveActivity
+  {
+
+    // Connection name
+    protected String connectionName;
+    // Connection manager
+    protected IRepositoryConnectionManager connMgr;
+    // Output connection name
+    protected String outputConnectionName;
+
+    /** Constructor */
+    public OutputRemoveActivity(String connectionName, IRepositoryConnectionManager connMgr, String outputConnectionName)
+    {
+      this.connectionName = connectionName;
+      this.connMgr = connMgr;
+      this.outputConnectionName = outputConnectionName;
+    }
+
+    /** Record time-stamped information about the activity of the output connector.
+    *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970).  Every
+    *       activity has an associated time; the startTime field records when the activity began.  A null value
+    *       indicates that the start time and the finishing time are the same.
+    *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is
+    *       used to categorize what kind of activity is being recorded.  For example, a web connector might record a
+    *       "fetch document" activity.  Cannot be null.
+    *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable.
+    *@param entityURI is a (possibly long) string which identifies the object involved in the history record.
+    *       The interpretation of this field will differ from connector to connector.  May be null.
+    *@param resultCode contains a terse description of the result of the activity.  The description is limited in
+    *       size to 255 characters, and can be interpreted only in the context of the current connector.  May be null.
+    *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result
+    *       described in the resultCode field.  This field is not meant to be queried on.  May be null.
+    */
+    public void recordActivity(Long startTime, String activityType, Long dataSize,
+      String entityURI, String resultCode, String resultDescription)
+      throws ManifoldCFException
+    {
+      connMgr.recordHistory(connectionName,startTime,ManifoldCF.qualifyOutputActivityName(activityType,outputConnectionName),dataSize,entityURI,resultCode,
+        resultDescription,null);
+    }
+  }
+
+}

Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
------------------------------------------------------------------------------
    svn:keywords = Id

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java?rev=1057076&r1=1057075&r2=1057076&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java Mon Jan 10 02:35:42 2011
@@ -46,6 +46,8 @@ public class ManifoldCF extends org.apac
   protected static ExpireThread[] expireThreads = null;
   protected static DocumentDeleteStufferThread deleteStufferThread = null;
   protected static DocumentDeleteThread[] deleteThreads = null;
+  protected static DocumentCleanupStufferThread cleanupStufferThread = null;
+  protected static DocumentCleanupThread[] cleanupThreads = null;
   protected static JobResetThread jobResetThread = null;
   protected static SeedingThread seedingThread = null;
   protected static IdleCleanupThread idleCleanupThread = null;
@@ -56,11 +58,15 @@ public class ManifoldCF extends org.apac
   protected static WorkerResetManager workerResetManager = null;
   /** Delete thread pool reset manager */
   protected static DocDeleteResetManager docDeleteResetManager = null;
+  /** Cleanup thread pool reset manager */
+  protected static DocCleanupResetManager docCleanupResetManager = null;
 
   // Number of worker threads
   protected static int numWorkerThreads = 0;
   // Number of delete threads
   protected static int numDeleteThreads = 0;
+  // Number of cleanup threads
+  protected static int numCleanupThreads = 0;
   // Number of expiration threads
   protected static int numExpireThreads = 0;
   // Factor for low water level in queueing
@@ -70,6 +76,7 @@ public class ManifoldCF extends org.apac
 
   protected static final String workerThreadCountProperty = "org.apache.manifoldcf.crawler.threads";
   protected static final String deleteThreadCountProperty = "org.apache.manifoldcf.crawler.deletethreads";
+  protected static final String cleanupThreadCountProperty = "org.apache.manifoldcf.crawler.cleanupthreads";
   protected static final String expireThreadCountProperty = "org.apache.manifoldcf.crawler.expirethreads";
   protected static final String lowWaterFactorProperty = "org.apache.manifoldcf.crawler.lowwaterfactor";
   protected static final String stuffAmtFactorProperty = "org.apache.manifoldcf.crawler.stuffamountfactor";
@@ -150,12 +157,18 @@ public class ManifoldCF extends org.apac
       String maxDeleteThreads = getProperty(deleteThreadCountProperty);
       if (maxDeleteThreads == null)
         maxDeleteThreads = "10";
+      String maxCleanupThreads = getProperty(cleanupThreadCountProperty);
+      if (maxCleanupThreads == null)
+        maxCleanupThreads = "10";
       String maxExpireThreads = getProperty(expireThreadCountProperty);
       if (maxExpireThreads == null)
         maxExpireThreads = "10";
       numDeleteThreads = new Integer(maxDeleteThreads).intValue();
       if (numDeleteThreads < 1 || numDeleteThreads > 300)
         throw new ManifoldCFException("Illegal value for the number of delete threads");
+      numCleanupThreads = new Integer(maxCleanupThreads).intValue();
+      if (numCleanupThreads < 1 || numCleanupThreads > 300)
+        throw new ManifoldCFException("Illegal value for the number of cleanup threads");
       numExpireThreads = new Integer(maxExpireThreads).intValue();
       if (numExpireThreads < 1 || numExpireThreads > 300)
         throw new ManifoldCFException("Illegal value for the number of expire threads");
@@ -180,12 +193,14 @@ public class ManifoldCF extends org.apac
 
       DocumentQueue documentQueue = new DocumentQueue();
       DocumentDeleteQueue documentDeleteQueue = new DocumentDeleteQueue();
+      DocumentCleanupQueue documentCleanupQueue = new DocumentCleanupQueue();
       DocumentDeleteQueue expireQueue = new DocumentDeleteQueue();
 
       BlockingDocuments blockingDocuments = new BlockingDocuments();
 
       workerResetManager = new WorkerResetManager(documentQueue);
       docDeleteResetManager = new DocDeleteResetManager(documentDeleteQueue);
+      docCleanupResetManager = new DocCleanupResetManager(documentCleanupQueue);
 
       jobStartThread = new JobStartThread();
       startupThread = new StartupThread(queueTracker);
@@ -220,6 +235,16 @@ public class ManifoldCF extends org.apac
         deleteThreads[i] = new DocumentDeleteThread(Integer.toString(i),documentDeleteQueue,docDeleteResetManager);
         i++;
       }
+      
+      cleanupStufferThread = new DocumentCleanupStufferThread(documentCleanupQueue,numCleanupThreads,docCleanupResetManager);
+      cleanupThreads = new DocumentCleanupThread[numCleanupThreads];
+      i = 0;
+      while (i < numCleanupThreads)
+      {
+        cleanupThreads[i] = new DocumentCleanupThread(Integer.toString(i),documentCleanupQueue,queueTracker,docCleanupResetManager);
+        i++;
+      }
+
       jobResetThread = new JobResetThread(queueTracker);
       seedingThread = new SeedingThread(queueTracker);
       idleCleanupThread = new IdleCleanupThread();
@@ -311,6 +336,14 @@ public class ManifoldCF extends org.apac
           i++;
         }
 
+        cleanupStufferThread.start();
+        i = 0;
+        while (i < numCleanupThreads)
+        {
+          cleanupThreads[i].start();
+          i++;
+        }
+
         deleteStufferThread.start();
         i = 0;
         while (i < numDeleteThreads)
@@ -351,6 +384,7 @@ public class ManifoldCF extends org.apac
       while (initializationThread != null || jobDeleteThread != null || startupThread != null || jobStartThread != null || stufferThread != null ||
         finisherThread != null || notificationThread != null || workerThreads != null || expireStufferThread != null | expireThreads != null ||
         deleteStufferThread != null || deleteThreads != null ||
+        cleanupStufferThread != null || cleanupThreads != null ||
         jobResetThread != null || seedingThread != null || idleCleanupThread != null || setPriorityThread != null)
       {
         // Send an interrupt to all threads that are still there.
@@ -412,6 +446,20 @@ public class ManifoldCF extends org.apac
               expireThread.interrupt();
           }
         }
+        if (cleanupStufferThread != null)
+        {
+          cleanupStufferThread.interrupt();
+        }
+        if (cleanupThreads != null)
+        {
+          int i = 0;
+          while (i < cleanupThreads.length)
+          {
+            Thread cleanupThread = cleanupThreads[i++];
+            if (cleanupThread != null)
+              cleanupThread.interrupt();
+          }
+        }
         if (deleteStufferThread != null)
         {
           deleteStufferThread.interrupt();
@@ -534,6 +582,31 @@ public class ManifoldCF extends org.apac
             expireThreads = null;
         }
 
+        if (cleanupStufferThread != null)
+        {
+          if (!cleanupStufferThread.isAlive())
+            cleanupStufferThread = null;
+        }
+        if (cleanupThreads != null)
+        {
+          int i = 0;
+          boolean isAlive = false;
+          while (i < cleanupThreads.length)
+          {
+            Thread cleanupThread = cleanupThreads[i];
+            if (cleanupThread != null)
+            {
+              if (!cleanupThread.isAlive())
+                cleanupThreads[i] = null;
+              else
+                isAlive = true;
+            }
+            i++;
+          }
+          if (!isAlive)
+            cleanupThreads = null;
+        }
+
         if (deleteStufferThread != null)
         {
           if (!deleteStufferThread.isAlive())