You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2017/02/22 19:44:31 UTC
[1/3] lucene-solr:master: SOLR-10126: Improve test a bit.
Repository: lucene-solr
Updated Branches:
refs/heads/master 55ef713eb -> d6337ac3e
SOLR-10126: Improve test a bit.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/be64c26c
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/be64c26c
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/be64c26c
Branch: refs/heads/master
Commit: be64c26c270fc9663609492de77c1dec5574afda
Parents: 55ef713
Author: markrmiller <ma...@apache.org>
Authored: Wed Feb 22 12:52:07 2017 -0500
Committer: markrmiller <ma...@apache.org>
Committed: Wed Feb 22 14:44:17 2017 -0500
----------------------------------------------------------------------
.../solr/cloud/PeerSyncReplicationTest.java | 26 ++++++++++++++------
1 file changed, 19 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/be64c26c/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
index 416e95e..0859eb5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
@@ -195,9 +195,15 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
}
}
+ class IndexInBackGround extends Thread {
+ private int numDocs;
- private void indexInBackground(int numDocs) {
- new Thread(() -> {
+ public IndexInBackGround(int numDocs) {
+ super(getClassName());
+ this.numDocs = numDocs;
+ }
+
+ public void run() {
try {
for (int i = 0; i < numDocs; i++) {
indexDoc(id, docId, i1, 50, tlong, 50, t1, "document number " + docId);
@@ -209,10 +215,7 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
log.error("Error indexing doc in background", e);
//Throwing an error here will kill the thread
}
- }, getClassName())
- .start();
-
-
+ }
}
@@ -269,7 +272,8 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
// disable fingerprint check if needed
System.setProperty("solr.disableFingerprint", String.valueOf(disableFingerprint));
- indexInBackground(50);
+ IndexInBackGround iib = new IndexInBackGround(50);
+ iib.start();
// bring back dead node and ensure it recovers
ChaosMonkey.start(nodeToBringUp.jetty);
@@ -284,6 +288,14 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
jetties.removeAll(nodesDown);
assertEquals(getShardCount() - nodesDown.size(), jetties.size());
+ waitForThingsToLevelOut(30);
+
+ iib.join();
+
+ cloudClient.commit();
+
+ checkShardConsistency(false, false);
+
long cloudClientDocs = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();
assertEquals(docId, cloudClientDocs);
[2/3] lucene-solr:master: SOLR-9855: DynamicInterceptor in
HttpClientUtil uses synchronization that can deadlock and puts a global mutex
around per-request process calls.
Posted by ma...@apache.org.
SOLR-9855: DynamicInterceptor in HttpClientUtil uses synchronization that can deadlock and puts a global mutex around per-request process calls.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/2f82409e
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/2f82409e
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/2f82409e
Branch: refs/heads/master
Commit: 2f82409e5b3a90363941caa3767c3de2abecdaf0
Parents: be64c26
Author: markrmiller <ma...@apache.org>
Authored: Wed Feb 22 13:01:21 2017 -0500
Committer: markrmiller <ma...@apache.org>
Committed: Wed Feb 22 14:44:18 2017 -0500
----------------------------------------------------------------------
.../org/apache/solr/client/solrj/impl/HttpClientUtil.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2f82409e/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
index 7ee90e1..7a84e7f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
@@ -19,10 +19,9 @@ package org.apache.solr.client.solrj.impl;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;
@@ -128,7 +127,7 @@ public class HttpClientUtil {
private static volatile SchemaRegistryProvider schemaRegistryProvider;
private static volatile String cookiePolicy;
- private static final List<HttpRequestInterceptor> interceptors = Collections.synchronizedList(new ArrayList<HttpRequestInterceptor>());
+ private static final List<HttpRequestInterceptor> interceptors = new CopyOnWriteArrayList<>();
static {
@@ -156,6 +155,9 @@ public class HttpClientUtil {
@Override
public void process(HttpRequest request, HttpContext context) throws HttpException, IOException {
+ // don't synchronize traversal - can lead to deadlock - CopyOnWriteArrayList is critical
+ // we also do not want to have to acquire the mutex when the list is empty or put a global
+ // mutex around the process calls
interceptors.forEach(new Consumer<HttpRequestInterceptor>() {
@Override
[3/3] lucene-solr:master: SOLR-9824: Some bulk update paths could be
very slow due to CUSC polling.
Posted by ma...@apache.org.
SOLR-9824: Some bulk update paths could be very slow due to CUSC (ConcurrentUpdateSolrClient) polling.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/d6337ac3
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/d6337ac3
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/d6337ac3
Branch: refs/heads/master
Commit: d6337ac3e566c504766d69499ab470bd26744a29
Parents: 2f82409
Author: markrmiller <ma...@apache.org>
Authored: Wed Feb 22 13:00:42 2017 -0500
Committer: markrmiller <ma...@apache.org>
Committed: Wed Feb 22 14:44:18 2017 -0500
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../handler/loader/ContentStreamLoader.java | 2 -
.../solr/handler/loader/JavabinLoader.java | 3 -
.../apache/solr/update/AddUpdateCommand.java | 2 -
.../apache/solr/update/SolrCmdDistributor.java | 15 +-
.../solr/update/StreamingSolrClients.java | 2 +-
.../processor/DistributedUpdateProcessor.java | 7 +
.../solrj/impl/ConcurrentUpdateSolrClient.java | 317 ++++++++++++++-----
8 files changed, 252 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index a6b5504..dcea40c 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -178,6 +178,8 @@ Bug Fixes
* SOLR-10168: ShardSplit can fail with NPE in OverseerCollectionMessageHandler#waitForCoreAdminAsyncCallToComplete. (Mark Miller)
+* SOLR-9824: Some bulk update paths could be very slow due to CUSC polling. (David Smiley, Mark Miller)
+
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
index 1dd038f..7751b43 100644
--- a/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
+++ b/solr/core/src/java/org/apache/solr/handler/loader/ContentStreamLoader.java
@@ -29,8 +29,6 @@ import org.apache.solr.update.processor.UpdateRequestProcessor;
*/
public abstract class ContentStreamLoader {
- protected static final int pollQueueTime = Integer.getInteger("solr.cloud.replication.poll-queue-time-ms", 25);
-
/**
* This should be called once for each RequestHandler
*/
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
index 6114280..873bcd1 100644
--- a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
+++ b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
@@ -116,9 +116,6 @@ public class JavabinLoader extends ContentStreamLoader {
private AddUpdateCommand getAddCommand(SolrQueryRequest req, SolrParams params) {
AddUpdateCommand addCmd = new AddUpdateCommand(req);
- // since we can give a hint to the leader that the end of a batch is being processed, it's OK to have a larger
- // pollQueueTime than the default 0 since we can optimize around not waiting unnecessarily
- addCmd.pollQueueTime = pollQueueTime;
addCmd.overwrite = params.getBool(UpdateParams.OVERWRITE, true);
addCmd.commitWithin = params.getInt(UpdateParams.COMMIT_WITHIN, -1);
return addCmd;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
index 0ede728..f526397 100644
--- a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
@@ -60,8 +60,6 @@ public class AddUpdateCommand extends UpdateCommand implements Iterable<Document
public int commitWithin = -1;
public boolean isLastDocInBatch = false;
-
- public int pollQueueTime = 0;
public AddUpdateCommand(SolrQueryRequest req) {
super(req);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 5caf43e..dac4000 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -36,6 +36,7 @@ import org.apache.solr.update.processor.DistributedUpdateProcessor.RequestReplic
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
@@ -51,7 +52,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-public class SolrCmdDistributor {
+public class SolrCmdDistributor implements Closeable {
private static final int MAX_RETRIES_ON_FORWARD = 25;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -96,6 +97,10 @@ public class SolrCmdDistributor {
clients.shutdown();
}
}
+
+ public void close() {
+ clients.shutdown();
+ }
private void doRetriesIfNeeded() {
// NOTE: retries will be forwards to a single url
@@ -210,7 +215,7 @@ public class SolrCmdDistributor {
if (cmd.isInPlaceUpdate()) {
params.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
}
- submit(new Req(cmd, node, uReq, synchronous, rrt, cmd.pollQueueTime), false);
+ submit(new Req(cmd, node, uReq, synchronous, rrt), false);
}
}
@@ -314,19 +319,17 @@ public class SolrCmdDistributor {
public boolean synchronous;
public UpdateCommand cmd;
public RequestReplicationTracker rfTracker;
- public int pollQueueTime;
public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous) {
- this(cmd, node, uReq, synchronous, null, 0);
+ this(cmd, node, uReq, synchronous, null);
}
- public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker, int pollQueueTime) {
+ public Req(UpdateCommand cmd, Node node, UpdateRequest uReq, boolean synchronous, RequestReplicationTracker rfTracker) {
this.node = node;
this.uReq = uReq;
this.synchronous = synchronous;
this.cmd = cmd;
this.rfTracker = rfTracker;
- this.pollQueueTime = pollQueueTime;
}
public String toString() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
index fc50be2..7c630f4 100644
--- a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
+++ b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
@@ -73,9 +73,9 @@ public class StreamingSolrClients {
// on a greater scale since the current behavior is to only increase the number of connections/Runners when
// the queue is more than half full.
client = new ErrorReportingConcurrentUpdateSolrClient(url, httpClient, 100, runnerCount, updateExecutor, true, req);
+ client.setPollQueueTime(Integer.MAX_VALUE); // minimize connections created
client.setParser(new BinaryResponseParser());
client.setRequestWriter(new BinaryRequestWriter());
- client.setPollQueueTime(req.pollQueueTime);
Set<String> queryParams = new HashSet<>(2);
queryParams.add(DistributedUpdateProcessor.DISTRIB_FROM);
queryParams.add(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index c6ccb71..ec093cf 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -828,6 +828,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// Given that, it may also make sense to move the version reporting out of this
// processor too.
}
+
+ @Override
+ protected void doClose() {
+ if (cmdDistrib != null) {
+ cmdDistrib.close();
+ }
+ }
// TODO: optionally fail if n replicas are not reached...
private void doFinish() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6337ac3/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
index 5c3f289..4eac2a5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClient.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
@@ -87,7 +88,13 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
private boolean internalHttpClient;
private volatile Integer connectionTimeout;
private volatile Integer soTimeout;
-
+ private volatile boolean closed;
+
+ AtomicInteger pollInterrupts;
+ AtomicInteger pollExits;
+ AtomicInteger blockLoops;
+ AtomicInteger emptyQueueLoops;
+
/**
* Uses an internally managed HttpClient instance.
*
@@ -156,6 +163,13 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
scheduler = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("concurrentUpdateScheduler"));
shutdownExecutor = true;
}
+
+ if (log.isDebugEnabled()) {
+ pollInterrupts = new AtomicInteger();
+ pollExits = new AtomicInteger();
+ blockLoops = new AtomicInteger();
+ emptyQueueLoops = new AtomicInteger();
+ }
}
public Set<String> getQueryParams() {
@@ -174,13 +188,19 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
* Opens a connection and sends everything...
*/
class Runner implements Runnable {
+ volatile Thread thread = null;
+ volatile boolean inPoll = false;
+
+ public Thread getThread() {
+ return thread;
+ }
+
@Override
public void run() {
+ this.thread = Thread.currentThread();
log.debug("starting runner: {}", this);
-
// This loop is so we can continue if an element was added to the queue after the last runner exited.
for (;;) {
-
try {
sendUpdateStream();
@@ -191,7 +211,6 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
}
handleError(e);
} finally {
-
synchronized (runners) {
// check to see if anything else was added to the queue
if (runners.size() == 1 && !queue.isEmpty() && !scheduler.isShutdown()) {
@@ -205,26 +224,42 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
break;
}
}
-
}
}
log.debug("finished: {}", this);
}
+ public void interruptPoll() {
+ Thread lthread = thread;
+ if (inPoll && lthread != null) {
+ lthread.interrupt();
+ }
+ }
+
//
// Pull from the queue multiple times and streams over a single connection.
// Exits on exception, interruption, or an empty queue to pull from.
//
void sendUpdateStream() throws Exception {
+
while (!queue.isEmpty()) {
HttpPost method = null;
HttpResponse response = null;
-
+
InputStream rspBody = null;
try {
- final Update update =
- queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+ Update update;
+ notifyQueueAndRunnersIfEmptyQueue();
+ try {
+ inPoll = true;
+ update = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ if (log.isDebugEnabled()) pollInterrupts.incrementAndGet();
+ continue;
+ } finally {
+ inPoll = false;
+ }
if (update == null)
break;
@@ -234,61 +269,73 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
final ModifiableSolrParams origParams = new ModifiableSolrParams(update.getRequest().getParams());
EntityTemplate template = new EntityTemplate(new ContentProducer() {
-
+
@Override
public void writeTo(OutputStream out) throws IOException {
- try {
- if (isXml) {
- out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
+
+ if (isXml) {
+ out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
+ }
+ Update upd = update;
+ while (upd != null) {
+ UpdateRequest req = upd.getRequest();
+ SolrParams currentParams = new ModifiableSolrParams(req.getParams());
+ if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
+ queue.add(upd); // params are different, push back to queue
+ break;
}
- Update upd = update;
- while (upd != null) {
- UpdateRequest req = upd.getRequest();
- SolrParams currentParams = new ModifiableSolrParams(req.getParams());
- if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
- queue.add(upd); // params are different, push back to queue
- break;
- }
- client.requestWriter.write(req, out);
- if (isXml) {
- // check for commit or optimize
- SolrParams params = req.getParams();
- if (params != null) {
- String fmt = null;
- if (params.getBool(UpdateParams.OPTIMIZE, false)) {
- fmt = "<optimize waitSearcher=\"%s\" />";
- } else if (params.getBool(UpdateParams.COMMIT, false)) {
- fmt = "<commit waitSearcher=\"%s\" />";
- }
- if (fmt != null) {
- byte[] content = String.format(Locale.ROOT,
- fmt,
- params.getBool(UpdateParams.WAIT_SEARCHER, false)
- + "").getBytes(StandardCharsets.UTF_8);
- out.write(content);
- }
+ client.requestWriter.write(req, out);
+ if (isXml) {
+ // check for commit or optimize
+ SolrParams params = req.getParams();
+ if (params != null) {
+ String fmt = null;
+ if (params.getBool(UpdateParams.OPTIMIZE, false)) {
+ fmt = "<optimize waitSearcher=\"%s\" />";
+ } else if (params.getBool(UpdateParams.COMMIT, false)) {
+ fmt = "<commit waitSearcher=\"%s\" />";
+ }
+ if (fmt != null) {
+ byte[] content = String.format(Locale.ROOT,
+ fmt, params.getBool(UpdateParams.WAIT_SEARCHER, false)
+ + "")
+ .getBytes(StandardCharsets.UTF_8);
+ out.write(content);
}
}
- out.flush();
-
- if (pollQueueTime > 0 && threadCount == 1 && req.isLastDocInBatch()) {
- // no need to wait to see another doc in the queue if we've hit the last doc in a batch
- upd = queue.poll(0, TimeUnit.MILLISECONDS);
- } else {
- upd = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
- }
-
}
-
- if (isXml) {
- out.write("</stream>".getBytes(StandardCharsets.UTF_8));
+ out.flush();
+
+ notifyQueueAndRunnersIfEmptyQueue();
+ inPoll = true;
+ try {
+ while (true) {
+ try {
+ upd = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+ break;
+ } catch (InterruptedException e) {
+ if (log.isDebugEnabled()) pollInterrupts.incrementAndGet();
+ if (!queue.isEmpty()) {
+ continue;
+ }
+ if (log.isDebugEnabled()) pollExits.incrementAndGet();
+ upd = null;
+ break;
+ } finally {
+ inPoll = false;
+ }
+ }
+ }finally {
+ inPoll = false;
}
+ }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.warn("", e);
+ if (isXml) {
+ out.write("</stream>".getBytes(StandardCharsets.UTF_8));
}
+
+
}
});
@@ -318,10 +365,13 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
method.setEntity(template);
method.addHeader("User-Agent", HttpSolrClient.AGENT);
method.addHeader("Content-Type", contentType);
-
+
+
response = client.getHttpClient()
.execute(method, HttpClientUtil.createNewHttpClientRequestContext());
+
rspBody = response.getEntity().getContent();
+
int statusCode = response.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
StringBuilder msg = new StringBuilder();
@@ -364,6 +414,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
} else {
onSuccess(response);
}
+
} finally {
try {
if (response != null) {
@@ -372,10 +423,25 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
} catch (Exception e) {
log.error("Error consuming and closing http response stream.", e);
}
+ notifyQueueAndRunnersIfEmptyQueue();
}
}
}
}
+
+ private void notifyQueueAndRunnersIfEmptyQueue() {
+ if (queue.size() == 0) {
+ synchronized (queue) {
+ // queue may be empty
+ queue.notifyAll();
+ }
+ synchronized (runners) {
+ // we notify runners too - if there is a high queue poll time and this is the update
+ // that emptied the queue, we make an attempt to avoid the 250ms timeout in blockUntilFinished
+ runners.notifyAll();
+ }
+ }
+ }
// *must* be called with runners monitor held, e.g. synchronized(runners){ addRunner() }
private void addRunner() {
@@ -383,7 +449,9 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
try {
Runner r = new Runner();
runners.add(r);
+
scheduler.execute(r); // this can throw an exception if the scheduler has been shutdown, but that should be fine.
+
} finally {
MDC.remove("ConcurrentUpdateSolrClient.url");
}
@@ -517,29 +585,52 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
public synchronized void blockUntilFinished() {
lock = new CountDownLatch(1);
try {
+
+ waitForEmptyQueue();
+ interruptRunnerThreadsPolling();
+
synchronized (runners) {
// NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run,
- // which means it would never remove itself from the runners list. This is why we don't wait forever
+ // which means it would never remove itself from the runners list. This is why we don't wait forever
// and periodically check if the scheduler is shutting down.
+ int loopCount = 0;
while (!runners.isEmpty()) {
- try {
- runners.wait(250);
- } catch (InterruptedException e) {
- Thread.interrupted();
- }
+
+ if (log.isDebugEnabled()) blockLoops.incrementAndGet();
if (scheduler.isShutdown())
break;
-
+
+ loopCount++;
+
// Need to check if the queue is empty before really considering this is finished (SOLR-4260)
int queueSize = queue.size();
if (queueSize > 0 && runners.isEmpty()) {
// TODO: can this still happen?
- log.warn("No more runners, but queue still has "+
- queueSize+" adding more runners to process remaining requests on queue");
+ log.warn("No more runners, but queue still has " +
+ queueSize + " adding more runners to process remaining requests on queue");
addRunner();
}
+
+ interruptRunnerThreadsPolling();
+
+ // try to avoid the worst case wait timeout
+ // without bad spin
+ int timeout;
+ if (loopCount < 3) {
+ timeout = 10;
+ } else if (loopCount < 10) {
+ timeout = 25;
+ } else {
+ timeout = 250;
+ }
+
+ try {
+ runners.wait(timeout);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
}
} finally {
@@ -548,6 +639,29 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
}
}
+ private void waitForEmptyQueue() {
+
+ while (!queue.isEmpty()) {
+ if (log.isDebugEnabled()) emptyQueueLoops.incrementAndGet();
+
+ synchronized (runners) {
+ int queueSize = queue.size();
+ if (queueSize > 0 && runners.isEmpty()) {
+ log.warn("No more runners, but queue still has " +
+ queueSize + " adding more runners to process remaining requests on queue");
+ addRunner();
+ }
+ }
+ synchronized (queue) {
+ try {
+ queue.wait(250);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
public void handleError(Throwable ex) {
log.error("error", ex);
}
@@ -560,19 +674,42 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
}
@Override
- public void close() {
- if (internalHttpClient) IOUtils.closeQuietly(client);
- if (shutdownExecutor) {
- scheduler.shutdown();
- try {
- if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+ public synchronized void close() {
+ if (closed) {
+ interruptRunnerThreadsPolling();
+ return;
+ }
+ closed = true;
+
+ try {
+ if (shutdownExecutor) {
+ scheduler.shutdown();
+ interruptRunnerThreadsPolling();
+ try {
+ if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+ scheduler.shutdownNow();
+ if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log
+ .error("ExecutorService did not terminate");
+ }
+ } catch (InterruptedException ie) {
scheduler.shutdownNow();
- if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log
- .error("ExecutorService did not terminate");
+ Thread.currentThread().interrupt();
}
- } catch (InterruptedException ie) {
- scheduler.shutdownNow();
- Thread.currentThread().interrupt();
+ } else {
+ interruptRunnerThreadsPolling();
+ }
+ } finally {
+ if (internalHttpClient) IOUtils.closeQuietly(client);
+ if (log.isDebugEnabled()) {
+ log.debug("STATS pollInteruppts={} pollExists={} blockLoops={} emptyQueueLoops={}", pollInterrupts.get(), pollExits.get(), blockLoops.get(), emptyQueueLoops.get());
+ }
+ }
+ }
+
+ private void interruptRunnerThreadsPolling() {
+ synchronized (runners) {
+ for (Runner runner : runners) {
+ runner.interruptPoll();
}
}
}
@@ -590,17 +727,29 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
}
public void shutdownNow() {
- if (internalHttpClient) IOUtils.closeQuietly(client);
- if (shutdownExecutor) {
- scheduler.shutdownNow(); // Cancel currently executing tasks
- try {
- if (!scheduler.awaitTermination(30, TimeUnit.SECONDS))
- log.error("ExecutorService did not terminate");
- } catch (InterruptedException ie) {
- scheduler.shutdownNow();
- Thread.currentThread().interrupt();
+ if (closed) {
+ return;
+ }
+ closed = true;
+ try {
+
+ if (shutdownExecutor) {
+ scheduler.shutdown();
+ interruptRunnerThreadsPolling();
+ scheduler.shutdownNow(); // Cancel currently executing tasks
+ try {
+ if (!scheduler.awaitTermination(30, TimeUnit.SECONDS))
+ log.error("ExecutorService did not terminate");
+ } catch (InterruptedException ie) {
+ scheduler.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ } else {
+ interruptRunnerThreadsPolling();
}
- }
+ } finally {
+ if (internalHttpClient) IOUtils.closeQuietly(client);
+ }
}
public void setParser(ResponseParser responseParser) {