You are viewing a plain text version of this content. The canonical link for it is here.
Posted to solr-commits@lucene.apache.org by yo...@apache.org on 2009/11/06 02:55:40 UTC

svn commit: r833272 - /lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java

Author: yonik
Date: Fri Nov  6 01:55:39 2009
New Revision: 833272

URL: http://svn.apache.org/viewvc?rev=833272&view=rev
Log:
SOLR-1543: fix thread hangs

Modified:
    lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java

Modified: lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java
URL: http://svn.apache.org/viewvc/lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java?rev=833272&r1=833271&r2=833272&view=diff
==============================================================================
--- lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java (original)
+++ lucene/solr/trunk/src/solrj/org/apache/solr/client/solrj/impl/StreamingUpdateSolrServer.java Fri Nov  6 01:55:39 2009
@@ -23,11 +23,7 @@
 import java.net.MalformedURLException;
 import java.util.LinkedList;
 import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -63,8 +59,8 @@
   final ExecutorService scheduler = Executors.newCachedThreadPool();
   final String updateUrl = "/update";
   final Queue<Runner> runners;
-  Lock lock = null;  // used to block everything
-  int threadCount = 1;
+  volatile CountDownLatch lock = null;  // used to block everything
+  final int threadCount;
   
   public StreamingUpdateSolrServer(String solrServerUrl, int queueSize, int threadCount ) throws MalformedURLException  {
     super( solrServerUrl );
@@ -77,10 +73,10 @@
    * Opens a connection and sends everything...
    */
   class Runner implements Runnable {
-    final Lock lock = new ReentrantLock();
-    
+    final Lock runnerLock = new ReentrantLock();
+
     public void run() {
-      lock.lock();
+      runnerLock.lock();
 
       // info is ok since this should only happen once for each thread
       log.info( "starting runner: {}" , this );
@@ -162,7 +158,7 @@
           runners.remove( this );
         }
         log.info( "finished: {}" , this );
-        lock.unlock();
+        runnerLock.unlock();
       }
     }
   }
@@ -191,11 +187,12 @@
       }
     }
     
-    
-    if( lock != null ) {
-      lock.lock();  // keep it from adding new commands while we block
-    }
     try {
+      CountDownLatch tmpLock = lock;
+      if( tmpLock != null ) {
+        tmpLock.await();
+      }
+
       queue.put( req );
       
       if( runners.isEmpty() 
@@ -213,34 +210,28 @@
       log.error( "interuped", e );
       throw new IOException( e.getLocalizedMessage() );
     }
-    finally {
-      if( lock != null ) {
-        lock.unlock();
-      }
-    }
     
     // RETURN A DUMMY result
     NamedList<Object> dummy = new NamedList<Object>();
     dummy.add( "NOTE", "the request is processed in a background stream" );
     return dummy;
   }
-  
+
   public synchronized void blockUntilFinished()
   {
-    if( lock == null ) {
-      lock = new ReentrantLock();
-    }
-    lock.lock();
-
-    // Wait until no runners are running
-    Runner runner = runners.peek();
-    while( runner != null ) {
-      runner.lock.lock();
-      runner.lock.unlock();
-      runner = runners.peek();
+    lock = new CountDownLatch(1);
+    try {
+      // Wait until no runners are running
+      Runner runner = runners.peek();
+      while( runner != null ) {
+        runner.runnerLock.lock();
+        runner.runnerLock.unlock();
+        runner = runners.peek();
+      }
+    } finally {
+      lock.countDown();
+      lock=null;
     }
-    lock.unlock();
-    lock = null;
   }
   
   public void handleError( Throwable ex )