You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/11/19 21:10:31 UTC
[3/4] lucene-solr:jira/http2: Cleanup
Cleanup
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c7655028
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c7655028
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c7655028
Branch: refs/heads/jira/http2
Commit: c7655028d38589d6315214afa41b5dfb7b5b5675
Parents: 717437c
Author: Cao Manh Dat <da...@apache.org>
Authored: Mon Nov 19 11:26:27 2018 +0000
Committer: Cao Manh Dat <da...@apache.org>
Committed: Mon Nov 19 11:26:27 2018 +0000
----------------------------------------------------------------------
.../solr/security/PKIAuthenticationPlugin.java | 5 -
.../apache/solr/update/SolrCmdDistributor.java | 3 +-
.../solr/update/StreamingSolrClients.java | 18 +-
.../impl/ConcurrentUpdateHttp2SolrClient.java | 799 +++++++++++++++++++
.../solrj/impl/ConcurrentUpdateSolr2Client.java | 799 -------------------
5 files changed, 809 insertions(+), 815 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c7655028/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java b/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
index 6f85d95..0dbfa7e 100644
--- a/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
+++ b/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
@@ -248,7 +248,6 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
SolrRequestInfo reqInfo = getRequestInfo();
String usr;
if (reqInfo != null) {
- log.debug("ReqInfo is not null");
Principal principal = reqInfo.getReq().getUserPrincipal();
if (principal == null) {
log.debug("principal is null");
@@ -259,14 +258,11 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
usr = principal.getName();
}
} else {
- log.debug("SolrRequestInfo is null");
if (!isSolrThread()) {
- log.debug("Not in Solr threadpool");
//if this is not running inside a Solr threadpool (as in testcases)
// then no need to add any header
return Optional.empty();
}
- log.debug("usr = $");
//this request seems to be originated from Solr itself
usr = "$"; //special name to denote the user is the node itself
}
@@ -276,7 +272,6 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
byte[] payload = s.getBytes(UTF_8);
byte[] payloadCipher = publicKeyHandler.keyPair.encrypt(ByteBuffer.wrap(payload));
String base64Cipher = Base64.byteArrayToBase64(payloadCipher);
- log.debug("Base64Ciphoer {}", base64Cipher);
return Optional.of(base64Cipher);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c7655028/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 72d0d30..49d58bd 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -539,8 +539,7 @@ public class SolrCmdDistributor implements Closeable {
private boolean isRetriableException(Throwable t) {
return t instanceof SocketException
|| t instanceof NoHttpResponseException
- || t instanceof SocketTimeoutException
- || t instanceof EofException;
+ || t instanceof SocketTimeoutException;
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c7655028/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
index 74e9801..698c2c4 100644
--- a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
+++ b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
@@ -30,7 +30,7 @@ import java.util.concurrent.ExecutorService;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.BinaryRequestWriter;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
-import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolr2Client;
+import org.apache.solr.client.solrj.impl.ConcurrentUpdateHttp2SolrClient;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.common.SolrException;
import org.apache.solr.update.SolrCmdDistributor.Error;
@@ -47,7 +47,7 @@ public class StreamingSolrClients {
private Http2SolrClient httpClient;
- private Map<String, ConcurrentUpdateSolr2Client> solrClients = new HashMap<>();
+ private Map<String, ConcurrentUpdateHttp2SolrClient> solrClients = new HashMap<>();
private List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
private ExecutorService updateExecutor;
@@ -73,7 +73,7 @@ public class StreamingSolrClients {
public synchronized SolrClient getSolrClient(final SolrCmdDistributor.Req req) {
String url = getFullUrl(req.node.getUrl());
- ConcurrentUpdateSolr2Client client = solrClients.get(url);
+ ConcurrentUpdateHttp2SolrClient client = solrClients.get(url);
if (client == null) {
// NOTE: increasing to more than 1 threadCount for the client could cause updates to be reordered
// on a greater scale since the current behavior is to only increase the number of connections/Runners when
@@ -101,13 +101,13 @@ public class StreamingSolrClients {
}
public synchronized void blockUntilFinished() {
- for (ConcurrentUpdateSolr2Client client : solrClients.values()) {
+ for (ConcurrentUpdateHttp2SolrClient client : solrClients.values()) {
client.blockUntilFinished();
}
}
public synchronized void shutdown() {
- for (ConcurrentUpdateSolr2Client client : solrClients.values()) {
+ for (ConcurrentUpdateHttp2SolrClient client : solrClients.values()) {
client.close();
}
}
@@ -131,7 +131,7 @@ public class StreamingSolrClients {
}
}
-class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateSolr2Client {
+class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateHttp2SolrClient {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final SolrCmdDistributor.Req req;
private final List<Error> errors;
@@ -144,7 +144,7 @@ class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateSolr2Clie
@Override
public void handleError(Throwable ex) {
- log.error("Datcm error", ex);
+ log.error("error", ex);
Error error = new Error();
error.e = (Exception) ex;
if (ex instanceof SolrException) {
@@ -154,7 +154,7 @@ class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateSolr2Clie
errors.add(error);
if (!req.shouldRetry(error)) {
// only track the error if we are not retrying the request
- req.trackRequestResult(null,null, false);
+ req.trackRequestResult(null, null, false);
}
}
@Override
@@ -162,7 +162,7 @@ class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateSolr2Clie
req.trackRequestResult(resp, respBody, true);
}
- static class Builder extends ConcurrentUpdateSolr2Client.Builder {
+ static class Builder extends ConcurrentUpdateHttp2SolrClient.Builder {
protected SolrCmdDistributor.Req req;
protected List<Error> errors;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c7655028/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
new file mode 100644
index 0000000..54935ce
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
@@ -0,0 +1,799 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.Locale;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.ResponseParser;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.StringUtils;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.util.InputStreamResponseListener;
+import org.eclipse.jetty.client.util.OutputStreamContentProvider;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
+ private static final long serialVersionUID = 1L;
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Update END_UPDATE = new Update(null, null);
+ private Http2SolrClient client;
+ private final String basePath;
+ final BlockingQueue<Update> queue;
+ final ExecutorService scheduler;
+ final Queue<Runner> runners;
+ volatile CountDownLatch lock = null; // used to block everything
+ final int threadCount;
+ boolean shutdownExecutor = false;
+ int pollQueueTime = 250;
+ private final boolean streamDeletes;
+ private volatile Integer connectionTimeout;
+ private volatile Integer soTimeout;
+ private volatile boolean closed;
+
+ /**
+ * Uses the supplied HttpClient to send documents to the Solr server.
+ *
+ * @deprecated use {@link ConcurrentUpdateHttp2SolrClient#ConcurrentUpdateHttp2SolrClient(ConcurrentUpdateHttp2SolrClient.Builder)} instead, as it is a more extension/subclassing-friendly alternative
+ */
+ @Deprecated
+ protected ConcurrentUpdateHttp2SolrClient(String solrServerUrl,
+ Http2SolrClient client, int queueSize, int threadCount,
+ ExecutorService es, boolean streamDeletes) {
+ this((streamDeletes) ?
+ new Builder(solrServerUrl)
+ .withHttpClient(client)
+ .withQueueSize(queueSize)
+ .withThreadCount(threadCount)
+ .withExecutorService(es)
+ .alwaysStreamDeletes() :
+ new Builder(solrServerUrl)
+ .withHttpClient(client)
+ .withQueueSize(queueSize)
+ .withThreadCount(threadCount)
+ .withExecutorService(es)
+ .neverStreamDeletes());
+ }
+
+ protected ConcurrentUpdateHttp2SolrClient(Builder builder) {
+ this.client = builder.httpClient;
+ this.queue = new LinkedBlockingQueue<>(builder.queueSize);
+ this.threadCount = builder.threadCount;
+ this.runners = new LinkedList<>();
+ this.streamDeletes = builder.streamDeletes;
+ this.connectionTimeout = builder.connectionTimeoutMillis;
+ this.soTimeout = builder.socketTimeoutMillis;
+ this.basePath = builder.baseSolrUrl;
+
+ if (builder.executorService != null) {
+ this.scheduler = builder.executorService;
+ this.shutdownExecutor = false;
+ } else {
+ this.scheduler = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("concurrentUpdateScheduler"));
+ this.shutdownExecutor = true;
+ }
+
+ }
+
+ public Set<String> getQueryParams() {
+ return this.client.getQueryParams();
+ }
+
+ /**
+ * Expert Method.
+ * @param queryParams set of param keys to only send via the query string
+ */
+ public void setQueryParams(Set<String> queryParams) {
+ this.client.setQueryParams(queryParams);
+ }
+
+ /**
+ * Opens a connection and sends everything...
+ */
+ class Runner implements Runnable {
+
+ @Override
+ public void run() {
+ log.debug("starting runner: {}", this);
+ // This loop is so we can continue if an element was added to the queue after the last runner exited.
+ for (;;) {
+ try {
+
+ sendUpdateStream();
+
+ } catch (Throwable e) {
+ if (e instanceof OutOfMemoryError) {
+ throw (OutOfMemoryError) e;
+ }
+ handleError(e);
+ } finally {
+ synchronized (runners) {
+ // check to see if anything else was added to the queue
+ if (runners.size() == 1 && !queue.isEmpty() && !scheduler.isShutdown()) {
+ // If there is something else to process, keep last runner alive by staying in the loop.
+ } else {
+ runners.remove(this);
+ if (runners.isEmpty()) {
+ // notify anyone waiting in blockUntilFinished
+ runners.notifyAll();
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ log.debug("finished: {}", this);
+ }
+
+ //
+ // Pull from the queue multiple times and streams over a single connection.
+ // Exits on exception, interruption, or an empty queue to pull from.
+ //
+ void sendUpdateStream() throws Exception {
+
+ try {
+ while (!queue.isEmpty()) {
+ InputStream rspBody = null;
+ try {
+ Update update;
+ notifyQueueAndRunnersIfEmptyQueue();
+ update = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+
+ if (update == END_UPDATE)
+ break;
+ if (update == null)
+ break;
+
+ String contentType = client.requestWriter.getUpdateContentType();
+ final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);
+
+ final ModifiableSolrParams origParams = new ModifiableSolrParams(update.getRequest().getParams());
+ final String origTargetCollection = update.getCollection();
+
+ // The parser 'wt=' and 'version=' params are used instead of the
+ // original params
+ ModifiableSolrParams requestParams = new ModifiableSolrParams(origParams);
+ requestParams.set(CommonParams.WT, client.parser.getWriterType());
+ requestParams.set(CommonParams.VERSION, client.parser.getVersion());
+
+ String basePath = ConcurrentUpdateHttp2SolrClient.this.basePath;
+ if (update.getCollection() != null)
+ basePath += "/" + update.getCollection();
+ if (!basePath.endsWith("/"))
+ basePath += "/";
+
+ OutputStreamContentProvider provider = new OutputStreamContentProvider();
+ Request postRequest = client.getHttpClient()
+ .newRequest(basePath + "update"
+ + requestParams.toQueryString())
+ .method(HttpMethod.POST)
+ .timeout(connectionTimeout, TimeUnit.MILLISECONDS)
+ .timeout(soTimeout, TimeUnit.MILLISECONDS)
+ .header("User-Agent", HttpSolrClient.AGENT)
+ .header("Content-Type", contentType)
+ .content(provider);
+ InputStreamResponseListener responseListener = new InputStreamResponseListener();
+ client.addNecessaryAuth(postRequest);
+ postRequest.send(responseListener);
+
+ try (OutputStream out = provider.getOutputStream()) {
+ if (isXml) {
+ out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
+ }
+ Update upd = update;
+ while (upd != null && upd != END_UPDATE) {
+ UpdateRequest req = upd.getRequest();
+ SolrParams currentParams = new ModifiableSolrParams(req.getParams());
+ if (!origParams.toNamedList().equals(currentParams.toNamedList()) || !StringUtils.equals(origTargetCollection, upd.getCollection())) {
+ queue.add(upd); // Request has different params or destination core/collection, return to queue
+ break;
+ }
+
+ client.requestWriter.write(req, out);
+ if (isXml) {
+ // check for commit or optimize
+ SolrParams params = req.getParams();
+ if (params != null) {
+ String fmt = null;
+ if (params.getBool(UpdateParams.OPTIMIZE, false)) {
+ fmt = "<optimize waitSearcher=\"%s\" />";
+ } else if (params.getBool(UpdateParams.COMMIT, false)) {
+ fmt = "<commit waitSearcher=\"%s\" />";
+ }
+ if (fmt != null) {
+ byte[] content = String.format(Locale.ROOT,
+ fmt, params.getBool(UpdateParams.WAIT_SEARCHER, false)
+ + "")
+ .getBytes(StandardCharsets.UTF_8);
+ out.write(content);
+ }
+ }
+ }
+ out.flush();
+
+ notifyQueueAndRunnersIfEmptyQueue();
+ upd = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
+ }
+
+ if (isXml) {
+ out.write("</stream>".getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ Response response = responseListener.get(soTimeout, TimeUnit.MILLISECONDS);
+ rspBody = responseListener.getInputStream();
+
+ int statusCode = response.getStatus();
+ if (statusCode != HttpStatus.OK_200) {
+ StringBuilder msg = new StringBuilder();
+ msg.append(response.getReason());
+ msg.append("\n\n\n\n");
+ msg.append("request: ").append(postRequest.getURI());
+
+ SolrException solrExc;
+ NamedList<String> metadata = null;
+ // parse out the metadata from the SolrException
+ try {
+ String encoding = "UTF-8"; // default
+ NamedList<Object> resp = client.parser.processResponse(rspBody, encoding);
+ NamedList<Object> error = (NamedList<Object>) resp.get("error");
+ if (error != null) {
+ metadata = (NamedList<String>) error.get("metadata");
+ String remoteMsg = (String) error.get("msg");
+ if (remoteMsg != null) {
+ msg.append("\nRemote error message: ");
+ msg.append(remoteMsg);
+ }
+ }
+ } catch (Exception exc) {
+ // don't want to fail to report error if parsing the response fails
+ log.warn("Failed to parse error response from " + basePath + " due to: " + exc);
+ } finally {
+ solrExc = new HttpSolrClient.RemoteSolrException(basePath , statusCode, msg.toString(), null);
+ if (metadata != null) {
+ solrExc.setMetadata(metadata);
+ }
+ }
+
+ handleError(solrExc);
+ } else {
+ onSuccess(response, rspBody);
+ }
+
+ } finally {
+ try {
+ if (rspBody != null) {
+ while (rspBody.read() != -1) {}
+ }
+ } catch (Exception e) {
+ log.error("Error consuming and closing http response stream.", e);
+ }
+ notifyQueueAndRunnersIfEmptyQueue();
+ }
+ }
+ } catch (InterruptedException e) {
+ log.error("Interrupted on polling from queue", e);
+ }
+
+ }
+ }
+
+ private void notifyQueueAndRunnersIfEmptyQueue() {
+ if (queue.size() == 0) {
+ synchronized (queue) {
+ // queue may be empty
+ queue.notifyAll();
+ }
+ synchronized (runners) {
+ // we notify runners too - if there is a high queue poll time and this is the update
+ // that emptied the queue, we make an attempt to avoid the 250ms timeout in blockUntilFinished
+ runners.notifyAll();
+ }
+ }
+ }
+
+ // *must* be called with runners monitor held, e.g. synchronized(runners){ addRunner() }
+ private void addRunner() {
+ MDC.put("ConcurrentUpdateSolrClient.url", client.getBaseURL());
+ try {
+ Runner r = new Runner();
+ runners.add(r);
+ try {
+ scheduler.execute(r); // this can throw an exception if the scheduler has been shutdown, but that should be fine.
+ } catch (RuntimeException e) {
+ runners.remove(r);
+ throw e;
+ }
+ } finally {
+ MDC.remove("ConcurrentUpdateSolrClient.url");
+ }
+ }
+
+ /**
+ * Class representing an UpdateRequest and an optional collection.
+ */
+ static class Update {
+ UpdateRequest request;
+ String collection;
+ /**
+ *
+ * @param request the update request.
+ * @param collection The collection, can be null.
+ */
+ public Update(UpdateRequest request, String collection) {
+ this.request = request;
+ this.collection = collection;
+ }
+ /**
+ * @return the update request.
+ */
+ public UpdateRequest getRequest() {
+ return request;
+ }
+ public void setRequest(UpdateRequest request) {
+ this.request = request;
+ }
+ /**
+ * @return the collection, can be null.
+ */
+ public String getCollection() {
+ return collection;
+ }
+ public void setCollection(String collection) {
+ this.collection = collection;
+ }
+ }
+
+ @Override
+ public NamedList<Object> request(final SolrRequest request, String collection)
+ throws SolrServerException, IOException {
+ if (!(request instanceof UpdateRequest)) {
+ return client.request(request, collection);
+ }
+ UpdateRequest req = (UpdateRequest) request;
+ log.info("Client send req:{} to:{}", request, basePath);
+ req.setBasePath(basePath);
+ // this happens for commit...
+ if (streamDeletes) {
+ if ((req.getDocuments() == null || req.getDocuments().isEmpty())
+ && (req.getDeleteById() == null || req.getDeleteById().isEmpty())
+ && (req.getDeleteByIdMap() == null || req.getDeleteByIdMap().isEmpty())) {
+ if (req.getDeleteQuery() == null) {
+ blockUntilFinished();
+ return client.request(request, collection);
+ }
+ }
+ } else {
+ if ((req.getDocuments() == null || req.getDocuments().isEmpty())) {
+ blockUntilFinished();
+ return client.request(request, collection);
+ }
+ }
+
+
+ SolrParams params = req.getParams();
+ if (params != null) {
+ // check if it is waiting for the searcher
+ if (params.getBool(UpdateParams.WAIT_SEARCHER, false)) {
+ log.info("blocking for commit/optimize");
+ blockUntilFinished(); // empty the queue
+ return client.request(request, collection);
+ }
+ }
+
+ try {
+ CountDownLatch tmpLock = lock;
+ if (tmpLock != null) {
+ tmpLock.await();
+ }
+
+ Update update = new Update(req, collection);
+ boolean success = queue.offer(update);
+
+ for (;;) {
+ synchronized (runners) {
+ // see if queue is half full and we can add more runners
+ // special case: if only using a threadCount of 1 and the queue
+ // is filling up, allow 1 add'l runner to help process the queue
+ if (runners.isEmpty() || (queue.remainingCapacity() < queue.size() && runners.size() < threadCount))
+ {
+ // We need more runners, so start a new one.
+ addRunner();
+ } else {
+ // break out of the retry loop if we added the element to the queue
+ // successfully, *and*
+ // while we are still holding the runners lock to prevent race
+ // conditions.
+ if (success)
+ break;
+ }
+ }
+
+ // Retry to add to the queue w/o the runners lock held (else we risk
+ // temporary deadlock)
+ // This retry could also fail because
+ // 1) existing runners were not able to take off any new elements in the
+ // queue
+ // 2) the queue was filled back up since our last try
+ // If we succeed, the queue may have been completely emptied, and all
+ // runners stopped.
+ // In all cases, we should loop back to the top to see if we need to
+ // start more runners.
+ //
+ if (!success) {
+ success = queue.offer(update, 100, TimeUnit.MILLISECONDS);
+ }
+ }
+ } catch (InterruptedException e) {
+ log.error("interrupted", e);
+ throw new IOException(e.getLocalizedMessage());
+ }
+
+ // RETURN A DUMMY result
+ NamedList<Object> dummy = new NamedList<>();
+ dummy.add("NOTE", "the request is processed in a background stream");
+ return dummy;
+ }
+
+ public synchronized void blockUntilFinished() {
+ lock = new CountDownLatch(1);
+ try {
+
+ waitForEmptyQueue();
+ interruptRunnerThreadsPolling();
+
+ synchronized (runners) {
+
+ // NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run,
+ // which means it would never remove itself from the runners list. This is why we don't wait forever
+ // and periodically check if the scheduler is shutting down.
+ int loopCount = 0;
+ while (!runners.isEmpty()) {
+
+ if (scheduler.isShutdown())
+ break;
+
+ loopCount++;
+
+ // Need to check if the queue is empty before really considering this is finished (SOLR-4260)
+ int queueSize = queue.size();
+ for (Update upd: queue) {
+ if (upd == END_UPDATE) {
+ queueSize--;
+ }
+ }
+ if (queueSize > 0 && runners.isEmpty()) {
+ // TODO: can this still happen?
+ log.warn("No more runners, but queue still has " +
+ queueSize + " adding more runners to process remaining requests on queue");
+ addRunner();
+ }
+
+ interruptRunnerThreadsPolling();
+
+ // try to avoid the worst case wait timeout
+ // without bad spin
+ int timeout;
+ if (loopCount < 3) {
+ timeout = 10;
+ } else if (loopCount < 10) {
+ timeout = 25;
+ } else {
+ timeout = 250;
+ }
+
+ try {
+ runners.wait(timeout);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ } finally {
+ lock.countDown();
+ lock = null;
+ }
+ }
+
+ private void waitForEmptyQueue() {
+ boolean threadInterrupted = Thread.currentThread().isInterrupted();
+
+ while (!queue.isEmpty()) {
+ if (scheduler.isTerminated()) {
+ log.warn("The task queue still has elements but the update scheduler {} is terminated. Can't process any more tasks. "
+ + "Queue size: {}, Runners: {}. Current thread Interrupted? {}", scheduler, queue.size(), runners.size(), threadInterrupted);
+ break;
+ }
+
+ synchronized (runners) {
+ int queueSize = queue.size();
+ if (queueSize > 0 && runners.isEmpty()) {
+ log.warn("No more runners, but queue still has " +
+ queueSize + " adding more runners to process remaining requests on queue");
+ addRunner();
+ }
+ }
+ synchronized (queue) {
+ try {
+ queue.wait(250);
+ } catch (InterruptedException e) {
+ // If we set the thread as interrupted again, the next time wait() is called it's going to return immediately
+ threadInterrupted = true;
+ log.warn("Thread interrupted while waiting for update queue to be empty. There are still {} elements in the queue.",
+ queue.size());
+ }
+ }
+ }
+ if (threadInterrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void handleError(Throwable ex) {
+ log.error("error", ex);
+ }
+
+ /**
+ * Intended to be used as an extension point for doing post processing after a request completes.
+ */
+ public void onSuccess(Response resp, InputStream respBody) {
+ // no-op by design, override to add functionality
+ }
+
+ @Override
+ public synchronized void close() {
+ if (closed) {
+ interruptRunnerThreadsPolling();
+ return;
+ }
+ closed = true;
+
+ if (shutdownExecutor) {
+ scheduler.shutdown();
+ interruptRunnerThreadsPolling();
+ try {
+ if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+ scheduler.shutdownNow();
+ if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log
+ .error("ExecutorService did not terminate");
+ }
+ } catch (InterruptedException ie) {
+ scheduler.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ } else {
+ interruptRunnerThreadsPolling();
+ }
+ }
+
+ private void interruptRunnerThreadsPolling() {
+ synchronized (runners) {
+ for (Runner ignored : runners) {
+ queue.add(END_UPDATE);
+ }
+ }
+ }
+
+ /**
+ * @deprecated since 7.0 Use {@link ConcurrentUpdateSolrClient.Builder} methods instead.
+ */
+ @Deprecated
+ public void setConnectionTimeout(int timeout) {
+ this.connectionTimeout = timeout;
+ }
+
+ /**
+ * set soTimeout (read timeout) on the underlying HttpConnectionManager. This is desirable for queries, but probably
+ * not for indexing.
+ *
+ * @deprecated since 7.0 Use {@link ConcurrentUpdateSolrClient.Builder} methods instead.
+ */
+ @Deprecated
+ public void setSoTimeout(int timeout) {
+ this.soTimeout = timeout;
+ }
+
+ public void shutdownNow() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+
+ if (shutdownExecutor) {
+ scheduler.shutdown();
+ interruptRunnerThreadsPolling();
+ scheduler.shutdownNow(); // Cancel currently executing tasks
+ try {
+ if (!scheduler.awaitTermination(30, TimeUnit.SECONDS))
+ log.error("ExecutorService did not terminate");
+ } catch (InterruptedException ie) {
+ scheduler.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ } else {
+ interruptRunnerThreadsPolling();
+ }
+ }
+
+ public void setParser(ResponseParser responseParser) {
+ client.setParser(responseParser);
+ }
+
+
+ /**
+ * @param pollQueueTime time for an open connection to wait for updates when
+ * the queue is empty.
+ */
+ public void setPollQueueTime(int pollQueueTime) {
+ this.pollQueueTime = pollQueueTime;
+ }
+
+ public void setRequestWriter(RequestWriter requestWriter) {
+ client.setRequestWriter(requestWriter);
+ }
+
+ /**
+ * Constructs {@link ConcurrentUpdateHttp2SolrClient} instances from provided configuration.
+ */
+ public static class Builder extends SolrClientBuilder<Http2SolrClient, Builder> {
+ protected String baseSolrUrl;
+ protected int queueSize = 10;
+ protected int threadCount;
+ protected ExecutorService executorService;
+ protected boolean streamDeletes;
+
+ /**
+ * Create a Builder object, based on the provided Solr URL.
+ *
+ * Two different paths can be specified as a part of this URL:
+ *
+ * 1) A path pointing directly at a particular core
+ * <pre>
+ * SolrClient client = new ConcurrentUpdateSolrClient.Builder("http://my-solr-server:8983/solr/core1").build();
+ * QueryResponse resp = client.query(new SolrQuery("*:*"));
+ * </pre>
+ * Note that when a core is provided in the base URL, queries and other requests can be made without mentioning the
+ * core explicitly. However, the client can only send requests to that core.
+ *
+ * 2) The path of the root Solr path ("/solr")
+ * <pre>
+ * SolrClient client = new ConcurrentUpdateSolrClient.Builder("http://my-solr-server:8983/solr").build();
+ * QueryResponse resp = client.query("core1", new SolrQuery("*:*"));
+ * </pre>
+ * In this case the client is more flexible and can be used to send requests to any cores. This flexibility though
+ * requires that the core be specified on all requests.
+ */
+ public Builder(String baseSolrUrl) {
+ this.baseSolrUrl = baseSolrUrl;
+ }
+
+ /**
+ * The maximum number of requests buffered by the SolrClient's internal queue before being processed by background threads.
+ *
+ * This value should be carefully paired with the number of queue-consumer threads. A queue with a maximum size
+ * set too high may require more memory. A queue with a maximum size set too low may suffer decreased throughput
+ * as {@link ConcurrentUpdateSolrClient#request(SolrRequest)} calls block waiting to add requests to the queue.
+ *
+ * If not set, this defaults to 10.
+ *
+ * @see #withThreadCount(int)
+ */
+ public Builder withQueueSize(int queueSize) {
+ if (queueSize <= 0) {
+ throw new IllegalArgumentException("queueSize must be a positive integer.");
+ }
+ this.queueSize = queueSize;
+ return this;
+ }
+
+ /**
+ * The maximum number of threads used to empty {@link ConcurrentUpdateHttp2SolrClient}'s queue.
+ *
+ * Threads are created when documents are added to the client's internal queue and exit when no updates remain in
+ * the queue.
+ * <p>
+ * This value should be carefully paired with the maximum queue capacity. A client with too few threads may suffer
+ * decreased throughput as the queue fills up and {@link ConcurrentUpdateSolrClient#request(SolrRequest)} calls
+ * block waiting to add requests to the queue.
+ */
+ public Builder withThreadCount(int threadCount) {
+ if (threadCount <= 0) {
+ throw new IllegalArgumentException("threadCount must be a positive integer.");
+ }
+
+ this.threadCount = threadCount;
+ return this;
+ }
+
+ /**
+ * Provides the {@link ExecutorService} for the created client to use when servicing the update-request queue.
+ */
+ public Builder withExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ return this;
+ }
+
+ /**
+ * Configures created clients to always stream delete requests.
+ *
+ * Streamed deletes are put into the update-queue and executed like any other update request.
+ */
+ public Builder alwaysStreamDeletes() {
+ this.streamDeletes = true;
+ return this;
+ }
+
+ /**
+ * Configures created clients to not stream delete requests.
+ *
+ * With this option set, when the created ConcurrentUpdateHttp2SolrClient sends a delete request it will first lock
+ * the queue and block until all queued updates have been sent, and then send the delete request.
+ */
+ public Builder neverStreamDeletes() {
+ this.streamDeletes = false;
+ return this;
+ }
+
+ /**
+ * Create a {@link ConcurrentUpdateHttp2SolrClient} based on the provided configuration options.
+ */
+ public ConcurrentUpdateHttp2SolrClient build() {
+ if (baseSolrUrl == null) {
+ throw new IllegalArgumentException("Cannot create HttpSolrClient without a valid baseSolrUrl!");
+ }
+
+ return new ConcurrentUpdateHttp2SolrClient(this);
+ }
+
+ @Override
+ public Builder getThis() {
+ return this;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c7655028/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolr2Client.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolr2Client.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolr2Client.java
deleted file mode 100644
index 1d0d907..0000000
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolr2Client.java
+++ /dev/null
@@ -1,799 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.client.solrj.impl;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.invoke.MethodHandles;
-import java.nio.charset.StandardCharsets;
-import java.util.LinkedList;
-import java.util.Locale;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.solr.client.solrj.ResponseParser;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.RequestWriter;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.util.ClientUtils;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.StringUtils;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.params.UpdateParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrjNamedThreadFactory;
-import org.eclipse.jetty.client.api.Request;
-import org.eclipse.jetty.client.api.Response;
-import org.eclipse.jetty.client.util.InputStreamResponseListener;
-import org.eclipse.jetty.client.util.OutputStreamContentProvider;
-import org.eclipse.jetty.http.HttpMethod;
-import org.eclipse.jetty.http.HttpStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-public class ConcurrentUpdateSolr2Client extends SolrClient {
- private static final long serialVersionUID = 1L;
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static final Update END_UPDATE = new Update(null, null);
- private Http2SolrClient client;
- private final String basePath;
- final BlockingQueue<Update> queue;
- final ExecutorService scheduler;
- final Queue<Runner> runners;
- volatile CountDownLatch lock = null; // used to block everything
- final int threadCount;
- boolean shutdownExecutor = false;
- int pollQueueTime = 250;
- private final boolean streamDeletes;
- private volatile Integer connectionTimeout;
- private volatile Integer soTimeout;
- private volatile boolean closed;
-
- /**
- * Uses the supplied HttpClient to send documents to the Solr server.
- *
- * @deprecated use {@link ConcurrentUpdateSolrClient#ConcurrentUpdateSolrClient(ConcurrentUpdateSolrClient.Builder)} instead, as it is a more extension/subclassing-friendly alternative
- */
- @Deprecated
- protected ConcurrentUpdateSolr2Client(String solrServerUrl,
- Http2SolrClient client, int queueSize, int threadCount,
- ExecutorService es, boolean streamDeletes) {
- this((streamDeletes) ?
- new Builder(solrServerUrl)
- .withHttpClient(client)
- .withQueueSize(queueSize)
- .withThreadCount(threadCount)
- .withExecutorService(es)
- .alwaysStreamDeletes() :
- new Builder(solrServerUrl)
- .withHttpClient(client)
- .withQueueSize(queueSize)
- .withThreadCount(threadCount)
- .withExecutorService(es)
- .neverStreamDeletes());
- }
-
- protected ConcurrentUpdateSolr2Client(Builder builder) {
- this.client = builder.httpClient;
- this.queue = new LinkedBlockingQueue<>(builder.queueSize);
- this.threadCount = builder.threadCount;
- this.runners = new LinkedList<>();
- this.streamDeletes = builder.streamDeletes;
- this.connectionTimeout = builder.connectionTimeoutMillis;
- this.soTimeout = builder.socketTimeoutMillis;
- this.basePath = builder.baseSolrUrl;
-
- if (builder.executorService != null) {
- this.scheduler = builder.executorService;
- this.shutdownExecutor = false;
- } else {
- this.scheduler = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory("concurrentUpdateScheduler"));
- this.shutdownExecutor = true;
- }
-
- }
-
- public Set<String> getQueryParams() {
- return this.client.getQueryParams();
- }
-
- /**
- * Expert Method.
- * @param queryParams set of param keys to only send via the query string
- */
- public void setQueryParams(Set<String> queryParams) {
- this.client.setQueryParams(queryParams);
- }
-
- /**
- * Opens a connection and sends everything...
- */
- class Runner implements Runnable {
-
- @Override
- public void run() {
- log.debug("starting runner: {}", this);
- // This loop is so we can continue if an element was added to the queue after the last runner exited.
- for (;;) {
- try {
-
- sendUpdateStream();
-
- } catch (Throwable e) {
- if (e instanceof OutOfMemoryError) {
- throw (OutOfMemoryError) e;
- }
- handleError(e);
- } finally {
- synchronized (runners) {
- // check to see if anything else was added to the queue
- if (runners.size() == 1 && !queue.isEmpty() && !scheduler.isShutdown()) {
- // If there is something else to process, keep last runner alive by staying in the loop.
- } else {
- runners.remove(this);
- if (runners.isEmpty()) {
- // notify anyone waiting in blockUntilFinished
- runners.notifyAll();
- }
- break;
- }
- }
- }
- }
-
- log.debug("finished: {}", this);
- }
-
- //
- // Pull from the queue multiple times and streams over a single connection.
- // Exits on exception, interruption, or an empty queue to pull from.
- //
- void sendUpdateStream() throws Exception {
-
- try {
- while (!queue.isEmpty()) {
- InputStream rspBody = null;
- try {
- Update update;
- notifyQueueAndRunnersIfEmptyQueue();
- update = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
-
- if (update == END_UPDATE)
- break;
- if (update == null)
- break;
-
- String contentType = client.requestWriter.getUpdateContentType();
- final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);
-
- final ModifiableSolrParams origParams = new ModifiableSolrParams(update.getRequest().getParams());
- final String origTargetCollection = update.getCollection();
-
- // The parser 'wt=' and 'version=' params are used instead of the
- // original params
- ModifiableSolrParams requestParams = new ModifiableSolrParams(origParams);
- requestParams.set(CommonParams.WT, client.parser.getWriterType());
- requestParams.set(CommonParams.VERSION, client.parser.getVersion());
-
- String basePath = ConcurrentUpdateSolr2Client.this.basePath;
- if (update.getCollection() != null)
- basePath += "/" + update.getCollection();
- if (!basePath.endsWith("/"))
- basePath += "/";
-
- OutputStreamContentProvider provider = new OutputStreamContentProvider();
- Request postRequest = client.getHttpClient()
- .newRequest(basePath + "update"
- + requestParams.toQueryString())
- .method(HttpMethod.POST)
- .timeout(connectionTimeout, TimeUnit.MILLISECONDS)
- .timeout(soTimeout, TimeUnit.MILLISECONDS)
- .header("User-Agent", HttpSolrClient.AGENT)
- .header("Content-Type", contentType)
- .content(provider);
- InputStreamResponseListener responseListener = new InputStreamResponseListener();
- client.addNecessaryAuth(postRequest);
- postRequest.send(responseListener);
-
- try (OutputStream out = provider.getOutputStream()) {
- if (isXml) {
- out.write("<stream>".getBytes(StandardCharsets.UTF_8)); // can be anything
- }
- Update upd = update;
- while (upd != null && upd != END_UPDATE) {
- UpdateRequest req = upd.getRequest();
- SolrParams currentParams = new ModifiableSolrParams(req.getParams());
- if (!origParams.toNamedList().equals(currentParams.toNamedList()) || !StringUtils.equals(origTargetCollection, upd.getCollection())) {
- queue.add(upd); // Request has different params or destination core/collection, return to queue
- break;
- }
-
- client.requestWriter.write(req, out);
- if (isXml) {
- // check for commit or optimize
- SolrParams params = req.getParams();
- if (params != null) {
- String fmt = null;
- if (params.getBool(UpdateParams.OPTIMIZE, false)) {
- fmt = "<optimize waitSearcher=\"%s\" />";
- } else if (params.getBool(UpdateParams.COMMIT, false)) {
- fmt = "<commit waitSearcher=\"%s\" />";
- }
- if (fmt != null) {
- byte[] content = String.format(Locale.ROOT,
- fmt, params.getBool(UpdateParams.WAIT_SEARCHER, false)
- + "")
- .getBytes(StandardCharsets.UTF_8);
- out.write(content);
- }
- }
- }
- out.flush();
-
- notifyQueueAndRunnersIfEmptyQueue();
- upd = queue.poll(pollQueueTime, TimeUnit.MILLISECONDS);
- }
-
- if (isXml) {
- out.write("</stream>".getBytes(StandardCharsets.UTF_8));
- }
- }
-
- Response response = responseListener.get(soTimeout, TimeUnit.MILLISECONDS);
- rspBody = responseListener.getInputStream();
-
- int statusCode = response.getStatus();
- if (statusCode != HttpStatus.OK_200) {
- StringBuilder msg = new StringBuilder();
- msg.append(response.getReason());
- msg.append("\n\n\n\n");
- msg.append("request: ").append(postRequest.getURI());
-
- SolrException solrExc;
- NamedList<String> metadata = null;
- // parse out the metadata from the SolrException
- try {
- String encoding = "UTF-8"; // default
- NamedList<Object> resp = client.parser.processResponse(rspBody, encoding);
- NamedList<Object> error = (NamedList<Object>) resp.get("error");
- if (error != null) {
- metadata = (NamedList<String>) error.get("metadata");
- String remoteMsg = (String) error.get("msg");
- if (remoteMsg != null) {
- msg.append("\nRemote error message: ");
- msg.append(remoteMsg);
- }
- }
- } catch (Exception exc) {
- // don't want to fail to report error if parsing the response fails
- log.warn("Failed to parse error response from " + basePath + " due to: " + exc);
- } finally {
- solrExc = new HttpSolrClient.RemoteSolrException(basePath , statusCode, msg.toString(), null);
- if (metadata != null) {
- solrExc.setMetadata(metadata);
- }
- }
-
- handleError(solrExc);
- } else {
- onSuccess(response, rspBody);
- }
-
- } finally {
- try {
- if (rspBody != null) {
- while (rspBody.read() != -1) {}
- }
- } catch (Exception e) {
- log.error("Error consuming and closing http response stream.", e);
- }
- notifyQueueAndRunnersIfEmptyQueue();
- }
- }
- } catch (InterruptedException e) {
- log.error("Interrupted on polling from queue", e);
- }
-
- }
- }
-
- private void notifyQueueAndRunnersIfEmptyQueue() {
- if (queue.size() == 0) {
- synchronized (queue) {
- // queue may be empty
- queue.notifyAll();
- }
- synchronized (runners) {
- // we notify runners too - if there is a high queue poll time and this is the update
- // that emptied the queue, we make an attempt to avoid the 250ms timeout in blockUntilFinished
- runners.notifyAll();
- }
- }
- }
-
- // *must* be called with runners monitor held, e.g. synchronized(runners){ addRunner() }
- private void addRunner() {
- MDC.put("ConcurrentUpdateSolrClient.url", client.getBaseURL());
- try {
- Runner r = new Runner();
- runners.add(r);
- try {
- scheduler.execute(r); // this can throw an exception if the scheduler has been shutdown, but that should be fine.
- } catch (RuntimeException e) {
- runners.remove(r);
- throw e;
- }
- } finally {
- MDC.remove("ConcurrentUpdateSolrClient.url");
- }
- }
-
- /**
- * Class representing an UpdateRequest and an optional collection.
- */
- static class Update {
- UpdateRequest request;
- String collection;
- /**
- *
- * @param request the update request.
- * @param collection The collection, can be null.
- */
- public Update(UpdateRequest request, String collection) {
- this.request = request;
- this.collection = collection;
- }
- /**
- * @return the update request.
- */
- public UpdateRequest getRequest() {
- return request;
- }
- public void setRequest(UpdateRequest request) {
- this.request = request;
- }
- /**
- * @return the collection, can be null.
- */
- public String getCollection() {
- return collection;
- }
- public void setCollection(String collection) {
- this.collection = collection;
- }
- }
-
- @Override
- public NamedList<Object> request(final SolrRequest request, String collection)
- throws SolrServerException, IOException {
- if (!(request instanceof UpdateRequest)) {
- return client.request(request, collection);
- }
- UpdateRequest req = (UpdateRequest) request;
- log.info("Client send req:{} to:{}", request, basePath);
- req.setBasePath(basePath);
- // this happens for commit...
- if (streamDeletes) {
- if ((req.getDocuments() == null || req.getDocuments().isEmpty())
- && (req.getDeleteById() == null || req.getDeleteById().isEmpty())
- && (req.getDeleteByIdMap() == null || req.getDeleteByIdMap().isEmpty())) {
- if (req.getDeleteQuery() == null) {
- blockUntilFinished();
- return client.request(request, collection);
- }
- }
- } else {
- if ((req.getDocuments() == null || req.getDocuments().isEmpty())) {
- blockUntilFinished();
- return client.request(request, collection);
- }
- }
-
-
- SolrParams params = req.getParams();
- if (params != null) {
- // check if it is waiting for the searcher
- if (params.getBool(UpdateParams.WAIT_SEARCHER, false)) {
- log.info("blocking for commit/optimize");
- blockUntilFinished(); // empty the queue
- return client.request(request, collection);
- }
- }
-
- try {
- CountDownLatch tmpLock = lock;
- if (tmpLock != null) {
- tmpLock.await();
- }
-
- Update update = new Update(req, collection);
- boolean success = queue.offer(update);
-
- for (;;) {
- synchronized (runners) {
- // see if queue is half full and we can add more runners
- // special case: if only using a threadCount of 1 and the queue
- // is filling up, allow 1 add'l runner to help process the queue
- if (runners.isEmpty() || (queue.remainingCapacity() < queue.size() && runners.size() < threadCount))
- {
- // We need more runners, so start a new one.
- addRunner();
- } else {
- // break out of the retry loop if we added the element to the queue
- // successfully, *and*
- // while we are still holding the runners lock to prevent race
- // conditions.
- if (success)
- break;
- }
- }
-
- // Retry to add to the queue w/o the runners lock held (else we risk
- // temporary deadlock)
- // This retry could also fail because
- // 1) existing runners were not able to take off any new elements in the
- // queue
- // 2) the queue was filled back up since our last try
- // If we succeed, the queue may have been completely emptied, and all
- // runners stopped.
- // In all cases, we should loop back to the top to see if we need to
- // start more runners.
- //
- if (!success) {
- success = queue.offer(update, 100, TimeUnit.MILLISECONDS);
- }
- }
- } catch (InterruptedException e) {
- log.error("interrupted", e);
- throw new IOException(e.getLocalizedMessage());
- }
-
- // RETURN A DUMMY result
- NamedList<Object> dummy = new NamedList<>();
- dummy.add("NOTE", "the request is processed in a background stream");
- return dummy;
- }
-
- public synchronized void blockUntilFinished() {
- lock = new CountDownLatch(1);
- try {
-
- waitForEmptyQueue();
- interruptRunnerThreadsPolling();
-
- synchronized (runners) {
-
- // NOTE: if the executor is shut down, runners may never become empty (a scheduled task may never be run,
- // which means it would never remove itself from the runners list. This is why we don't wait forever
- // and periodically check if the scheduler is shutting down.
- int loopCount = 0;
- while (!runners.isEmpty()) {
-
- if (scheduler.isShutdown())
- break;
-
- loopCount++;
-
- // Need to check if the queue is empty before really considering this is finished (SOLR-4260)
- int queueSize = queue.size();
- for (Update upd: queue) {
- if (upd == END_UPDATE) {
- queueSize--;
- }
- }
- if (queueSize > 0 && runners.isEmpty()) {
- // TODO: can this still happen?
- log.warn("No more runners, but queue still has " +
- queueSize + " adding more runners to process remaining requests on queue");
- addRunner();
- }
-
- interruptRunnerThreadsPolling();
-
- // try to avoid the worst case wait timeout
- // without bad spin
- int timeout;
- if (loopCount < 3) {
- timeout = 10;
- } else if (loopCount < 10) {
- timeout = 25;
- } else {
- timeout = 250;
- }
-
- try {
- runners.wait(timeout);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
- } finally {
- lock.countDown();
- lock = null;
- }
- }
-
- private void waitForEmptyQueue() {
- boolean threadInterrupted = Thread.currentThread().isInterrupted();
-
- while (!queue.isEmpty()) {
- if (scheduler.isTerminated()) {
- log.warn("The task queue still has elements but the update scheduler {} is terminated. Can't process any more tasks. "
- + "Queue size: {}, Runners: {}. Current thread Interrupted? {}", scheduler, queue.size(), runners.size(), threadInterrupted);
- break;
- }
-
- synchronized (runners) {
- int queueSize = queue.size();
- if (queueSize > 0 && runners.isEmpty()) {
- log.warn("No more runners, but queue still has " +
- queueSize + " adding more runners to process remaining requests on queue");
- addRunner();
- }
- }
- synchronized (queue) {
- try {
- queue.wait(250);
- } catch (InterruptedException e) {
- // If we set the thread as interrupted again, the next time the wait it's called i t's going to return immediately
- threadInterrupted = true;
- log.warn("Thread interrupted while waiting for update queue to be empty. There are still {} elements in the queue.",
- queue.size());
- }
- }
- }
- if (threadInterrupted) {
- Thread.currentThread().interrupt();
- }
- }
-
- public void handleError(Throwable ex) {
- log.error("error", ex);
- }
-
- /**
- * Intended to be used as an extension point for doing post processing after a request completes.
- */
- public void onSuccess(Response resp, InputStream respBody) {
- // no-op by design, override to add functionality
- }
-
- @Override
- public synchronized void close() {
- if (closed) {
- interruptRunnerThreadsPolling();
- return;
- }
- closed = true;
-
- if (shutdownExecutor) {
- scheduler.shutdown();
- interruptRunnerThreadsPolling();
- try {
- if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
- scheduler.shutdownNow();
- if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) log
- .error("ExecutorService did not terminate");
- }
- } catch (InterruptedException ie) {
- scheduler.shutdownNow();
- Thread.currentThread().interrupt();
- }
- } else {
- interruptRunnerThreadsPolling();
- }
- }
-
- private void interruptRunnerThreadsPolling() {
- synchronized (runners) {
- for (Runner ignored : runners) {
- queue.add(END_UPDATE);
- }
- }
- }
-
- /**
- * @deprecated since 7.0 Use {@link ConcurrentUpdateSolrClient.Builder} methods instead.
- */
- @Deprecated
- public void setConnectionTimeout(int timeout) {
- this.connectionTimeout = timeout;
- }
-
- /**
- * set soTimeout (read timeout) on the underlying HttpConnectionManager. This is desirable for queries, but probably
- * not for indexing.
- *
- * @deprecated since 7.0 Use {@link ConcurrentUpdateSolrClient.Builder} methods instead.
- */
- @Deprecated
- public void setSoTimeout(int timeout) {
- this.soTimeout = timeout;
- }
-
- public void shutdownNow() {
- if (closed) {
- return;
- }
- closed = true;
-
- if (shutdownExecutor) {
- scheduler.shutdown();
- interruptRunnerThreadsPolling();
- scheduler.shutdownNow(); // Cancel currently executing tasks
- try {
- if (!scheduler.awaitTermination(30, TimeUnit.SECONDS))
- log.error("ExecutorService did not terminate");
- } catch (InterruptedException ie) {
- scheduler.shutdownNow();
- Thread.currentThread().interrupt();
- }
- } else {
- interruptRunnerThreadsPolling();
- }
- }
-
- public void setParser(ResponseParser responseParser) {
- client.setParser(responseParser);
- }
-
-
- /**
- * @param pollQueueTime time for an open connection to wait for updates when
- * the queue is empty.
- */
- public void setPollQueueTime(int pollQueueTime) {
- this.pollQueueTime = pollQueueTime;
- }
-
- public void setRequestWriter(RequestWriter requestWriter) {
- client.setRequestWriter(requestWriter);
- }
-
- /**
- * Constructs {@link ConcurrentUpdateSolrClient} instances from provided configuration.
- */
- public static class Builder extends SolrClientBuilder<Http2SolrClient, Builder> {
- protected String baseSolrUrl;
- protected int queueSize = 10;
- protected int threadCount;
- protected ExecutorService executorService;
- protected boolean streamDeletes;
-
- /**
- * Create a Builder object, based on the provided Solr URL.
- *
- * Two different paths can be specified as a part of this URL:
- *
- * 1) A path pointing directly at a particular core
- * <pre>
- * SolrClient client = new ConcurrentUpdateSolrClient.Builder("http://my-solr-server:8983/solr/core1").build();
- * QueryResponse resp = client.query(new SolrQuery("*:*"));
- * </pre>
- * Note that when a core is provided in the base URL, queries and other requests can be made without mentioning the
- * core explicitly. However, the client can only send requests to that core.
- *
- * 2) The path of the root Solr path ("/solr")
- * <pre>
- * SolrClient client = new ConcurrentUpdateSolrClient.Builder("http://my-solr-server:8983/solr").build();
- * QueryResponse resp = client.query("core1", new SolrQuery("*:*"));
- * </pre>
- * In this case the client is more flexible and can be used to send requests to any cores. This flexibility though
- * requires that the core be specified on all requests.
- */
- public Builder(String baseSolrUrl) {
- this.baseSolrUrl = baseSolrUrl;
- }
-
- /**
- * The maximum number of requests buffered by the SolrClient's internal queue before being processed by background threads.
- *
- * This value should be carefully paired with the number of queue-consumer threads. A queue with a maximum size
- * set too high may require more memory. A queue with a maximum size set too low may suffer decreased throughput
- * as {@link ConcurrentUpdateSolrClient#request(SolrRequest)} calls block waiting to add requests to the queue.
- *
- * If not set, this defaults to 10.
- *
- * @see #withThreadCount(int)
- */
- public Builder withQueueSize(int queueSize) {
- if (queueSize <= 0) {
- throw new IllegalArgumentException("queueSize must be a positive integer.");
- }
- this.queueSize = queueSize;
- return this;
- }
-
- /**
- * The maximum number of threads used to empty {@link ConcurrentUpdateSolrClient}s queue.
- *
- * Threads are created when documents are added to the client's internal queue and exit when no updates remain in
- * the queue.
- * <p>
- * This value should be carefully paired with the maximum queue capacity. A client with too few threads may suffer
- * decreased throughput as the queue fills up and {@link ConcurrentUpdateSolrClient#request(SolrRequest)} calls
- * block waiting to add requests to the queue.
- */
- public Builder withThreadCount(int threadCount) {
- if (threadCount <= 0) {
- throw new IllegalArgumentException("threadCount must be a positive integer.");
- }
-
- this.threadCount = threadCount;
- return this;
- }
-
- /**
- * Provides the {@link ExecutorService} for the created client to use when servicing the update-request queue.
- */
- public Builder withExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- return this;
- }
-
- /**
- * Configures created clients to always stream delete requests.
- *
- * Streamed deletes are put into the update-queue and executed like any other update request.
- */
- public Builder alwaysStreamDeletes() {
- this.streamDeletes = true;
- return this;
- }
-
- /**
- * Configures created clients to not stream delete requests.
- *
- * With this option set when the created ConcurrentUpdateSolrClient sents a delete request it will first will lock
- * the queue and block until all queued updates have been sent, and then send the delete request.
- */
- public Builder neverStreamDeletes() {
- this.streamDeletes = false;
- return this;
- }
-
- /**
- * Create a {@link ConcurrentUpdateSolrClient} based on the provided configuration options.
- */
- public ConcurrentUpdateSolr2Client build() {
- if (baseSolrUrl == null) {
- throw new IllegalArgumentException("Cannot create HttpSolrClient without a valid baseSolrUrl!");
- }
-
- return new ConcurrentUpdateSolr2Client(this);
- }
-
- @Override
- public Builder getThis() {
- return this;
- }
- }
-}