You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2011/02/15 22:37:43 UTC
svn commit: r1071078 - in /lucene/dev/branches/branch_3x: ./ lucene/ solr/
solr/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
Author: yonik
Date: Tue Feb 15 21:37:43 2011
New Revision: 1071078
URL: http://svn.apache.org/viewvc?rev=1071078&view=rev
Log:
SOLR-1711: fix SUSS deadlock
Modified:
lucene/dev/branches/branch_3x/ (props changed)
lucene/dev/branches/branch_3x/lucene/ (props changed)
lucene/dev/branches/branch_3x/solr/ (props changed)
lucene/dev/branches/branch_3x/solr/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
Modified: lucene/dev/branches/branch_3x/solr/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_3x/solr/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java?rev=1071078&r1=1071077&r2=1071078&view=diff
==============================================================================
--- lucene/dev/branches/branch_3x/solr/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java (original)
+++ lucene/dev/branches/branch_3x/solr/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java Tue Feb 15 21:37:43 2011
@@ -178,6 +178,8 @@ public class StreamingUpdateSolrServer e
// remove it from the list of running things unless we are the last runner and the queue is full...
// in which case, the next queue.put() would block and there would be no runners to handle it.
+ // This case has been further handled by using offer instead of put, and using a retry loop
+ // to avoid blocking forever (see request()).
synchronized (runners) {
if (runners.size() == 1 && queue.remainingCapacity() == 0) {
// keep this runner alive
@@ -223,18 +225,40 @@ public class StreamingUpdateSolrServer e
tmpLock.await();
}
- queue.put( req );
+ boolean success = queue.offer(req);
- synchronized( runners ) {
- if( runners.isEmpty()
- || (queue.remainingCapacity() < queue.size()
- && runners.size() < threadCount) )
- {
- Runner r = new Runner();
- scheduler.execute( r );
- runners.add( r );
+ for(;;) {
+ synchronized( runners ) {
+ if( runners.isEmpty()
+ || (queue.remainingCapacity() < queue.size() // queue is half full and we can add more runners
+ && runners.size() < threadCount) )
+ {
+ // We need more runners, so start a new one.
+ Runner r = new Runner();
+ runners.add( r );
+ scheduler.execute( r );
+ } else {
+ // break out of the retry loop if we added the element to the queue successfully, *and*
+ // while we are still holding the runners lock to prevent race conditions.
+ // race conditions.
+ if (success) break;
+ }
+ }
+
+ // Retry to add to the queue w/o the runners lock held (else we risk temporary deadlock)
+ // This retry could also fail because
+ // 1) existing runners were not able to take off any new elements in the queue
+ // 2) the queue was filled back up since our last try
+ // If we succeed, the queue may have been completely emptied, and all runners stopped.
+ // In all cases, we should loop back to the top to see if we need to start more runners.
+ //
+ if (!success) {
+ success = queue.offer(req, 100, TimeUnit.MILLISECONDS);
}
+
}
+
+
}
catch (InterruptedException e) {
log.error( "interrupted", e );