You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by yo...@apache.org on 2009/11/06 02:55:40 UTC
svn commit: r833272 -
/lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
Author: yonik
Date: Fri Nov 6 01:55:39 2009
New Revision: 833272
URL: http://svn.apache.org/viewvc?rev=833272&view=rev
Log:
SOLR-1543: fix thread hangs
Modified:
lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
Modified: lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java?rev=833272&r1=833271&r2=833272&view=diff
==============================================================================
--- lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java (original)
+++ lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java Fri Nov 6 01:55:39 2009
@@ -23,11 +23,7 @@
import java.net.MalformedURLException;
import java.util.LinkedList;
import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -63,8 +59,8 @@
final ExecutorService scheduler = Executors.newCachedThreadPool();
final String updateUrl = "/update";
final Queue<Runner> runners;
- Lock lock = null; // used to block everything
- int threadCount = 1;
+ volatile CountDownLatch lock = null; // used to block everything
+ final int threadCount;
public StreamingUpdateSolrServer(String solrServerUrl, int queueSize, int threadCount ) throws MalformedURLException {
super( solrServerUrl );
@@ -77,10 +73,10 @@
* Opens a connection and sends everything...
*/
class Runner implements Runnable {
- final Lock lock = new ReentrantLock();
-
+ final Lock runnerLock = new ReentrantLock();
+
public void run() {
- lock.lock();
+ runnerLock.lock();
// info is ok since this should only happen once for each thread
log.info( "starting runner: {}" , this );
@@ -162,7 +158,7 @@
runners.remove( this );
}
log.info( "finished: {}" , this );
- lock.unlock();
+ runnerLock.unlock();
}
}
}
@@ -191,11 +187,12 @@
}
}
-
- if( lock != null ) {
- lock.lock(); // keep it from adding new commands while we block
- }
try {
+ CountDownLatch tmpLock = lock;
+ if( tmpLock != null ) {
+ tmpLock.await();
+ }
+
queue.put( req );
if( runners.isEmpty()
@@ -213,34 +210,28 @@
log.error( "interuped", e );
throw new IOException( e.getLocalizedMessage() );
}
- finally {
- if( lock != null ) {
- lock.unlock();
- }
- }
// RETURN A DUMMY result
NamedList<Object> dummy = new NamedList<Object>();
dummy.add( "NOTE", "the request is processed in a background stream" );
return dummy;
}
-
+
public synchronized void blockUntilFinished()
{
- if( lock == null ) {
- lock = new ReentrantLock();
- }
- lock.lock();
-
- // Wait until no runners are running
- Runner runner = runners.peek();
- while( runner != null ) {
- runner.lock.lock();
- runner.lock.unlock();
- runner = runners.peek();
+ lock = new CountDownLatch(1);
+ try {
+ // Wait until no runners are running
+ Runner runner = runners.peek();
+ while( runner != null ) {
+ runner.runnerLock.lock();
+ runner.runnerLock.unlock();
+ runner = runners.peek();
+ }
+ } finally {
+ lock.countDown();
+ lock=null;
}
- lock.unlock();
- lock = null;
}
public void handleError( Throwable ex )