You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-user@lucene.apache.org by Jamie Band <ja...@stimulussoft.com> on 2009/10/09 11:04:12 UTC
Re: Index.close() infinite TIME_WAITING (repost)
Hi Michael / Uwe / others
Sorry for the repost... it does not look like the earlier message I
sent went through.
FYI: there are no large Lucene merges taking place.
Jamie Band wrote:
> Hi Michael
>
> Thanks for your help. Here are the stacks:
>
> index processor [TIME_WAITING] CPU time: 33:01
> java.lang.Object.wait(long)
> org.apache.lucene.index.IndexWriter.doWait()
> org.apache.lucene.index.IndexWriter.shouldClose()
> org.apache.lucene.index.IndexWriter.close(boolean)
> org.apache.lucene.index.IndexWriter.close()
> com.stimulus.archiva.index.VolumeIndex.closeIndex()
> com.stimulus.archiva.index.VolumeIndex$IndexProcessor.run()
>
> The source code to our indexer is attached. As you can see, documents
> are added to a blocking queue. The index processor thread takes each one
> out of the queue and processes it. After about 60k documents, IndexWriter's
> close method enters TIME_WAITING indefinitely. Is there any workaround
> for this problem?
>
>
> package com.stimulus.archiva.index;
>
> import java.io.File;
> import java.io.IOException;
> import java.io.PrintStream;
> import javax.mail.MessagingException;
> import org.apache.commons.logging.*;
> import org.apache.lucene.document.Document;
> import org.apache.lucene.index.*;
> import org.apache.lucene.store.FSDirectory;
> import com.stimulus.archiva.domain.Config;
> import com.stimulus.archiva.domain.Email;
> import com.stimulus.archiva.domain.EmailID;
> import com.stimulus.archiva.domain.Indexer;
> import com.stimulus.archiva.domain.Volume;
> import com.stimulus.archiva.exception.*;
> import com.stimulus.archiva.language.AnalyzerFactory;
> import com.stimulus.archiva.search.*;
> import java.util.*;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.ScheduledFuture;
> import java.util.concurrent.TimeUnit;
> import org.apache.lucene.store.LockObtainFailedException;
> import org.apache.lucene.store.AlreadyClosedException;
> import java.util.concurrent.locks.ReentrantLock;
> import java.util.concurrent.*;
>
> public class VolumeIndex extends Thread {
> protected ArrayBlockingQueue<IndexInfo> queue;
> protected static final Log logger =
> LogFactory.getLog(VolumeIndex.class.getName());
> IndexWriter writer = null;
> Volume volume;
> protected static ScheduledExecutorService scheduler;
> protected static ScheduledFuture<?> scheduledTask;
> protected static IndexInfo EXIT_REQ = new IndexInfo(null);
> ReentrantLock indexLock = new ReentrantLock();
> ArchivaAnalyzer analyzer = new ArchivaAnalyzer();
> Indexer indexer = null;
> File indexLogFile;
> PrintStream indexLogOut;
> IndexProcessor indexProcessor;
> public VolumeIndex(Indexer indexer, Volume
> volume) {
> logger.debug("creating new volume index {"+volume+"}");
> this.volume = volume;
> this.indexer = indexer;
> this.queue = new
> ArrayBlockingQueue<IndexInfo>(Config.getConfig().getIndex().getIndexBacklog());
>
> try {
> indexLogFile = getIndexLogFile(volume);
> if (indexLogFile!=null) {
> if (indexLogFile.length()>10485760)
> indexLogFile.delete();
> indexLogOut = new PrintStream(indexLogFile);
> }
> logger.debug("set index log file path
> {path='"+indexLogFile.getCanonicalPath()+"'}");
> } catch (Exception e) {
> logger.error("failed to open index log
> file:"+e.getMessage(),e);
> }
> startup();
> }
> protected File getIndexLogFile(Volume volume) {
> try {
> String indexpath = volume.getIndexPath();
> int lio = indexpath.lastIndexOf(File.separator)+1;
> String logfilepath =
> indexpath.substring(lio,indexpath.length()-1);
> logfilepath += ".log";
> logfilepath = "index_"+logfilepath;
> logfilepath =
> Config.getFileSystem().getLogPath()+File.separator+logfilepath;
> return new File(logfilepath);
> } catch (Exception e) {
> logger.error("failed to open index log
> file:"+e.getMessage(),e);
> return null;
> }
> }
> public void deleteMessages(List<String> ids) throws
> MessageSearchException {
> if (ids == null)
> throw new MessageSearchException("assertion
> failure: null ids",logger);
> Term[] terms = new Term[ids.size()];
> int c = 0;
> StringBuffer deleteInfo = new StringBuffer();
> for (String id : ids) {
> terms[c++] = new Term("uid",id);
> deleteInfo.append(id);
> deleteInfo.append(",");
> }
> String deleteStr = deleteInfo.toString();
> if (deleteStr.length()>0 &&
> deleteStr.charAt(deleteStr.length()-1)==',')
> deleteStr = deleteStr.substring(0,deleteStr.length()-1);
> logger.debug("delete messages
> {'"+deleteInfo+"'}");
> try {
> indexLock.lock();
> openIndex();
> try {
> writer.deleteDocuments(terms);
> writer.expungeDeletes();
> } catch (Exception e) {
> throw new MessageSearchException("failed to
> delete email from index.",e,logger);
> } finally {
> }
> } finally {
> closeIndex();
> indexLock.unlock();
> }
> }
> protected void openIndex() throws MessageSearchException {
> Exception lastError = null;
> if (writer==null) {
> logger.debug("openIndex() index will be opened. it is
> currently closed.");
> } else {
> logger.debug("openIndex() did not bother opening index.
> it is already open.");
> return;
> }
> logger.debug("opening index for write {"+volume+"}");
> indexer.prepareIndex(volume);
> logger.debug("opening search index for write
> {indexpath='"+volume.getIndexPath()+"'}");
> boolean writelock;
> int attempt = 0;
> int maxattempt = 10;
> if
> (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
> maxattempt = 10000;
> } else {
> maxattempt = 10;
> }
> do {
> writelock = false;
> try {
> FSDirectory fsDirectory =
> FSDirectory.getDirectory(volume.getIndexPath());
> int maxIndexChars =
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
> writer = new
> IndexWriter(fsDirectory,analyzer,new
> IndexWriter.MaxFieldLength(maxIndexChars));
> if (logger.isDebugEnabled() &&
> indexLogOut!=null) {
> writer.setInfoStream(indexLogOut);
> }
> } catch (LockObtainFailedException lobfe) {
> logger.debug("write lock on index. will reopen
> in 50ms.");
> try { Thread.sleep(50); } catch (Exception e) {}
> attempt++;
> writelock = true;
> } catch (CorruptIndexException cie) {
> throw new MessageSearchException("index appears to
> be corrupt. please reindex the active volume."+cie.getMessage(),logger);
> } catch (IOException io) {
> throw new MessageSearchException("failed to write
> document to index:"+io.getMessage(),logger);
> }
> } while (writelock && attempt<maxattempt);
> if (attempt>=10000)
> throw new MessageSearchException("failed to open index
> writer {location='"+volume.getIndexPath()+"'}",lastError,logger);
> }
> public void indexMessage(Email message) throws
> MessageSearchException {
> logger.debug("index message {"+message+"}");
> long s = (new Date()).getTime();
> if (message == null)
> throw new MessageSearchException("assertion failure:
> null message",logger);
> Document doc = new Document();
> IndexInfo indexInfo = new IndexInfo(doc);
> try {
> DocumentIndex docIndex = new DocumentIndex(indexer);
> String language = doc.get("lang");
> if (language==null)
> language = indexer.getIndexLanguage();
> docIndex.write(message,doc,indexInfo);
> queue.put(indexInfo);
> logger.debug("message indexed successfully
> {"+message+",language='"+language+"'}");
> } catch (MessagingException me) {
> throw new MessageSearchException("failed to decode
> message during indexing",me,logger, ChainedException.Level.DEBUG);
> } catch (IOException me) {
> throw new MessageSearchException("failed to index
> message"+me.getMessage()+" {"+message+"}",me,logger,
> ChainedException.Level.DEBUG);
> } catch (ExtractionException ee)
> {
> // we will want to continue indexing
> //throw new MessageSearchException("failed to decode
> attachments in message {"+message+"}",ee,logger,
> ChainedException.Level.DEBUG);
> } catch (AlreadyClosedException ace) {
> indexMessage(message);
> } catch (Throwable e) {
> throw new MessageSearchException("failed to index
> message:"+e.getMessage(),e,logger, ChainedException.Level.DEBUG);
> }
> logger.debug("indexing message end {"+message+"}");
> long e = (new Date()).getTime();
> logger.debug("indexing time {time='"+(e-s)+"'}");
> }
> public class IndexProcessor extends Thread {
> public IndexProcessor() {
> setName("index processor");
> }
> public void run() {
> boolean exit = false;
> //ExecutorService documentPool;
> // we abandoned pool as it does not seem to offer any
> major performance benefit
> IndexInfo indexInfo = null;
> LinkedList<IndexInfo> pushbacks = new LinkedList<IndexInfo>();
> while (!exit) {
> try {
> int maxIndexDocs =
> Config.getConfig().getIndex().getMaxSimultaneousDocs();
> //documentPool =
> Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
>
> indexInfo = null;
> indexInfo = (IndexInfo) queue.take();
> if (indexInfo==EXIT_REQ) {
> logger.debug("index exit req received.
> exiting");
> exit = true;
> continue;
> }
> indexLock.lock();
> try {
> openIndex();
> } catch (Exception e) {
> logger.error("failed to open
> index:"+e.getMessage(),e);
> return;
> }
> if (indexInfo==null) {
> logger.debug("index info is null");
> }
> int i = 0;
> while(indexInfo!=null && i<maxIndexDocs) {
> try {
>
> writer.addDocument(indexInfo.getDocument());
> } catch (IOException io) {
> logger.error("failed to add document to
> index:"+io.getMessage(),io);
> } catch (AlreadyClosedException e) {
> pushbacks.add(indexInfo);
> } finally {
> indexInfo.cleanup();
> }
>
> //documentPool.execute(new IndexDocument(indexInfo,pushbacks));
> i++;
> if
> (i<maxIndexDocs) {
> indexInfo = (IndexInfo) queue.poll();
> if
> (indexInfo==null) {
> logger.debug("index info is
> null");
> }
> if
> (indexInfo==EXIT_REQ) {
> logger.debug("index exit req
> received. exiting (2)");
> exit = true;
> break;
> }
> }
> }
> for (IndexInfo pushback :
> pushbacks) {
> try {
>
> writer.addDocument(pushback.getDocument());
> } catch (IOException io) {
> logger.error("failed to add document to
> index:"+io.getMessage(),io);
> } catch (AlreadyClosedException e) {
> pushbacks.add(indexInfo);
> } finally {
> indexInfo.cleanup();
> }
> //documentPool.execute(new
> IndexDocument(pushback,pushbacks));
> i++;
> }
> //documentPool.shutdown();
>
> //documentPool.awaitTermination(30,TimeUnit.MINUTES);
> } catch (Throwable ie) {
> logger.error("index write
> interrupted:"+ie.getMessage());
> } finally {
> closeIndex();
> indexLock.unlock();
> }
> } }
> public class IndexDocument extends Thread {
> IndexInfo indexInfo = null;
> List<IndexInfo> pushbacks = null;
> public IndexDocument(IndexInfo
> indexInfo,List<IndexInfo> pushbacks) {
> this.indexInfo = indexInfo;
> this.pushbacks = pushbacks;
> setName("index document");
> }
> public void run() {
> try {
> writer.addDocument(indexInfo.getDocument());
> } catch (IOException io) {
> logger.error("failed to add document to
> index:"+io.getMessage(),io);
> } catch (AlreadyClosedException e) {
> pushbacks.add(indexInfo);
> }
> }};
> }
> protected void closeIndex() {
> try {
> if (writer!=null) {
> writer.close();
> logger.debug("writer closed");
> writer = null;
> }
> } catch (Exception io) {
> logger.error("failed to close index
> writer:"+io.getMessage(),io);
> }
> }
> public void deleteIndex() throws MessageSearchException {
> logger.debug("delete index
> {indexpath='"+volume.getIndexPath()+"'}");
> try {
> indexLock.lock();
> try {
> int maxIndexChars =
> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
> writer = new
> IndexWriter(FSDirectory.getDirectory(volume.getIndexPath()),analyzer,true,new
> IndexWriter.MaxFieldLength(maxIndexChars));
> } catch (Exception cie) {
> logger.error("failed to delete index
> {index='"+volume.getIndexPath()+"'}",cie);
> return;
> }
> MessageIndex.volumeIndexes.remove(this);
> } finally {
> closeIndex();
> indexLock.unlock();
> }
> }
> public void startup() {
> logger.debug("volumeindex is starting up");
> File lockFile = new
> File(volume.getIndexPath()+File.separatorChar + "write.lock");
> if (lockFile.exists()) {
> logger.warn("The server lock file already exists.
> Either another indexer is running or the server was not shutdown
> correctly.");
> logger.warn("If it is the latter, the lock file must be
> manually deleted at "+lockFile.getAbsolutePath());
> if (indexer.getMultipleIndexProcesses()) {
> logger.debug("index lock file detected on
> volumeindex startup.");
> } else {
> logger.warn("index lock file detected. the server
> was shutdown incorrectly. automatically deleting lock file.");
> logger.warn("indexer is configured to deal with
> only one indexer process.");
> logger.warn("if you are running more than one
> indexer, your index could be subject to corruption.");
> lockFile.delete();
> }
> }
> indexProcessor = new IndexProcessor();
> indexProcessor.start();
> Runtime.getRuntime().addShutdownHook(this);
> }
> public void shutdown() {
> logger.debug("volumeindex is shutting down");
> queue.add(EXIT_REQ);
> scheduler.shutdownNow();
> }
> @Override
> public void run() {
> queue.add(EXIT_REQ);
> }
> }
>
>
>
>
> Is it possible a large merge is running? By default IW.close waits
> for outstanding merges to complete. Can you post the stacktrace?
>
> Mike
>
> On Thu, Oct 8, 2009 at 5:22 PM, Jamie Band <ja...@stimulussoft.com>
> wrote:
>> Hi All
>>
>> I have a long-running problem where our indexing thread gets stuck
>> indefinitely in IndexWriter's close method. YourKit shows the thread
>> to be stuck in TIME_WAITING. Any ideas on what could be causing this?
>> Could it be one of the streams or readers we passed to the document?
>>
>> I am running Lucene 2.9.0.
>>
>> Many thanks in advance
>>
>> Jamie
---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
For additional commands, e-mail: java-user-help@lucene.apache.org
Re: Index.close() infinite TIME_WAITING (repost)
Posted by Jamie Band <ja...@stimulussoft.com>.
Hi Mike
There are other threads involved, but none of them modify the index
concurrently.
There is one thread that retrieves the total count from the index every
2 seconds for GUI display:
public long getTotalMessageCount(Volume volume) throws
MessageSearchException {
if (volume == null)
throw new MessageSearchException("assertion failure:
null volume",logger);
int count = 0;
File indexDir = new File(volume.getIndexPath());
if (!indexDir.exists())
return 0;
IndexReader indexReader = null;
try {
indexReader = IndexReader.open(indexDir);
count += indexReader.numDocs();
} catch (IOException e ) {
logger.error("failed to close index to calculate total
count",e);
} finally {
try { indexReader.close(); } catch (Exception e) {
logger.error("failed to close index to calculate
total count",e);
}
}
return count;
}
There are also other threads for performing searches across the various
indexes.
I will rerun the reindex and send you the log.
With the availability of Lucene 2.9.0 I changed our indexing routines,
so perhaps it's an issue with my code. One thing I've already been
curious about is how it is possible to receive an
AlreadyClosedException when, thanks to the indexLock, no
other code can be writing to the index. Very occasionally
an AlreadyClosedException is thrown while adding a document to the
index. To ensure that nothing is lost, a separate retry queue is maintained.
Can you spot anything suspicious in the code below? I would also be interested
in your thoughts on how to improve indexing speed. Using a thread
pool to call addDocument on the index does not seem to help much.
public class IndexProcessor extends Thread {
public IndexProcessor() {
setName("index processor");
}
public void run() {
boolean exit = false;
//ExecutorService documentPool;
// we abandoned pool as it does not seem to offer any
major performance benefit
IndexInfo indexInfo = null;
LinkedList<IndexInfo> pushbacks = new
LinkedList<IndexInfo>();
while (!exit) {
try {
int maxIndexDocs =
Config.getConfig().getIndex().getMaxSimultaneousDocs();
//documentPool =
Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
indexInfo = null;
indexInfo = (IndexInfo) queue.take();
if (indexInfo==EXIT_REQ) {
logger.debug("index exit req received.
exiting");
exit = true;
continue;
}
indexLock.lock();
try {
openIndex();
} catch (Exception e) {
logger.error("failed to open
index:"+e.getMessage(),e);
return;
}
if (indexInfo==null) {
logger.debug("index info is null");
}
int i = 0;
while(indexInfo!=null && i<maxIndexDocs) {
try {
writer.addDocument(indexInfo.getDocument());
indexInfo.cleanup();
} catch (IOException io) {
logger.error("failed to add document to
index:"+io.getMessage(),io);
indexInfo.cleanup();
} catch (AlreadyClosedException e) {
pushbacks.add(indexInfo);
}
//documentPool.execute(new
IndexDocument(indexInfo,pushbacks));
i++;
if (i<maxIndexDocs) {
indexInfo = (IndexInfo) queue.poll();
if (indexInfo==null) {
logger.debug("index info is null");
}
if (indexInfo==EXIT_REQ) {
logger.debug("index exit req
received. exiting (2)");
exit = true;
break;
}
}
}
if (pushbacks.size()>0) {
closeIndex();
try {
openIndex();
} catch (Exception e) {
logger.error("failed to open
index:"+e.getMessage(),e);
return;
}
for (IndexInfo pushback : pushbacks) {
try {
writer.addDocument(pushback.getDocument());
pushback.cleanup();
} catch (IOException io) {
logger.error("failed to add
document to index:"+io.getMessage(),io);
pushback.cleanup();
} catch (AlreadyClosedException e) {
pushbacks.add(pushback);
}
//documentPool.execute(new
IndexDocument(pushback,pushbacks));
i++;
}
}
//documentPool.shutdown();
//documentPool.awaitTermination(30,TimeUnit.MINUTES);
} catch (Throwable ie) {
logger.error("index write
interrupted:"+ie.getMessage());
} finally {
closeIndex();
indexLock.unlock();
}
}
}
Many thanks
Jamie
Michael McCandless wrote:
> Are there other threads involved, besides the one hung in close? Can
> you post their stack traces?
>
> This stack trace seems to indicate that IW believes another thread is
> in the process of closing.
>
> Can you call IndexWriter.setInfoStream and post the output leading to the hang?
>
> Mike
>
>
---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
For additional commands, e-mail: java-user-help@lucene.apache.org
Re: Index.close() infinite TIME_WAITING (repost)
Posted by Michael McCandless <lu...@mikemccandless.com>.
Were there any exceptions inside Lucene, before the hang?
The fact that you're hitting AlreadyClosedException is a spooky sign
-- that means IW thinks you had in fact closed the writer, but then
used it again.
For increasing indexing throughput, I'd start here:
http://wiki.apache.org/lucene-java/ImproveIndexingSpeed
Can you whittle down your code to a smaller test that still shows the hang?
Mike
On Fri, Oct 9, 2009 at 6:05 AM, Jamie Band <ja...@stimulussoft.com> wrote:
> Hi Mike
>
> There are other threads involved but none are simultaneously modifying the
> index.
>
> There is one thread that retrieves the total count every 2 seconds on the
> index for GUI display:
>
> public long getTotalMessageCount(Volume volume) throws
> MessageSearchException {
> if (volume == null)
> throw new MessageSearchException("assertion failure: null
> volume",logger);
> int count = 0;
> File indexDir = new File(volume.getIndexPath());
> if (!indexDir.exists())
> return 0;
> IndexReader indexReader = null;
> try {
> indexReader = IndexReader.open(indexDir);
> count += indexReader.numDocs();
> } catch (IOException e ) {
> logger.error("failed to close index to calculate total
> count",e);
> } finally {
> try { indexReader.close(); } catch (Exception e) {
> logger.error("failed to close index to calculate total
> count",e);
> }
> }
> return count;
> }
>
> There are also other threads for performing searches across the various
> indexes.
>
> I will rerun the reindex and send you the log.
>
> With the availability of Lucene 2.9.0 I changed our indexing routines, so
> perhaps it's an issue with my code. One thing I've already been curious about
> is how it is possible to receive an AlreadyClosedException when, thanks to
> the indexLock, no other code can be writing to the index. Very occasionally an
> AlreadyClosedException is thrown while adding a document to the index. To
> ensure that nothing is lost, a separate retry queue is maintained.
>
> Can you spot anything suspicious in the code below? I would also be interested
> in your thoughts on how to improve indexing speed. Using a thread pool to
> call addDocument on the index does not seem to help much.
>
>
> public class IndexProcessor extends Thread {
> public IndexProcessor() {
> setName("index processor");
> }
> public void run() {
> boolean exit = false;
> //ExecutorService documentPool;
> // we abandoned pool as it does not seem to offer any major
> performance benefit
> IndexInfo indexInfo = null;
> LinkedList<IndexInfo> pushbacks = new LinkedList<IndexInfo>();
> while (!exit) {
> try {
> int maxIndexDocs =
> Config.getConfig().getIndex().getMaxSimultaneousDocs();
> //documentPool =
> Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
> indexInfo = null; indexInfo =
> (IndexInfo) queue.take();
> if (indexInfo==EXIT_REQ) {
> logger.debug("index exit req received. exiting");
> exit = true;
> continue;
> }
> indexLock.lock();
> try {
> openIndex();
> } catch (Exception e) {
> logger.error("failed to open
> index:"+e.getMessage(),e);
> return;
> }
> if (indexInfo==null) {
> logger.debug("index info is null");
> }
> int i = 0;
> while(indexInfo!=null && i<maxIndexDocs) {
> try {
> writer.addDocument(indexInfo.getDocument());
> indexInfo.cleanup();
> } catch (IOException io) {
> logger.error("failed to add document to
> index:"+io.getMessage(),io);
> indexInfo.cleanup();
> } catch (AlreadyClosedException e) {
> pushbacks.add(indexInfo);
> }
> //documentPool.execute(new
> IndexDocument(indexInfo,pushbacks));
> i++;
> if (i<maxIndexDocs) {
> indexInfo = (IndexInfo) queue.poll();
> if
> (indexInfo==null) {
> logger.debug("index info is null");
> }
> if
> (indexInfo==EXIT_REQ) {
> logger.debug("index exit req
> received. exiting (2)");
> exit = true;
> break;
> }
> }
> }
> if (pushbacks.size()>0) {
> closeIndex();
> try {
> openIndex();
> } catch (Exception e) {
> logger.error("failed to open
> index:"+e.getMessage(),e);
> return;
> }
> for (IndexInfo pushback : pushbacks) {
> try {
>
> writer.addDocument(pushback.getDocument());
> pushback.cleanup();
> } catch (IOException io) {
> logger.error("failed to add document
> to index:"+io.getMessage(),io);
> pushback.cleanup();
> } catch (AlreadyClosedException e) {
> pushbacks.add(pushback);
> }
> //documentPool.execute(new
> IndexDocument(pushback,pushbacks));
> i++;
> }
> }
> //documentPool.shutdown();
> //documentPool.awaitTermination(30,TimeUnit.MINUTES);
> } catch (Throwable ie) {
> logger.error("index write
> interrupted:"+ie.getMessage());
> } finally {
> closeIndex();
> indexLock.unlock();
> }
> } }
>
> Many thanks
>
> Jamie
>
> Michael McCandless wrote:
>>
>> Are there other threads involved, besides the one hung in close? Can
>> you post their stack traces?
>>
>> This stack trace seems to indicate that IW believes another thread is
>> in the process of closing.
>>
>> Can you call IndexWriter.setInfoStream and post the output leading to the
>> hang?
>>
>> Mike
>>
>>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
> For additional commands, e-mail: java-user-help@lucene.apache.org
>
>
---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
For additional commands, e-mail: java-user-help@lucene.apache.org
Re: Index.close() infinite TIME_WAITING (repost)
Posted by Jamie Band <ja...@stimulussoft.com>.
Hi Mike
There are other threads involved but none are simultaneously modifying
the index.
There is one thread that retrieves the total count every 2 seconds on
the index for GUI display:
public long getTotalMessageCount(Volume volume) throws
MessageSearchException {
if (volume == null)
throw new MessageSearchException("assertion failure:
null volume",logger);
int count = 0;
File indexDir = new File(volume.getIndexPath());
if (!indexDir.exists())
return 0;
IndexReader indexReader = null;
try {
indexReader = IndexReader.open(indexDir);
count += indexReader.numDocs();
} catch (IOException e ) {
logger.error("failed to close index to calculate total
count",e);
} finally {
try { indexReader.close(); } catch (Exception e) {
logger.error("failed to close index to calculate
total count",e);
}
}
return count;
}
There are also other threads for performing searches across the various
indexes.
I will rerun the reindex and send you the log.
With the availability of Lucene 2.9.0 I changed our indexing routines,
so perhaps it's an issue with my code. One thing I've already been
curious about is how it is possible to receive an
AlreadyClosedException when, thanks to the indexLock, no
other code can be writing to the index. Very occasionally
an AlreadyClosedException is thrown while adding a document to the
index. To ensure that nothing is lost, a separate retry queue is maintained.
Can you spot anything suspicious in the code below? I would also be interested
in your thoughts on how to improve indexing speed. Using a thread
pool to call addDocument on the index does not seem to help much.
public class IndexProcessor extends Thread {
public IndexProcessor() {
setName("index processor");
}
public void run() {
boolean exit = false;
//ExecutorService documentPool;
// we abandoned pool as it does not seem to offer any
major performance benefit
IndexInfo indexInfo = null;
LinkedList<IndexInfo> pushbacks = new
LinkedList<IndexInfo>();
while (!exit) {
try {
int maxIndexDocs =
Config.getConfig().getIndex().getMaxSimultaneousDocs();
//documentPool =
Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
indexInfo = null;
indexInfo = (IndexInfo) queue.take();
if (indexInfo==EXIT_REQ) {
logger.debug("index exit req received.
exiting");
exit = true;
continue;
}
indexLock.lock();
try {
openIndex();
} catch (Exception e) {
logger.error("failed to open
index:"+e.getMessage(),e);
return;
}
if (indexInfo==null) {
logger.debug("index info is null");
}
int i = 0;
while(indexInfo!=null && i<maxIndexDocs) {
try {
writer.addDocument(indexInfo.getDocument());
indexInfo.cleanup();
} catch (IOException io) {
logger.error("failed to add document to
index:"+io.getMessage(),io);
indexInfo.cleanup();
} catch (AlreadyClosedException e) {
pushbacks.add(indexInfo);
}
//documentPool.execute(new
IndexDocument(indexInfo,pushbacks));
i++;
if (i<maxIndexDocs) {
indexInfo = (IndexInfo) queue.poll();
if (indexInfo==null) {
logger.debug("index info is null");
}
if (indexInfo==EXIT_REQ) {
logger.debug("index exit req
received. exiting (2)");
exit = true;
break;
}
}
}
if (pushbacks.size()>0) {
closeIndex();
try {
openIndex();
} catch (Exception e) {
logger.error("failed to open
index:"+e.getMessage(),e);
return;
}
for (IndexInfo pushback : pushbacks) {
try {
writer.addDocument(pushback.getDocument());
pushback.cleanup();
} catch (IOException io) {
logger.error("failed to add
document to index:"+io.getMessage(),io);
pushback.cleanup();
} catch (AlreadyClosedException e) {
pushbacks.add(pushback);
}
//documentPool.execute(new
IndexDocument(pushback,pushbacks));
i++;
}
}
//documentPool.shutdown();
//documentPool.awaitTermination(30,TimeUnit.MINUTES);
} catch (Throwable ie) {
logger.error("index write
interrupted:"+ie.getMessage());
} finally {
closeIndex();
indexLock.unlock();
}
}
}
Many thanks
Jamie
Michael McCandless wrote:
> Are there other threads involved, besides the one hung in close? Can
> you post their stack traces?
>
> This stack trace seems to indicate that IW believes another thread is
> in the process of closing.
>
> Can you call IndexWriter.setInfoStream and post the output leading to the hang?
>
> Mike
>
>
---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
For additional commands, e-mail: java-user-help@lucene.apache.org
Re: Index.close() infinite TIME_WAITING (repost)
Posted by Michael McCandless <lu...@mikemccandless.com>.
Are there other threads involved, besides the one hung in close? Can
you post their stack traces?
This stack trace seems to indicate that IW believes another thread is
in the process of closing.
Can you call IndexWriter.setInfoStream and post the output leading to the hang?
Mike
On Fri, Oct 9, 2009 at 5:04 AM, Jamie Band <ja...@stimulussoft.com> wrote:
> Hi Michael / Uwe / others
>
> Sorry for the repost... it does not look like the earlier message I
> sent went through.
> FYI: there are no large Lucene merges taking place.
>
> Jamie Band wrote:
>>
>> Hi Michael
>>
>> Thanks for your help. Here are the stacks:
>>
>> index processor [TIME_WAITING] CPU time: 33:01
>> java.lang.Object.wait(long)
>> org.apache.lucene.index.IndexWriter.doWait()
>> org.apache.lucene.index.IndexWriter.shouldClose()
>> org.apache.lucene.index.IndexWriter.close(boolean)
>> org.apache.lucene.index.IndexWriter.close()
>> com.stimulus.archiva.index.VolumeIndex.closeIndex()
>> com.stimulus.archiva.index.VolumeIndex$IndexProcessor.run()
>>
>> The source code to our indexer is attached. As you can see, documents are
>> added to a blocking queue. The index processor thread takes it out of the
>> queue and processes it. After about 60k documents IndexWriter's close method
>> enters TIME_WAITING indefinitely. Is there any workaround to this problem?
>>
>>
>> package com.stimulus.archiva.index;
>>
>> import java.io.File;
>> import java.io.IOException;
>> import java.io.PrintStream;
>> import javax.mail.MessagingException;
>> import org.apache.commons.logging.*;
>> import org.apache.lucene.document.Document;
>> import org.apache.lucene.index.*;
>> import org.apache.lucene.store.FSDirectory;
>> import com.stimulus.archiva.domain.Config;
>> import com.stimulus.archiva.domain.Email;
>> import com.stimulus.archiva.domain.EmailID;
>> import com.stimulus.archiva.domain.Indexer;
>> import com.stimulus.archiva.domain.Volume;
>> import com.stimulus.archiva.exception.*;
>> import com.stimulus.archiva.language.AnalyzerFactory;
>> import com.stimulus.archiva.search.*;
>> import java.util.*;
>> import java.util.concurrent.ExecutorService;
>> import java.util.concurrent.Executors;
>> import java.util.concurrent.ScheduledExecutorService;
>> import java.util.concurrent.ScheduledFuture;
>> import java.util.concurrent.TimeUnit;
>> import org.apache.lucene.store.LockObtainFailedException;
>> import org.apache.lucene.store.AlreadyClosedException;
>> import java.util.concurrent.locks.ReentrantLock;
>> import java.util.concurrent.*;
>>
>> public class VolumeIndex extends Thread {
>> protected ArrayBlockingQueue<IndexInfo> queue;
>> protected static final Log logger =
>> LogFactory.getLog(VolumeIndex.class.getName());
>> IndexWriter writer = null;
>> Volume volume;
>> protected static ScheduledExecutorService scheduler;
>> protected static ScheduledFuture<?> scheduledTask;
>> protected static IndexInfo EXIT_REQ = new IndexInfo(null);
>> ReentrantLock indexLock = new ReentrantLock();
>> ArchivaAnalyzer analyzer = new ArchivaAnalyzer();
>> Indexer indexer = null;
>> File indexLogFile;
>> PrintStream indexLogOut;
>> IndexProcessor indexProcessor;
>> public VolumeIndex(Indexer indexer, Volume volume)
>> {
>> logger.debug("creating new volume index {"+volume+"}");
>> this.volume = volume;
>> this.indexer = indexer;
>> this.queue = new
>> ArrayBlockingQueue<IndexInfo>(Config.getConfig().getIndex().getIndexBacklog());
>> try {
>> indexLogFile = getIndexLogFile(volume);
>> if (indexLogFile!=null) {
>> if (indexLogFile.length()>10485760)
>> indexLogFile.delete();
>> indexLogOut = new PrintStream(indexLogFile);
>> }
>> logger.debug("set index log file path
>> {path='"+indexLogFile.getCanonicalPath()+"'}");
>> } catch (Exception e) {
>> logger.error("failed to open index log
>> file:"+e.getMessage(),e);
>> }
>> startup();
>> }
>> protected File getIndexLogFile(Volume volume) {
>> try {
>> String indexpath = volume.getIndexPath();
>> int lio = indexpath.lastIndexOf(File.separator)+1;
>> String logfilepath =
>> indexpath.substring(lio,indexpath.length()-1);
>> logfilepath += ".log";
>> logfilepath = "index_"+logfilepath;
>> logfilepath =
>> Config.getFileSystem().getLogPath()+File.separator+logfilepath;
>> return new File(logfilepath);
>> } catch (Exception e) {
>> logger.error("failed to open index log
>> file:"+e.getMessage(),e);
>> return null;
>> }
>> }
>> public void deleteMessages(List<String> ids) throws
>> MessageSearchException {
>> if (ids == null)
>> throw new MessageSearchException("assertion failure:
>> null ids",logger);
>> Term[] terms = new Term[ids.size()];
>> int c = 0;
>> StringBuffer deleteInfo = new StringBuffer();
>> for (String id : ids) {
>> terms[c++] = new Term("uid",id);
>> deleteInfo.append(id);
>> deleteInfo.append(",");
>> }
>> String deleteStr = deleteInfo.toString();
>> if (deleteStr.length()>0 &&
>> deleteStr.charAt(deleteStr.length()-1)==',')
>> deleteStr = deleteStr.substring(0,deleteStr.length()-1);
>> logger.debug("delete messages
>> {'"+deleteInfo+"'}");
>> try {
>> indexLock.lock();
>> openIndex();
>> try {
>> writer.deleteDocuments(terms);
>> writer.expungeDeletes();
>> } catch (Exception e) {
>> throw new MessageSearchException("failed to delete
>> email from index.",e,logger);
>> } finally {
>> }
>> } finally {
>> closeIndex();
>> indexLock.unlock();
>> }
>> }
>> protected void openIndex() throws MessageSearchException {
>> Exception lastError = null;
>> if (writer==null) {
>> logger.debug("openIndex() index will be opened. it is
>> currently closed.");
>> } else {
>> logger.debug("openIndex() did not bother opening index. it
>> is already open.");
>> return;
>> }
>> logger.debug("opening index for write {"+volume+"}");
>> indexer.prepareIndex(volume);
>> logger.debug("opening search index for write
>> {indexpath='"+volume.getIndexPath()+"'}");
>> boolean writelock;
>> int attempt = 0;
>> int maxattempt = 10;
>> if
>> (Config.getConfig().getIndex().getMultipleIndexProcesses()) {
>> maxattempt = 10000;
>> } else {
>> maxattempt = 10;
>> }
>> do {
>> writelock = false;
>> try {
>> FSDirectory fsDirectory =
>> FSDirectory.getDirectory(volume.getIndexPath());
>> int maxIndexChars =
>> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>> writer = new IndexWriter(fsDirectory,analyzer,new
>> IndexWriter.MaxFieldLength(maxIndexChars));
>> if (logger.isDebugEnabled() && indexLogOut!=null) {
>> writer.setInfoStream(indexLogOut);
>> }
>> } catch (LockObtainFailedException lobfe) {
>> logger.debug("write lock on index. will reopen in
>> 50ms.");
>> try { Thread.sleep(50); } catch (Exception e) {}
>> attempt++;
>> writelock = true;
>> } catch (CorruptIndexException cie) {
>> throw new MessageSearchException("index appears to be
>> corrupt. please reindex the active volume."+cie.getMessage(),logger);
>> } catch (IOException io) {
>> throw new MessageSearchException("failed to write
>> document to index:"+io.getMessage(),logger);
>> }
>> } while (writelock && attempt<maxattempt);
>> if (attempt>=10000)
>> throw new MessageSearchException("failed to open index writer
>> {location='"+volume.getIndexPath()+"'}",lastError,logger);
>> }
>> public void indexMessage(Email message) throws
>> MessageSearchException {
>> logger.debug("index message {"+message+"}");
>> long s = (new Date()).getTime();
>> if (message == null)
>> throw new MessageSearchException("assertion failure: null
>> message",logger);
>> Document doc = new Document();
>> IndexInfo indexInfo = new IndexInfo(doc);
>> try {
>> DocumentIndex docIndex = new DocumentIndex(indexer);
>> String language = doc.get("lang");
>> if (language==null)
>> language = indexer.getIndexLanguage();
>> docIndex.write(message,doc,indexInfo);
>> queue.put(indexInfo);
>> logger.debug("message indexed successfully
>> {"+message+",language='"+language+"'}");
>> } catch (MessagingException me) {
>> throw new MessageSearchException("failed to decode message
>> during indexing",me,logger, ChainedException.Level.DEBUG);
>> } catch (IOException me) {
>> throw new MessageSearchException("failed to index
>> message"+me.getMessage()+" {"+message+"}",me,logger,
>> ChainedException.Level.DEBUG);
>> } catch (ExtractionException ee)
>> {
>> // we will want to continue indexing
>> //throw new MessageSearchException("failed to decode
>> attachments in message {"+message+"}",ee,logger,
>> ChainedException.Level.DEBUG);
>> } catch (AlreadyClosedException ace) {
>> indexMessage(message);
>> } catch (Throwable e) {
>> throw new MessageSearchException("failed to index
>> message:"+e.getMessage(),e,logger, ChainedException.Level.DEBUG);
>> }
>> logger.debug("indexing message end {"+message+"}");
>> long e = (new Date()).getTime();
>> logger.debug("indexing time {time='"+(e-s)+"'}");
>> }
>> public class IndexProcessor extends Thread {
>> public IndexProcessor() {
>> setName("index processor");
>> }
>> public void run() {
>> boolean exit = false;
>> //ExecutorService documentPool;
>> // we abandoned pool as it does not seem to offer any major
>> performance benefit
>> IndexInfo indexInfo = null;
>> LinkedList<IndexInfo> pushbacks = new LinkedList<IndexInfo>();
>> while (!exit) {
>> try {
>> int maxIndexDocs =
>> Config.getConfig().getIndex().getMaxSimultaneousDocs();
>> //documentPool =
>> Executors.newFixedThreadPool(Config.getConfig().getArchiver().getArchiveThreads());
>> indexInfo = null; indexInfo
>> = (IndexInfo) queue.take();
>> if (indexInfo==EXIT_REQ) {
>> logger.debug("index exit req received.
>> exiting");
>> exit = true;
>> continue;
>> }
>> indexLock.lock();
>> try {
>> openIndex();
>> } catch (Exception e) {
>> logger.error("failed to open
>> index:"+e.getMessage(),e);
>> return;
>> }
>> if (indexInfo==null) {
>> logger.debug("index info is null");
>> }
>> int i = 0;
>> while(indexInfo!=null && i<maxIndexDocs) {
>> try {
>> writer.addDocument(indexInfo.getDocument());
>> } catch (IOException io) {
>> logger.error("failed to add document to
>> index:"+io.getMessage(),io);
>> } catch (AlreadyClosedException e) {
>> pushbacks.add(indexInfo);
>> } finally {
>> indexInfo.cleanup();
>> }
>>
>> //documentPool.execute(new IndexDocument(indexInfo,pushbacks));
>> i++;
>> if (i<maxIndexDocs)
>> {
>> indexInfo = (IndexInfo) queue.poll();
>> if
>> (indexInfo==null) {
>> logger.debug("index info is null");
>> }
>> if
>> (indexInfo==EXIT_REQ) {
>> logger.debug("index exit req
>> received. exiting (2)");
>> exit = true;
>> break;
>> }
>> }
>> }
>> for (IndexInfo pushback :
>> pushbacks) {
>> try {
>> writer.addDocument(pushback.getDocument());
>> } catch (IOException io) {
>> logger.error("failed to add document to
>> index:"+io.getMessage(),io);
>> } catch (AlreadyClosedException e) {
>> pushbacks.add(indexInfo);
>> } finally {
>> indexInfo.cleanup();
>> }
>> //documentPool.execute(new
>> IndexDocument(pushback,pushbacks));
>> i++;
>> }
>> //documentPool.shutdown();
>>
>> //documentPool.awaitTermination(30,TimeUnit.MINUTES);
>> } catch (Throwable ie) {
>> logger.error("index write
>> interrupted:"+ie.getMessage());
>> } finally {
>> closeIndex();
>> indexLock.unlock();
>> }
>> } }
>> public class IndexDocument extends Thread {
>> IndexInfo indexInfo = null;
>> List<IndexInfo> pushbacks = null;
>> public IndexDocument(IndexInfo
>> indexInfo,List<IndexInfo> pushbacks) {
>> this.indexInfo = indexInfo;
>> this.pushbacks = pushbacks;
>> setName("index document");
>> }
>> public void run() {
>> try {
>> writer.addDocument(indexInfo.getDocument());
>> } catch (IOException io) {
>> logger.error("failed to add document to
>> index:"+io.getMessage(),io);
>> } catch (AlreadyClosedException e) {
>> pushbacks.add(indexInfo);
>> }
>> }};
>> }
>> protected void closeIndex() {
>> try {
>> if (writer!=null) {
>> writer.close();
>> logger.debug("writer closed");
>> writer = null;
>> }
>> } catch (Exception io) {
>> logger.error("failed to close index
>> writer:"+io.getMessage(),io);
>> }
>> }
>> public void deleteIndex() throws MessageSearchException {
>> logger.debug("delete index
>> {indexpath='"+volume.getIndexPath()+"'}");
>> try {
>> indexLock.lock();
>> try {
>> int maxIndexChars =
>> Config.getConfig().getIndex().getMaxIndexPerFieldChars();
>> writer = new
>> IndexWriter(FSDirectory.getDirectory(volume.getIndexPath()),analyzer,true,new
>> IndexWriter.MaxFieldLength(maxIndexChars));
>> } catch (Exception cie) {
>> logger.error("failed to delete index
>> {index='"+volume.getIndexPath()+"'}",cie);
>> return;
>> }
>> MessageIndex.volumeIndexes.remove(this);
>> } finally {
>> closeIndex();
>> indexLock.unlock();
>> }
>> }
>> public void startup() {
>> logger.debug("volumeindex is starting up");
>> File lockFile = new
>> File(volume.getIndexPath()+File.separatorChar + "write.lock");
>> if (lockFile.exists()) {
>> logger.warn("The server lock file already exists. Either
>> another indexer is running or the server was not shutdown correctly.");
>> logger.warn("If it is the latter, the lock file must be
>> manually deleted at "+lockFile.getAbsolutePath());
>> if (indexer.getMultipleIndexProcesses()) {
>> logger.debug("index lock file detected on volumeindex
>> startup.");
>> } else {
>> logger.warn("index lock file detected. the server was
>> shutdown incorrectly. automatically deleting lock file.");
>> logger.warn("indexer is configured to deal with only one
>> indexer process.");
>> logger.warn("if you are running more than one indexer,
>> your index could be subject to corruption.");
>> lockFile.delete();
>> }
>> }
>> indexProcessor = new IndexProcessor();
>> indexProcessor.start();
>> Runtime.getRuntime().addShutdownHook(this);
>> }
>> public void shutdown() {
>> logger.debug("volumeindex is shutting down");
>> queue.add(EXIT_REQ);
>> scheduler.shutdownNow();
>> }
>> @Override
>> public void run() {
>> queue.add(EXIT_REQ);
>> }
>> }
>>
>>
>>
>> Is it possible a large merge is running? By default IW.close waits
>> for outstanding merges to complete. Can you post the stacktrace?
>>
>> Mike
>>
>> On Thu, Oct 8, 2009 at 5:22 PM, Jamie Band <ja...@stimulussoft.com> wrote:
>>>
>>> Hi All
>>>
>>> I have a long running situation where our indexing thread is getting
>>> stuck
>>> indefinitely in IndexWriter's close method. YourKit shows the thread to
>>> be
>>> stuck in TIME_WAITING. Any ideas on what could be causing this?
>>> Could it be one of the streams or readers we passed to the document?
>>>
>>> I am running Lucene 2.9.0.
>>>
>>> Many thanks in advance
>>>
>>> Jamie
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
> For additional commands, e-mail: java-user-help@lucene.apache.org
>
>
---------------------------------------------------------------------
To unsubscribe, e-mail: java-user-unsubscribe@lucene.apache.org
For additional commands, e-mail: java-user-help@lucene.apache.org