You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2015/11/02 16:29:00 UTC
svn commit: r1712045 - in /lucene/dev/trunk/solr: CHANGES.txt
solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
Author: yonik
Date: Mon Nov 2 15:29:00 2015
New Revision: 1712045
URL: http://svn.apache.org/viewvc?rev=1712045&view=rev
Log:
SOLR-6406: fix race/hang in ConcurrentUpdateSolrClient.blockUntilFinished when executor service is shut down
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1712045&r1=1712044&r2=1712045&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Mon Nov 2 15:29:00 2015
@@ -312,6 +312,11 @@ Bug Fixes
be aborted. This can cause big update reorders that can cause replicas to
get out of sync. (Mark Miller, yonik)
+* SOLR-6406: ConcurrentUpdateSolrClient hang in blockUntilFinished. If updates are still
+ flowing and shutdown is called on the executor service used by ConcurrentUpdateSolrClient,
+ a race condition can cause that client to hang in blockUntilFinished.
+ (Mark Miller, yonik)
+
Optimizations
----------------------
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java?rev=1712045&r1=1712044&r2=1712045&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java Mon Nov 2 15:29:00 2015
@@ -148,157 +148,185 @@ public class ConcurrentUpdateSolrClient
* Opens a connection and sends everything...
*/
class Runner implements Runnable {
- final Lock runnerLock = new ReentrantLock();
-
@Override
public void run() {
- runnerLock.lock();
-
log.debug("starting runner: {}", this);
- HttpPost method = null;
- HttpResponse response = null;
- try {
- while (!queue.isEmpty()) {
- try {
- final UpdateRequest updateRequest =
- queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
- if (updateRequest == null)
+
+ // This loop is so we can continue if an element was added to the queue after the last runner exited.
+ for (;;) {
+
+ try {
+
+ sendUpdateStream();
+
+ } catch (Throwable e) {
+ if (e instanceof OutOfMemoryError) {
+ throw (OutOfMemoryError) e;
+ }
+ handleError(e);
+ } finally {
+
+ synchronized (runners) {
+ // check to see if anything else was added to the queue
+ if (runners.size() == 1 && !queue.isEmpty() && !scheduler.isShutdown()) {
+ // If there is something else to process, keep last runner alive by staying in the loop.
+ } else {
+ runners.remove(this);
+ if (runners.isEmpty()) {
+ // notify anyone waiting in blockUntilFinished
+ runners.notifyAll();
+ }
break;
-
- String contentType = client.requestWriter.getUpdateContentType();
- final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);
+ }
+ }
+
+ }
+ }
- final ModifiableSolrParams origParams = new ModifiableSolrParams(updateRequest.getParams());
+ log.debug("finished: {}", this);
+ }
- EntityTemplate template = new EntityTemplate(new ContentProducer() {
+ //
+ // Pulls from the queue multiple times and streams over a single connection.
+ // Exits on exception, interruption, or an empty queue to pull from.
+ //
+ void sendUpdateStream() throws Exception {
+ while (!queue.isEmpty()) {
+ HttpPost method = null;
+ HttpResponse response = null;
+
+ try {
+ final UpdateRequest updateRequest =
+ queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+ if (updateRequest == null)
+ break;
- @Override
- public void writeTo(OutputStream out) throws IOException {
- try {
- if (isXml) {
- out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
- }
- UpdateRequest req = updateRequest;
- while (req != null) {
- SolrParams currentParams = new ModifiableSolrParams(req.getParams());
- if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
- queue.add(req); // params are different, push back to queue
- break;
- }
-
- client.requestWriter.write(req, out);
- if (isXml) {
- // check for commit or optimize
- SolrParams params = req.getParams();
- if (params != null) {
- String fmt = null;
- if (params.getBool(UpdateParams.OPTIMIZE, false)) {
- fmt = "<optimize waitSearcher=\"%s\" />";
- } else if (params.getBool(UpdateParams.COMMIT, false)) {
- fmt = "<commit waitSearcher=\"%s\" />";
- }
- if (fmt != null) {
- byte[] content = String.format(Locale.ROOT,
- fmt,
- params.getBool(UpdateParams.WAIT_SEARCHER, false)
- + "").getBytes(StandardCharsets.UTF_8);
- out.write(content);
- }
- }
- }
- out.flush();
+ String contentType = client.requestWriter.getUpdateContentType();
+ final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);
- if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
- // no need to wait to see another doc in the queue if we've hit the last doc in a batch
- req = queue.poll(0, TimeUnit.MILLISECONDS);
- } else {
- req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
- }
+ final ModifiableSolrParams origParams = new ModifiableSolrParams(updateRequest.getParams());
+
+ EntityTemplate template = new EntityTemplate(new ContentProducer() {
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ try {
+ if (isXml) {
+ out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
+ }
+ UpdateRequest req = updateRequest;
+ while (req != null) {
+ SolrParams currentParams = new ModifiableSolrParams(req.getParams());
+ if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
+ queue.add(req); // params are different, push back to queue
+ break;
}
-
+
+ client.requestWriter.write(req, out);
if (isXml) {
- out.write("</stream>".getBytes(StandardCharsets.UTF_8));
+ // check for commit or optimize
+ SolrParams params = req.getParams();
+ if (params != null) {
+ String fmt = null;
+ if (params.getBool(UpdateParams.OPTIMIZE, false)) {
+ fmt = "<optimize waitSearcher=\"%s\" />";
+ } else if (params.getBool(UpdateParams.COMMIT, false)) {
+ fmt = "<commit waitSearcher=\"%s\" />";
+ }
+ if (fmt != null) {
+ byte[] content = String.format(Locale.ROOT,
+ fmt,
+ params.getBool(UpdateParams.WAIT_SEARCHER, false)
+ + "").getBytes(StandardCharsets.UTF_8);
+ out.write(content);
+ }
+ }
+ }
+ out.flush();
+
+ if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
+ // no need to wait to see another doc in the queue if we've hit the last doc in a batch
+ req = queue.poll(0, TimeUnit.MILLISECONDS);
+ } else {
+ req = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.warn("", e);
}
- }
- });
-
- // The parser 'wt=' and 'version=' params are used instead of the
- // original params
- ModifiableSolrParams requestParams = new ModifiableSolrParams(origParams);
- requestParams.set(CommonParams.WT, client.parser.getWriterType());
- requestParams.set(CommonParams.VERSION, client.parser.getVersion());
-
- method = new HttpPost(client.getBaseURL() + "/update"
- + ClientUtils.toQueryString(requestParams, false));
- method.setEntity(template);
- method.addHeader("User-Agent", HttpSolrClient.AGENT);
- method.addHeader("Content-Type", contentType);
-
- response = client.getHttpClient().execute(method);
- int statusCode = response.getStatusLine().getStatusCode();
- if (statusCode != HttpStatus.SC_OK) {
- StringBuilder msg = new StringBuilder();
- msg.append(response.getStatusLine().getReasonPhrase());
- msg.append("\n\n\n\n");
- msg.append("request: ").append(method.getURI());
- SolrException solrExc = new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString());
- // parse out the metadata from the SolrException
- try {
- NamedList<Object> resp =
- client.parser.processResponse(response.getEntity().getContent(),
- response.getEntity().getContentType().getValue());
- NamedList<Object> error = (NamedList<Object>) resp.get("error");
- if (error != null)
- solrExc.setMetadata((NamedList<String>) error.get("metadata"));
- } catch (Exception exc) {
- // don't want to fail to report error if parsing the response fails
- log.warn("Failed to parse error response from "+ client.getBaseURL()+" due to: "+exc);
- }
+ if (isXml) {
+ out.write("</stream>".getBytes(StandardCharsets.UTF_8));
+ }
- handleError(solrExc);
- } else {
- onSuccess(response);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("", e);
+ }
}
- } finally {
+ });
+
+ // The parser 'wt=' and 'version=' params are used instead of the
+ // original params
+ ModifiableSolrParams requestParams = new ModifiableSolrParams(origParams);
+ requestParams.set(CommonParams.WT, client.parser.getWriterType());
+ requestParams.set(CommonParams.VERSION, client.parser.getVersion());
+
+ method = new HttpPost(client.getBaseURL() + "/update"
+ + ClientUtils.toQueryString(requestParams, false));
+ method.setEntity(template);
+ method.addHeader("User-Agent", HttpSolrClient.AGENT);
+ method.addHeader("Content-Type", contentType);
+
+ response = client.getHttpClient().execute(method);
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode != HttpStatus.SC_OK) {
+ StringBuilder msg = new StringBuilder();
+ msg.append(response.getStatusLine().getReasonPhrase());
+ msg.append("\n\n\n\n");
+ msg.append("request: ").append(method.getURI());
+
+ SolrException solrExc = new SolrException(ErrorCode.getErrorCode(statusCode), msg.toString());
+ // parse out the metadata from the SolrException
try {
- if (response != null) {
- response.getEntity().getContent().close();
- }
- } catch (Exception ex) {
- log.warn("", ex);
+ NamedList<Object> resp =
+ client.parser.processResponse(response.getEntity().getContent(),
+ response.getEntity().getContentType().getValue());
+ NamedList<Object> error = (NamedList<Object>) resp.get("error");
+ if (error != null)
+ solrExc.setMetadata((NamedList<String>) error.get("metadata"));
+ } catch (Exception exc) {
+ // don't want to fail to report error if parsing the response fails
+ log.warn("Failed to parse error response from " + client.getBaseURL() + " due to: " + exc);
}
- }
- }
- } catch (Throwable e) {
- if (e instanceof OutOfMemoryError) {
- throw (OutOfMemoryError) e;
- }
- handleError(e);
- } finally {
- synchronized (runners) {
- if (runners.size() == 1 && !queue.isEmpty()) {
- // keep this runner alive
- scheduler.execute(this);
+
+ handleError(solrExc);
} else {
- runners.remove(this);
- if (runners.isEmpty())
- runners.notifyAll();
+ onSuccess(response);
+ }
+ } finally {
+ try {
+ if (response != null) {
+ response.getEntity().getContent().close();
+ }
+ } catch (Exception ex) {
+ log.warn("", ex);
}
}
-
- log.debug("finished: {}", this);
- runnerLock.unlock();
}
}
}
+ // *must* be called with runners monitor held, e.g. synchronized(runners){ addRunner() }
+ private void addRunner() {
+ MDC.put("ConcurrentUpdateSolrClient.url", client.getBaseURL());
+ try {
+ Runner r = new Runner();
+ runners.add(r);
+ scheduler.execute(r); // this can throw an exception if the scheduler has been shutdown, but that should be fine.
+ } finally {
+ MDC.remove("ConcurrentUpdateSolrClient.url");
+ }
+ }
+
@Override
public NamedList<Object> request(final SolrRequest request, String collection)
throws SolrServerException, IOException {
@@ -351,14 +379,7 @@ public class ConcurrentUpdateSolrClient
if (runners.isEmpty() || (queue.remainingCapacity() < queue.size() && runners.size() < threadCount))
{
// We need more runners, so start a new one.
- MDC.put("ConcurrentUpdateSolrClient.url", client.getBaseURL());
- try {
- Runner r = new Runner();
- runners.add(r);
- scheduler.execute(r);
- } finally {
- MDC.remove("ConcurrentUpdateSolrClient.url");
- }
+ addRunner();
} else {
// break out of the retry loop if we added the element to the queue
// successfully, *and*
@@ -399,30 +420,27 @@ public class ConcurrentUpdateSolrClient
lock = new CountDownLatch(1);
try {
synchronized (runners) {
+
+ // NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run,
+ // which means it would never remove itself from the runners list). This is why we don't wait forever
+ // and periodically check if the scheduler is shutting down.
while (!runners.isEmpty()) {
try {
- runners.wait();
+ runners.wait(250);
} catch (InterruptedException e) {
Thread.interrupted();
}
- if (scheduler.isTerminated())
+ if (scheduler.isShutdown())
break;
- // if we reach here, then we probably got the notifyAll, but need to check if
- // the queue is empty before really considering this is finished (SOLR-4260)
+ // Need to check if the queue is empty before really considering this is finished (SOLR-4260)
int queueSize = queue.size();
- if (queueSize > 0) {
+ if (queueSize > 0 && runners.isEmpty()) {
+ // TODO: can this still happen?
log.warn("No more runners, but queue still has "+
queueSize+" adding more runners to process remaining requests on queue");
- MDC.put("ConcurrentUpdateSolrClient.url", client.getBaseURL());
- try {
- Runner r = new Runner();
- runners.add(r);
- scheduler.execute(r);
- } finally {
- MDC.remove("ConcurrentUpdateSolrClient.url");
- }
+ addRunner();
}
}
}