You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/01/10 03:35:42 UTC
svn commit: r1057076 [2/2] - in /incubator/lcf/trunk: ./
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/
framework/pull-agent/src/main/java/org/apache/m...
Added: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java?rev=1057076&view=auto
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java (added)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java Mon Jan 10 02:35:42 2011
@@ -0,0 +1,430 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import org.apache.manifoldcf.crawler.system.Logging;
+import java.util.*;
+import java.lang.reflect.*;
+
+/** This class represents a document cleanup thread.  This thread's job is to pull document sets to be cleaned up off of
+* a queue, and kill them.  It finishes a cleanup set by getting rid of the corresponding rows in the job queue, and from
+* carrydown, and from hopcount.
+*
+* There are very few decisions that this thread needs to make; essentially all the hard thought went into deciding
+* what documents to queue in the first place.
+*
+* The only caveat is that the ingestion API may not be accepting delete requests at the time that this thread wants it
+* to be able to accept them.  In that case, it's acceptable for the thread to block until the ingestion service is
+* functioning again.
+*
+* Transactions are not much needed for this class; it simply needs to not fail to remove the appropriate jobqueue
+* table rows at the end of the delete.
+*
+* Any document that was pulled off the queue but could not be cleaned up (missing job, missing repository connection,
+* connector unavailable, interruption, or any other failure) is handed back via
+* IJobManager.resetCleaningDocument() before the thread moves on.
+*/
+public class DocumentCleanupThread extends Thread
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+
+  // Local data
+  /** Worker thread id; used in the thread name and in log messages. */
+  protected String id;
+  /** The cleanup queue this thread pulls document sets from. */
+  protected DocumentCleanupQueue documentCleanupQueue;
+  /** Cleanup thread pool reset manager */
+  protected DocCleanupResetManager resetManager;
+  /** Queue tracker; passed through when requeuing documents due to carrydown changes. */
+  protected QueueTracker queueTracker;
+
+  /** Constructor.  Names the thread after its id and marks it as a daemon thread.
+  *@param id is the worker thread id.
+  *@param documentCleanupQueue is the queue of document sets awaiting cleanup.
+  *@param queueTracker is the queue tracker handed to the carrydown requeue logic.
+  *@param resetManager is the reset manager this thread registers with and waits on.
+  */
+  public DocumentCleanupThread(String id, DocumentCleanupQueue documentCleanupQueue, QueueTracker queueTracker, DocCleanupResetManager resetManager)
+    throws ManifoldCFException
+  {
+    super();
+    this.id = id;
+    this.documentCleanupQueue = documentCleanupQueue;
+    this.queueTracker = queueTracker;
+    this.resetManager = resetManager;
+    setName("Document cleanup thread '"+id+"'");
+    setDaemon(true);
+  }
+
+  /** Thread body.  Repeatedly pulls a cleanup set off the queue, groups its documents by repository connection
+  * and then by output connection, removes the documents that need it from the index, and marks every handled
+  * document deleted in the job queue.  The loop exits when the thread is interrupted; a database connection
+  * error triggers a reset of the cleanup thread pool followed by a retry.  A failure while setting up the
+  * thread's handles, or an out-of-memory condition, shuts the process down.
+  */
+  public void run()
+  {
+    resetManager.registerMe();
+
+    try
+    {
+      // Create a thread context object.
+      IThreadContext threadContext = ThreadContextFactory.make();
+      IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
+      IJobManager jobManager = JobManagerFactory.make(threadContext);
+      IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
+
+      // Loop
+      while (true)
+      {
+        // Do another try/catch around everything in the loop
+        try
+        {
+          if (Thread.currentThread().isInterrupted())
+            throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+
+          // Before we begin, conditionally reset
+          resetManager.waitForReset(threadContext);
+
+          // Once we pull something off the queue, we MUST make sure that
+          // we update its status, even if there is an exception!!!
+
+          // See if there is anything on the queue for me
+          DocumentCleanupSet dds = documentCleanupQueue.getDocuments();
+          if (dds == null)
+            // It's a reset, so recycle
+            continue;
+
+          try
+          {
+            // This check must live inside the try block: we already own the document set, so bailing out
+            // on an interrupt has to go through the finally clause that hands unprocessed documents back.
+            if (Thread.currentThread().isInterrupted())
+              throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+
+            long currentTime = System.currentTimeMillis();
+
+            // We need to segregate all the documents by connection, in order to be able to form a decent activities object
+            // to pass into the incremental ingester.  So, first pass through the document descriptions will build that.
+            Map mappedDocs = new HashMap();
+            int j = 0;
+            while (j < dds.getCount())
+            {
+              CleanupQueuedDocument dqd = dds.getDocument(j++);
+              DocumentDescription ddd = dqd.getDocumentDescription();
+              Long jobID = ddd.getJobID();
+              IJobDescription job = jobManager.load(jobID,true);
+              if (job == null)
+                // The job is gone, so there is no connection to attribute this document to.  Leave it
+                // unprocessed; the finally clause below will hand it back.
+                continue;
+              String connectionName = job.getConnectionName();
+              ArrayList list = (ArrayList)mappedDocs.get(connectionName);
+              if (list == null)
+              {
+                list = new ArrayList();
+                mappedDocs.put(connectionName,list);
+              }
+              list.add(dqd);
+            }
+
+            // Now, cycle through all represented connections.
+            // For each connection, construct the necessary pieces to do the deletion.
+            Iterator iter = mappedDocs.keySet().iterator();
+            while (iter.hasNext())
+            {
+              String connectionName = (String)iter.next();
+              ArrayList list = (ArrayList)mappedDocs.get(connectionName);
+
+              // Load the connection object; we need its class name and configuration to grab a connector.
+              IRepositoryConnection connection = connMgr.load(connectionName);
+              if (connection == null)
+              {
+                // Without a connection we cannot obtain a connector.  Skip this group; its documents stay
+                // unprocessed and are handed back by the finally clause below.
+                Logging.threads.warn("Document cleanup thread could not find repository connection '"+connectionName+"', retrying later");
+                continue;
+              }
+
+              // Parallel lists, one entry per document that can actually be cleaned up.
+              ArrayList arrayOutputConnectionNames = new ArrayList();
+              ArrayList arrayDocHashes = new ArrayList();
+              ArrayList arrayDocClasses = new ArrayList();
+              ArrayList arrayDocsToDelete = new ArrayList();
+              ArrayList arrayRelationshipTypes = new ArrayList();
+              ArrayList hopcountMethods = new ArrayList();
+              j = 0;
+              while (j < list.size())
+              {
+                CleanupQueuedDocument dqd = (CleanupQueuedDocument)list.get(j++);
+                DocumentDescription ddd = dqd.getDocumentDescription();
+
+                Long jobID = ddd.getJobID();
+                IJobDescription job = jobManager.load(jobID,true);
+                if (job == null)
+                  continue;
+
+                // We'll need the legal link types; grab those before we proceed
+                String[] legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
+                if (legalLinkTypes == null)
+                  continue;
+
+                arrayOutputConnectionNames.add(job.getOutputConnectionName());
+                // The document class passed to the ingester is the repository connection name.
+                arrayDocClasses.add(connectionName);
+                arrayDocHashes.add(ddd.getDocumentIdentifierHash());
+                arrayDocsToDelete.add(dqd);
+                arrayRelationshipTypes.add(legalLinkTypes);
+                hopcountMethods.add(new Integer(job.getHopcountMode()));
+              }
+
+              // Next, segregate the documents by output connection name.  This will permit logging to know what actual activity type to use.
+              // Each value is a list of indexes into the parallel lists above.
+              HashMap outputMap = new HashMap();
+              j = 0;
+              while (j < arrayDocHashes.size())
+              {
+                String outputConnectionName = (String)arrayOutputConnectionNames.get(j);
+                ArrayList subList = (ArrayList)outputMap.get(outputConnectionName);
+                if (subList == null)
+                {
+                  subList = new ArrayList();
+                  outputMap.put(outputConnectionName,subList);
+                }
+                subList.add(new Integer(j));
+                j++;
+              }
+
+              // Grab one connection for each connectionName.  If we fail, nothing is lost and retries are possible.
+              try
+              {
+                IRepositoryConnector connector = RepositoryConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
+                try
+                {
+
+                  // Iterate over the outputs
+                  Iterator outputIterator = outputMap.keySet().iterator();
+                  while (outputIterator.hasNext())
+                  {
+                    String outputConnectionName = (String)outputIterator.next();
+                    ArrayList indexList = (ArrayList)outputMap.get(outputConnectionName);
+
+                    // Collect the docs to actually delete from the index.  This will be a subset of the documents in the list.
+                    ArrayList classesToRemove = new ArrayList();
+                    ArrayList hashesToRemove = new ArrayList();
+                    int k = 0;
+                    while (k < indexList.size())
+                    {
+                      int index = ((Integer)indexList.get(k++)).intValue();
+                      CleanupQueuedDocument candidate = (CleanupQueuedDocument)arrayDocsToDelete.get(index);
+                      if (candidate.shouldBeRemovedFromIndex())
+                      {
+                        classesToRemove.add(arrayDocClasses.get(index));
+                        hashesToRemove.add(arrayDocHashes.get(index));
+                      }
+                    }
+                    String[] docClassesToRemove = (String[])classesToRemove.toArray(new String[classesToRemove.size()]);
+                    String[] hashedDocsToRemove = (String[])hashesToRemove.toArray(new String[hashesToRemove.size()]);
+
+                    OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
+
+                    // Finally, go ahead and delete the documents from the ingestion system.
+
+                    while (true)
+                    {
+                      try
+                      {
+                        ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
+                        break;
+                      }
+                      catch (ServiceInterruption e)
+                      {
+                        // If we get a service interruption here, it means that the ingestion API is down.
+                        // There is no point, therefore, in freeing up this thread to go do something else;
+                        // might as well just wait here for our retries.
+                        // Wait for the prescribed time; if that time has already passed, wait five minutes.
+                        long amt = e.getRetryTime();
+                        long now = System.currentTimeMillis();
+                        long waittime = amt-now;
+                        if (waittime <= 0L)
+                          waittime = 300000L;
+                        ManifoldCF.sleep(waittime);
+                      }
+                    }
+
+                    // Successfully deleted some documents from ingestion system.  Now, remove them from job queue.  This
+                    // must currently happen one document at a time, because the jobs and connectors for each document
+                    // potentially differ.
+                    k = 0;
+                    while (k < indexList.size())
+                    {
+                      int index = ((Integer)indexList.get(k++)).intValue();
+
+                      CleanupQueuedDocument dqd = (CleanupQueuedDocument)arrayDocsToDelete.get(index);
+                      DocumentDescription ddd = dqd.getDocumentDescription();
+                      Long jobID = ddd.getJobID();
+                      int hopcountMethod = ((Integer)hopcountMethods.get(index)).intValue();
+                      String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(index);
+                      DocumentDescription[] requeueCandidates = jobManager.markDocumentDeleted(jobID,legalLinkTypes,ddd,hopcountMethod);
+                      // Use the common method for doing the requeuing
+                      ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
+                        connector,connection,queueTracker,currentTime);
+                      // Finally, completed cleanup of the document.
+                      dqd.setProcessed();
+                    }
+                  }
+                }
+                finally
+                {
+                  // Free up the reserved connector instance
+                  RepositoryConnectorFactory.release(connector);
+                }
+              }
+              catch (ManifoldCFException e)
+              {
+                if (e.getErrorCode() == ManifoldCFException.REPOSITORY_CONNECTION_ERROR)
+                {
+                  // This error can only come from grabbing the connections.  So, if this occurs it means that
+                  // all the documents we've been handed have to be stuffed back onto the queue for processing at a later time.
+                  Logging.threads.warn("Document cleanup thread couldn't establish necessary connections, retrying later: "+e.getMessage(),e);
+
+                  // Let the unprocessed documents get requeued!  This is handled at the end of the loop...
+                }
+                else
+                  throw e;
+              }
+            }
+          }
+          catch (ManifoldCFException e)
+          {
+            if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+              break;
+
+            if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+              throw e;
+
+            Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+          }
+          finally
+          {
+            // Insure that the documents that were not deleted get restored to the proper state.
+            int j = 0;
+            while (j < dds.getCount())
+            {
+              CleanupQueuedDocument dqd = dds.getDocument(j++);
+              if (dqd.wasProcessed() == false)
+              {
+                DocumentDescription ddd = dqd.getDocumentDescription();
+                // Requeue this document!
+                jobManager.resetCleaningDocument(ddd);
+                dqd.setProcessed();
+              }
+            }
+          }
+        }
+        catch (ManifoldCFException e)
+        {
+          if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+            break;
+
+          if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+          {
+            // Note the failure, which will cause a reset to occur
+            resetManager.noteEvent();
+            // Wake up all sleeping worker threads
+            documentCleanupQueue.reset();
+
+            Logging.threads.error("Document cleanup thread aborting and restarting due to database connection reset: "+e.getMessage(),e);
+            try
+            {
+              // Give the database a chance to catch up/wake up
+              ManifoldCF.sleep(10000L);
+            }
+            catch (InterruptedException se)
+            {
+              break;
+            }
+            continue;
+          }
+
+          // An exception occurred in the cleanup from another error.
+          // Log the error (but that's all we can do)
+          Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+
+        }
+        catch (InterruptedException e)
+        {
+          // We're supposed to quit
+          break;
+        }
+        catch (OutOfMemoryError e)
+        {
+          System.err.println("agents process ran out of memory - shutting down");
+          e.printStackTrace(System.err);
+          System.exit(-200);
+        }
+        catch (Throwable e)
+        {
+          // A more severe error - but stay alive
+          Logging.threads.fatal("Error tossed: "+e.getMessage(),e);
+        }
+      }
+    }
+    catch (Throwable e)
+    {
+      // Severe error on initialization
+      System.err.println("agents process could not start - shutting down");
+      Logging.threads.fatal("DocumentCleanupThread "+id+" initialization error tossed: "+e.getMessage(),e);
+      System.exit(-300);
+    }
+  }
+
+
+  /** Activity recorder handed to the incremental ingester when documents are removed.  Each recorded activity is
+  * written to the history of the repository connection, with the activity type qualified by the output connection name.
+  */
+  protected static class OutputRemoveActivity implements IOutputRemoveActivity
+  {
+
+    // Repository connection name the history is recorded against
+    protected String connectionName;
+    // Connection manager used to write the history record
+    protected IRepositoryConnectionManager connMgr;
+    // Output connection name used to qualify the activity type
+    protected String outputConnectionName;
+
+    /** Constructor.
+    *@param connectionName is the name of the repository connection whose history receives the records.
+    *@param connMgr is the repository connection manager used to record history.
+    *@param outputConnectionName is the name of the output connection the removal is performed against.
+    */
+    public OutputRemoveActivity(String connectionName, IRepositoryConnectionManager connMgr, String outputConnectionName)
+    {
+      this.connectionName = connectionName;
+      this.connMgr = connMgr;
+      this.outputConnectionName = outputConnectionName;
+    }
+
+    /** Record time-stamped information about the activity of the output connector.
+    *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970).  Every
+    *       activity has an associated time; the startTime field records when the activity began.  A null value
+    *       indicates that the start time and the finishing time are the same.
+    *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is
+    *       used to categorize what kind of activity is being recorded.  For example, a web connector might record a
+    *       "fetch document" activity.  Cannot be null.
+    *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable.
+    *@param entityURI is a (possibly long) string which identifies the object involved in the history record.
+    *       The interpretation of this field will differ from connector to connector.  May be null.
+    *@param resultCode contains a terse description of the result of the activity.  The description is limited in
+    *       size to 255 characters, and can be interpreted only in the context of the current connector.  May be null.
+    *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result
+    *       described in the resultCode field.  This field is not meant to be queried on.  May be null.
+    */
+    public void recordActivity(Long startTime, String activityType, Long dataSize,
+      String entityURI, String resultCode, String resultDescription)
+      throws ManifoldCFException
+    {
+      connMgr.recordHistory(connectionName,startTime,ManifoldCF.qualifyOutputActivityName(activityType,outputConnectionName),dataSize,entityURI,resultCode,
+        resultDescription,null);
+    }
+  }
+
+}
Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
------------------------------------------------------------------------------
svn:keywords = Id
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java?rev=1057076&r1=1057075&r2=1057076&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java Mon Jan 10 02:35:42 2011
@@ -46,6 +46,8 @@ public class ManifoldCF extends org.apac
protected static ExpireThread[] expireThreads = null;
protected static DocumentDeleteStufferThread deleteStufferThread = null;
protected static DocumentDeleteThread[] deleteThreads = null;
+ protected static DocumentCleanupStufferThread cleanupStufferThread = null;
+ protected static DocumentCleanupThread[] cleanupThreads = null;
protected static JobResetThread jobResetThread = null;
protected static SeedingThread seedingThread = null;
protected static IdleCleanupThread idleCleanupThread = null;
@@ -56,11 +58,15 @@ public class ManifoldCF extends org.apac
protected static WorkerResetManager workerResetManager = null;
/** Delete thread pool reset manager */
protected static DocDeleteResetManager docDeleteResetManager = null;
+ /** Cleanup thread pool reset manager */
+ protected static DocCleanupResetManager docCleanupResetManager = null;
// Number of worker threads
protected static int numWorkerThreads = 0;
// Number of delete threads
protected static int numDeleteThreads = 0;
+ // Number of cleanup threads
+ protected static int numCleanupThreads = 0;
// Number of expiration threads
protected static int numExpireThreads = 0;
// Factor for low water level in queueing
@@ -70,6 +76,7 @@ public class ManifoldCF extends org.apac
protected static final String workerThreadCountProperty = "org.apache.manifoldcf.crawler.threads";
protected static final String deleteThreadCountProperty = "org.apache.manifoldcf.crawler.deletethreads";
+ protected static final String cleanupThreadCountProperty = "org.apache.manifoldcf.crawler.cleanupthreads";
protected static final String expireThreadCountProperty = "org.apache.manifoldcf.crawler.expirethreads";
protected static final String lowWaterFactorProperty = "org.apache.manifoldcf.crawler.lowwaterfactor";
protected static final String stuffAmtFactorProperty = "org.apache.manifoldcf.crawler.stuffamountfactor";
@@ -150,12 +157,18 @@ public class ManifoldCF extends org.apac
String maxDeleteThreads = getProperty(deleteThreadCountProperty);
if (maxDeleteThreads == null)
maxDeleteThreads = "10";
+ String maxCleanupThreads = getProperty(cleanupThreadCountProperty);
+ if (maxCleanupThreads == null)
+ maxCleanupThreads = "10";
String maxExpireThreads = getProperty(expireThreadCountProperty);
if (maxExpireThreads == null)
maxExpireThreads = "10";
numDeleteThreads = new Integer(maxDeleteThreads).intValue();
if (numDeleteThreads < 1 || numDeleteThreads > 300)
throw new ManifoldCFException("Illegal value for the number of delete threads");
+ numCleanupThreads = new Integer(maxCleanupThreads).intValue();
+ if (numCleanupThreads < 1 || numCleanupThreads > 300)
+ throw new ManifoldCFException("Illegal value for the number of cleanup threads");
numExpireThreads = new Integer(maxExpireThreads).intValue();
if (numExpireThreads < 1 || numExpireThreads > 300)
throw new ManifoldCFException("Illegal value for the number of expire threads");
@@ -180,12 +193,14 @@ public class ManifoldCF extends org.apac
DocumentQueue documentQueue = new DocumentQueue();
DocumentDeleteQueue documentDeleteQueue = new DocumentDeleteQueue();
+ DocumentCleanupQueue documentCleanupQueue = new DocumentCleanupQueue();
DocumentDeleteQueue expireQueue = new DocumentDeleteQueue();
BlockingDocuments blockingDocuments = new BlockingDocuments();
workerResetManager = new WorkerResetManager(documentQueue);
docDeleteResetManager = new DocDeleteResetManager(documentDeleteQueue);
+ docCleanupResetManager = new DocCleanupResetManager(documentCleanupQueue);
jobStartThread = new JobStartThread();
startupThread = new StartupThread(queueTracker);
@@ -220,6 +235,16 @@ public class ManifoldCF extends org.apac
deleteThreads[i] = new DocumentDeleteThread(Integer.toString(i),documentDeleteQueue,docDeleteResetManager);
i++;
}
+
+ cleanupStufferThread = new DocumentCleanupStufferThread(documentCleanupQueue,numCleanupThreads,docCleanupResetManager);
+ cleanupThreads = new DocumentCleanupThread[numCleanupThreads];
+ i = 0;
+ while (i < numCleanupThreads)
+ {
+ cleanupThreads[i] = new DocumentCleanupThread(Integer.toString(i),documentCleanupQueue,queueTracker,docCleanupResetManager);
+ i++;
+ }
+
jobResetThread = new JobResetThread(queueTracker);
seedingThread = new SeedingThread(queueTracker);
idleCleanupThread = new IdleCleanupThread();
@@ -311,6 +336,14 @@ public class ManifoldCF extends org.apac
i++;
}
+ cleanupStufferThread.start();
+ i = 0;
+ while (i < numCleanupThreads)
+ {
+ cleanupThreads[i].start();
+ i++;
+ }
+
deleteStufferThread.start();
i = 0;
while (i < numDeleteThreads)
@@ -351,6 +384,7 @@ public class ManifoldCF extends org.apac
while (initializationThread != null || jobDeleteThread != null || startupThread != null || jobStartThread != null || stufferThread != null ||
finisherThread != null || notificationThread != null || workerThreads != null || expireStufferThread != null | expireThreads != null ||
deleteStufferThread != null || deleteThreads != null ||
+ cleanupStufferThread != null || cleanupThreads != null ||
jobResetThread != null || seedingThread != null || idleCleanupThread != null || setPriorityThread != null)
{
// Send an interrupt to all threads that are still there.
@@ -412,6 +446,20 @@ public class ManifoldCF extends org.apac
expireThread.interrupt();
}
}
+ if (cleanupStufferThread != null)
+ {
+ cleanupStufferThread.interrupt();
+ }
+ if (cleanupThreads != null)
+ {
+ int i = 0;
+ while (i < cleanupThreads.length)
+ {
+ Thread cleanupThread = cleanupThreads[i++];
+ if (cleanupThread != null)
+ cleanupThread.interrupt();
+ }
+ }
if (deleteStufferThread != null)
{
deleteStufferThread.interrupt();
@@ -534,6 +582,31 @@ public class ManifoldCF extends org.apac
expireThreads = null;
}
+ if (cleanupStufferThread != null)
+ {
+ if (!cleanupStufferThread.isAlive())
+ cleanupStufferThread = null;
+ }
+ if (cleanupThreads != null)
+ {
+ int i = 0;
+ boolean isAlive = false;
+ while (i < cleanupThreads.length)
+ {
+ Thread cleanupThread = cleanupThreads[i];
+ if (cleanupThread != null)
+ {
+ if (!cleanupThread.isAlive())
+ cleanupThreads[i] = null;
+ else
+ isAlive = true;
+ }
+ i++;
+ }
+ if (!isAlive)
+ cleanupThreads = null;
+ }
+
if (deleteStufferThread != null)
{
if (!deleteStufferThread.isAlive())