You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2015/11/02 16:29:00 UTC

svn commit: r1712045 - in /lucene/dev/trunk/solr: CHANGES.txt solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java

Author: yonik
Date: Mon Nov  2 15:29:00 2015
New Revision: 1712045

URL: http://svn.apache.org/viewvc?rev=1712045&view=rev
Log:
SOLR-6406: fix race/hang in ConcurrentUpdateSolrClient.blockUntilFinished when executor service is shut down

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1712045&r1=1712044&r2=1712045&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Mon Nov  2 15:29:00 2015
@@ -312,6 +312,11 @@ Bug Fixes
   be aborted.  This can cause big update reorders that can cause replicas to
   get out of sync. (Mark Miller, yonik)
 
+* SOLR-6406: ConcurrentUpdateSolrClient hang in blockUntilFinished. If updates are still
+  flowing and shutdown is called on the executor service used by ConcurrentUpdateSolrClient,
+  a race condition can cause that client to hang in blockUntilFinished.
+  (Mark Miller, yonik)
+
 
 Optimizations
 ----------------------

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java?rev=1712045&r1=1712044&r2=1712045&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java Mon Nov  2 15:29:00 2015
@@ -148,157 +148,185 @@ public class ConcurrentUpdateSolrClient
    * Opens a connection and sends everything...
    */
   class Runner implements Runnable {
-    final Lock runnerLock = new ReentrantLock();
-
     @Override
     public void run() {
-      runnerLock.lock();
-
       log.debug("starting runner: {}", this);
-      HttpPost method = null;
-      HttpResponse response = null;
-      try {
-        while (!queue.isEmpty()) {
-          try {
-            final UpdateRequest updateRequest = 
-                queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
-            if (updateRequest == null)
+
+      // This loop is so we can continue if an element was added to the queue after the last runner exited.
+      for (;;) {
+
+        try {
+
+          sendUpdateStream();
+
+        } catch (Throwable e) {
+          if (e instanceof OutOfMemoryError) {
+            throw (OutOfMemoryError) e;
+          }
+          handleError(e);
+        } finally {
+
+          synchronized (runners) {
+            // check to see if anything else was added to the queue
+            if (runners.size() == 1 && !queue.isEmpty() && !scheduler.isShutdown()) {
+              // If there is something else to process, keep last runner alive by staying in the loop.
+            } else {
+              runners.remove(this);
+              if (runners.isEmpty()) {
+                // notify anyone waiting in blockUntilFinished
+                runners.notifyAll();
+              }
               break;
-                       
-            String contentType = client.requestWriter.getUpdateContentType();
-            final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);
+            }
+          }
+
+        }
+      }
 
-            final ModifiableSolrParams origParams = new ModifiableSolrParams(updateRequest.getParams());
+      log.debug("finished: {}", this);
+    }
 
-            EntityTemplate template = new EntityTemplate(new ContentProducer() {
+    //
+    // Pulls from the queue multiple times and streams over a single connection.
+    // Exits on exception, interruption, or an empty queue to pull from.
+    //
+    void sendUpdateStream() throws Exception {
+      while (!queue.isEmpty()) {
+        HttpPost method = null;
+        HttpResponse response = null;
+
+        try {
+          final UpdateRequest updateRequest =
+              queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+          if (updateRequest == null)
+            break;
 
-              @Override
-              public void writeTo(OutputStream out) throws IOException {
-                try {
-                  if (isXml) {
-                    out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
-                  }                                    
-                  UpdateRequest req = updateRequest;
-                  while (req != null) {                                        
-                    SolrParams currentParams = new ModifiableSolrParams(req.getParams());
-                    if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
-                      queue.add(req); // params are different, push back to queue
-                      break;
-                    }
-                    
-                    client.requestWriter.write(req, out);
-                    if (isXml) {
-                      // check for commit or optimize
-                      SolrParams params = req.getParams();
-                      if (params != null) {
-                        String fmt = null;
-                        if (params.getBool(UpdateParams.OPTIMIZE, false)) {
-                          fmt = "<optimize waitSearcher=\"%s\" />";
-                        } else if (params.getBool(UpdateParams.COMMIT, false)) {
-                          fmt = "<commit waitSearcher=\"%s\" />";
-                        }
-                        if (fmt != null) {
-                          byte[] content = String.format(Locale.ROOT,
-                              fmt,
-                              params.getBool(UpdateParams.WAIT_SEARCHER, false)
-                                  + "").getBytes(StandardCharsets.UTF_8);
-                          out.write(content);
-                        }
-                      }
-                    }
-                    out.flush();
+          String contentType = client.requestWriter.getUpdateContentType();
+          final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);
 
-                    if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
-                      // no need to wait to see another doc in the queue if we've hit the last doc in a batch
-                      req = queue.poll(0, TimeUnit.MILLISECONDS);
-                    } else {
-                      req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
-                    }
+          final ModifiableSolrParams origParams = new ModifiableSolrParams(updateRequest.getParams());
+
+          EntityTemplate template = new EntityTemplate(new ContentProducer() {
 
+            @Override
+            public void writeTo(OutputStream out) throws IOException {
+              try {
+                if (isXml) {
+                  out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
+                }
+                UpdateRequest req = updateRequest;
+                while (req != null) {
+                  SolrParams currentParams = new ModifiableSolrParams(req.getParams());
+                  if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
+                    queue.add(req); // params are different, push back to queue
+                    break;
                   }
-                  
+
+                  client.requestWriter.write(req, out);
                   if (isXml) {
-                    out.write("</stream>".getBytes(StandardCharsets.UTF_8));
+                    // check for commit or optimize
+                    SolrParams params = req.getParams();
+                    if (params != null) {
+                      String fmt = null;
+                      if (params.getBool(UpdateParams.OPTIMIZE, false)) {
+                        fmt = "<optimize waitSearcher=\"%s\" />";
+                      } else if (params.getBool(UpdateParams.COMMIT, false)) {
+                        fmt = "<commit waitSearcher=\"%s\" />";
+                      }
+                      if (fmt != null) {
+                        byte[] content = String.format(Locale.ROOT,
+                            fmt,
+                            params.getBool(UpdateParams.WAIT_SEARCHER, false)
+                                + "").getBytes(StandardCharsets.UTF_8);
+                        out.write(content);
+                      }
+                    }
+                  }
+                  out.flush();
+
+                  if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
+                    // no need to wait to see another doc in the queue if we've hit the last doc in a batch
+                    req = queue.poll(0, TimeUnit.MILLISECONDS);
+                  } else {
+                    req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
                   }
 
-                } catch (InterruptedException e) {
-                  Thread.currentThread().interrupt();
-                  log.warn("", e);
                 }
-              }
-            });
-            
-            // The parser 'wt=' and 'version=' params are used instead of the
-            // original params
-            ModifiableSolrParams requestParams = new ModifiableSolrParams(origParams);
-            requestParams.set(CommonParams.WT, client.parser.getWriterType());
-            requestParams.set(CommonParams.VERSION, client.parser.getVersion());
-
-            method = new HttpPost(client.getBaseURL() + "/update"
-                + ClientUtils.toQueryString(requestParams, false));
-            method.setEntity(template);
-            method.addHeader("User-Agent", HttpSolrClient.AGENT);
-            method.addHeader("Content-Type", contentType);
-                        
-            response = client.getHttpClient().execute(method);
-            int statusCode = response.getStatusLine().getStatusCode();
-            if (statusCode != HttpStatus.SC_OK) {
-              StringBuilder msg = new StringBuilder();
-              msg.append(response.getStatusLine().getReasonPhrase());
-              msg.append("\n\n\n\n");
-              msg.append("request: ").append(method.getURI());
 
-              SolrException solrExc = new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString());
-              // parse out the metadata from the SolrException
-              try {
-                NamedList<Object> resp =
-                    client.parser.processResponse(response.getEntity().getContent(),
-                        response.getEntity().getContentType().getValue());
-                NamedList<Object> error = (NamedList<Object>) resp.get("error");
-                if (error != null)
-                  solrExc.setMetadata((NamedList<String>) error.get("metadata"));
-              } catch (Exception exc) {
-                // don't want to fail to report error if parsing the response fails
-                log.warn("Failed to parse error response from "+ client.getBaseURL()+" due to: "+exc);
-              }
+                if (isXml) {
+                  out.write("</stream>".getBytes(StandardCharsets.UTF_8));
+                }
 
-              handleError(solrExc);
-            } else {
-              onSuccess(response);
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.warn("", e);
+              }
             }
-          } finally {
+          });
+
+          // The parser 'wt=' and 'version=' params are used instead of the
+          // original params
+          ModifiableSolrParams requestParams = new ModifiableSolrParams(origParams);
+          requestParams.set(CommonParams.WT, client.parser.getWriterType());
+          requestParams.set(CommonParams.VERSION, client.parser.getVersion());
+
+          method = new HttpPost(client.getBaseURL() + "/update"
+              + ClientUtils.toQueryString(requestParams, false));
+          method.setEntity(template);
+          method.addHeader("User-Agent", HttpSolrClient.AGENT);
+          method.addHeader("Content-Type", contentType);
+
+          response = client.getHttpClient().execute(method);
+          int statusCode = response.getStatusLine().getStatusCode();
+          if (statusCode != HttpStatus.SC_OK) {
+            StringBuilder msg = new StringBuilder();
+            msg.append(response.getStatusLine().getReasonPhrase());
+            msg.append("\n\n\n\n");
+            msg.append("request: ").append(method.getURI());
+
+            SolrException solrExc = new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString());
+            // parse out the metadata from the SolrException
             try {
-              if (response != null) {
-                response.getEntity().getContent().close();
-              }
-            } catch (Exception ex) {
-              log.warn("", ex);
+              NamedList<Object> resp =
+                  client.parser.processResponse(response.getEntity().getContent(),
+                      response.getEntity().getContentType().getValue());
+              NamedList<Object> error = (NamedList<Object>) resp.get("error");
+              if (error != null)
+                solrExc.setMetadata((NamedList<String>) error.get("metadata"));
+            } catch (Exception exc) {
+              // don't want to fail to report error if parsing the response fails
+              log.warn("Failed to parse error response from " + client.getBaseURL() + " due to: " + exc);
             }
-          }
-        }
-      } catch (Throwable e) {
-        if (e instanceof OutOfMemoryError) {
-          throw (OutOfMemoryError) e;
-        }
-        handleError(e);
-      } finally {
-        synchronized (runners) {
-          if (runners.size() == 1 && !queue.isEmpty()) {
-            // keep this runner alive
-            scheduler.execute(this);
+
+            handleError(solrExc);
           } else {
-            runners.remove(this);
-            if (runners.isEmpty())
-              runners.notifyAll();
+            onSuccess(response);
+          }
+        } finally {
+          try {
+            if (response != null) {
+              response.getEntity().getContent().close();
+            }
+          } catch (Exception ex) {
+            log.warn("", ex);
           }
         }
-
-        log.debug("finished: {}", this);
-        runnerLock.unlock();
       }
     }
   }
 
+  // *must* be called with runners monitor held, e.g. synchronized(runners){ addRunner() }
+  private void addRunner() {
+    MDC.put("ConcurrentUpdateSolrClient.url", client.getBaseURL());
+    try {
+      Runner r = new Runner();
+      runners.add(r);
+      scheduler.execute(r);  // this can throw an exception if the scheduler has been shutdown, but that should be fine.
+    } finally {
+      MDC.remove("ConcurrentUpdateSolrClient.url");
+    }
+  }
+
   @Override
   public NamedList<Object> request(final SolrRequest request, String collection)
       throws SolrServerException, IOException {
@@ -351,14 +379,7 @@ public class ConcurrentUpdateSolrClient
           if (runners.isEmpty() || (queue.remainingCapacity() < queue.size() && runners.size() < threadCount))
           {
             // We need more runners, so start a new one.
-            MDC.put("ConcurrentUpdateSolrClient.url", client.getBaseURL());
-            try {
-              Runner r = new Runner();
-              runners.add(r);
-              scheduler.execute(r);
-            } finally {
-              MDC.remove("ConcurrentUpdateSolrClient.url");
-            }
+            addRunner();
           } else {
             // break out of the retry loop if we added the element to the queue
             // successfully, *and*
@@ -399,30 +420,27 @@ public class ConcurrentUpdateSolrClient
     lock = new CountDownLatch(1);
     try {
       synchronized (runners) {
+
+        // NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run,
+        // which means it would never remove itself from the runners list).  This is why we don't wait forever
+        // and periodically check if the scheduler is shutting down.
         while (!runners.isEmpty()) {
           try {
-            runners.wait();
+            runners.wait(250);
           } catch (InterruptedException e) {
             Thread.interrupted();
           }
           
-          if (scheduler.isTerminated())
+          if (scheduler.isShutdown())
             break;
                       
-          // if we reach here, then we probably got the notifyAll, but need to check if
-          // the queue is empty before really considering this is finished (SOLR-4260)
+          // Need to check if the queue is empty before really considering this is finished (SOLR-4260)
           int queueSize = queue.size();
-          if (queueSize > 0) {
+          if (queueSize > 0 && runners.isEmpty()) {
+            // TODO: can this still happen?
             log.warn("No more runners, but queue still has "+
               queueSize+" adding more runners to process remaining requests on queue");
-            MDC.put("ConcurrentUpdateSolrClient.url", client.getBaseURL());
-            try {
-              Runner r = new Runner();
-              runners.add(r);
-              scheduler.execute(r);
-            } finally {
-              MDC.remove("ConcurrentUpdateSolrClient.url");
-            }
+            addRunner();
           }
         }
       }