You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/11 16:08:49 UTC
[lucene-solr] branch reference_impl_dev updated: @508 Dist update work.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new 91aba24 @508 Dist update work.
91aba24 is described below
commit 91aba2446964653eabdba77112f02ec51168898f
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Aug 11 11:06:54 2020 -0500
@508 Dist update work.
---
.../apache/solr/handler/RequestHandlerBase.java | 2 +-
.../component/SolrExecutorCompletionService.java | 2 +-
.../org/apache/solr/servlet/SolrQoSFilter.java | 2 +-
.../org/apache/solr/update/SolrCmdDistributor.java | 2 +-
.../processor/DistributedUpdateProcessor.java | 18 ++++++++------
.../processor/DistributedZkUpdateProcessor.java | 29 +++++++++++++++++-----
.../solr/cloud/FullSolrCloudDistribCmdsTest.java | 10 +++++---
.../solr/client/solrj/impl/Http2SolrClient.java | 1 -
.../solr/client/solrj/impl/HttpSolrClient.java | 5 ++--
.../org/apache/solr/common/ParWorkExecService.java | 7 +++---
.../org/apache/solr/common/ParWorkExecutor.java | 2 +-
.../org/apache/solr/common/cloud/SolrZkClient.java | 15 -----------
.../apache/solr/common/cloud/ZkStateReader.java | 6 ++---
13 files changed, 54 insertions(+), 47 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
index 8ec3de3..f892677 100644
--- a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
+++ b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
@@ -233,7 +233,7 @@ public abstract class RequestHandlerBase implements SolrRequestHandler, SolrInfo
if (isTragic) {
if (e instanceof SolrException) {
// Tragic exceptions should always throw a server error
- assert ((SolrException) e).code() == 500;
+ //assert ((SolrException) e).code() == 500;
} else {
// wrap it in a solr exception
e = new SolrException(SolrException.ErrorCode.SERVER_ERROR, e.getMessage(), e);
diff --git a/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java b/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
index a4c6998..fef2b9e 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
@@ -61,7 +61,7 @@ public class SolrExecutorCompletionService<V> implements CompletionService<V> {
throw new NullPointerException();
} else {
RunnableFuture<V> f = this.newTaskFor(task, result);
- this.executor.execute(new SolrExecutorCompletionService.QueueingFuture(f, this.completionQueue), true);
+ this.executor.doSubmit(new SolrExecutorCompletionService.QueueingFuture(f, this.completionQueue), true);
return f;
}
}
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
index 9035239..8a1c2b4 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
@@ -60,7 +60,7 @@ public class SolrQoSFilter extends QoSFilter {
throws IOException, ServletException {
HttpServletRequest req = (HttpServletRequest) request;
String source = req.getHeader(QoSParams.REQUEST_SOURCE);
- if (source == null || !source.equals(QoSParams.INTERNAL)) {
+ if (!req.getPathInfo().startsWith("/img/") && (source == null || !source.equals(QoSParams.INTERNAL))) {
// TODO - we don't need to call this *every* request
double ourLoad = sysStats.getAvarageUsagePerCPU();
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index f7da47b..78b1f20 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -192,7 +192,7 @@ public class SolrCmdDistributor implements Closeable {
Set<CountDownLatch> latches = new HashSet<>(nodes.size());
// we need to do any retries before commit...
- // blockAndDoRetries();
+ blockAndDoRetries();
if (log.isDebugEnabled()) log.debug("Distrib commit to: {} params: {}", nodes, params);
for (Node node : nodes) {
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index b5991dc..38d3b04 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -242,9 +242,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
SolrInputDocument clonedDoc = shouldCloneCmdDoc() ? cmd.solrDoc.deepCopy(): null;
-
+ AddUpdateCommand cloneCmd = null;
if (clonedDoc != null) {
- cmd.solrDoc = clonedDoc;
+ cloneCmd = (AddUpdateCommand) cmd.clone();
+ cloneCmd.solrDoc = clonedDoc;
}
try (ParWork worker = new ParWork(this)) {
if (!forwardToLeader) {
@@ -253,7 +254,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
try {
// TODO: possibly set checkDeleteByQueries as a flag on the command?
- log.info("Local add cmd");
+ if (log.isDebugEnabled()) log.debug("Local add cmd");
doLocalAdd(cmd);
// if the update updates a doc that is part of a nested structure,
@@ -274,15 +275,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
});
}
boolean zkAware = req.getCore().getCoreContainer().isZooKeeperAware();
- log.info("Is zk aware {}", zkAware);
+ if (log.isDebugEnabled()) log.debug("Is zk aware {}", zkAware);
if (zkAware && hasNodes()) {
- log.info("Collect distrib add");
+ if (log.isDebugEnabled()) log.debug("Collect distrib add");
+ AddUpdateCommand finalCloneCmd = cloneCmd == null ? cmd : cloneCmd;
worker.collect(() -> {
- log.info("Run distrib add collection");
+ if (log.isDebugEnabled()) log.debug("Run distrib add collection");
try {
- doDistribAdd(cmd);
- log.info("after distrib add collection");
+ doDistribAdd(finalCloneCmd);
+ if (log.isDebugEnabled()) log.debug("after distrib add collection");
} catch (Throwable e) {
ParWork.propegateInterrupt(e);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 969659b..ff7d2c0 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -177,15 +177,13 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
Replica leaderReplica = null;
zkCheck();
- nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT), true);
-
if (req.getParams().get(COMMIT_END_POINT, "").equals("terminal")) {
log.info("Do a local commit on single replica directly");
doLocalCommit(cmd);
return;
}
-
+ nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT), true);
if (nodes != null) {
nodes.removeIf((node) -> node.getNodeProps().getCoreNodeName().equals(
@@ -230,8 +228,28 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
log.info("send commit to leaders nodes={}", useNodes);
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
+ try (ParWork worker = new ParWork(this, false, true)) {
+ List<SolrCmdDistributor.Node> finalUseNodes1 = useNodes;
+ worker.collect(() -> {
+ cmdDistrib.distribCommit(cmd, finalUseNodes1, params);
+ });
- cmdDistrib.distribCommit(cmd, useNodes, params);
+ if (isLeader) {
+
+ log.info("Do a local commit on NRT endpoint for leader");
+ worker.collect(() -> {
+ try {
+ doLocalCommit(cmd);
+ } catch (IOException e) {
+ log.error("Error on local commit");
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+ });
+
+ }
+ worker.addCollect("commitToLeaders");
+ }
+ return;
}
}
if (isLeader) {
@@ -241,7 +259,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
log.info("Do a local commit on NRT endpoint for leader");
try {
doLocalCommit(cmd);
- } catch (Exception e) {
+ } catch (IOException e) {
log.error("Error on local commit");
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
@@ -302,7 +320,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
protected void doDistribAdd(AddUpdateCommand cmd) throws IOException {
- log.info("in zk dist add");
log.info("Distribute add cmd {} to {} {}", cmd, nodes, isLeader);
if (isLeader && !isSubShardLeader) {
DocCollection coll = clusterState.getCollection(collection);
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
index 8638ef1..550f943 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
@@ -63,6 +63,7 @@ import org.slf4j.LoggerFactory;
* Super basic testing, no shard restarting or anything.
*/
@Slow
+@Ignore
public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final AtomicInteger NAME_COUNTER = new AtomicInteger(1);
@@ -114,7 +115,7 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
public static String createAndSetNewDefaultCollection() throws Exception {
final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
final String name = "test_collection_" + NAME_COUNTER.getAndIncrement();
- CollectionAdminRequest.createCollection(name, "_default", 2, 2).setMaxShardsPerNode(10)
+ CollectionAdminRequest.createCollection(name, "_default", 3, 4).setMaxShardsPerNode(10)
.process(cloudClient);
cloudClient.setDefaultCollection(name);
return name;
@@ -465,7 +466,7 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
final String collectionName = createAndSetNewDefaultCollection();
- final int numDocs = TEST_NIGHTLY ? atLeast(150) : 55;
+ final int numDocs = 5000;//TEST_NIGHTLY ? atLeast(500) : 5000;
final JettySolrRunner nodeToUpdate = cluster.getRandomJetty(random());
try (ConcurrentUpdateSolrClient indexClient
= getConcurrentUpdateSolrClient(nodeToUpdate.getBaseUrl() + "/" + collectionName, 10, 2)) {
@@ -475,10 +476,13 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
indexClient.add(sdoc("id", i, "text_t",
TestUtil.randomRealisticUnicodeString(random(), 200)));
}
- indexClient.blockUntilFinished();
+ // indexClient.blockUntilFinished();
assertEquals(0, indexClient.commit().getStatus());
indexClient.blockUntilFinished();
}
+
+ cluster.waitForActiveCollection(collectionName, 3, 12);
+
assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 02044e5..cc61151 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -260,7 +260,6 @@ public class Http2SolrClient extends SolrClient {
}
});
}
- closer.collect(httpClientExecutor);
closer.collect(() -> {
try {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
index 4273109..f146b58 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
@@ -50,6 +50,7 @@ import org.apache.solr.client.solrj.V2RequestSupport;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.common.ParWork;
+import org.apache.solr.common.ParWorkExecService;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
@@ -314,13 +315,11 @@ public class HttpSolrClient extends BaseHttpSolrClient {
public HttpUriRequestResponse httpUriRequest(final SolrRequest request, final ResponseParser processor) throws SolrServerException, IOException {
HttpUriRequestResponse mrr = new HttpUriRequestResponse();
final HttpRequestBase method = createMethod(request, null);
- ExecutorService pool = ExecutorUtil.newMDCAwareFixedThreadPool(1, new SolrNamedThreadFactory("httpUriRequest"));
try {
MDC.put("HttpSolrClient.url", baseUrl);
- mrr.future = pool.submit(() -> executeMethod(method, request.getUserPrincipal(), processor, isV2ApiRequest(request)));
+ mrr.future = ((ParWorkExecService) ParWork.getExecutor()).doSubmit(() -> executeMethod(method, request.getUserPrincipal(), processor, isV2ApiRequest(request)), true);
} finally {
- pool.shutdown();
MDC.remove("HttpSolrClient.url");
}
assert method != null;
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index 1baf365..5902aad 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -225,7 +225,7 @@ public class ParWorkExecService implements ExecutorService {
throws InterruptedException, ExecutionException, TimeoutException {
Object ret;
try {
- ret = future.get();
+ ret = future.get(l, timeUnit);
} finally {
available.release();
}
@@ -326,11 +326,12 @@ public class ParWorkExecService implements ExecutorService {
}
double ourLoad = ParWork.getSysStats().getAvarageUsagePerCPU();
- if (ourLoad > 1) {
+
+ if (ourLoad > 99.0D) {
return false;
} else {
double sLoad = load / (double) ParWork.PROC_COUNT;
- if (sLoad > 1.0D) {
+ if (sLoad > ParWork.PROC_COUNT) {
return false;
}
if (log.isDebugEnabled()) log.debug("ParWork, load:" + sLoad);
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index 311b1b7c..9c3adb5 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -50,7 +50,7 @@ public class ParWorkExecutor extends ExecutorUtil.MDCAwareThreadPoolExecutor {
Thread t = new Thread(group, r, name + threadNumber.getAndIncrement(), 0) {
public void run() {
try {
- super.run();
+ r.run();
} finally {
ParWork.closeExecutor();
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 10b3234..141affb 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -171,26 +171,11 @@ public class SolrZkClient implements Closeable {
connManager.start();
connManager.waitForConnected(this.zkClientConnectTimeout);
} catch (TimeoutException e) {
- try (ParWork worker = new ParWork(this, true)) {
- worker.add("zkCallbackExecutor", zkCallbackExecutor);
- worker.add("connectionManager", connManager);
- worker.add("zkCallbackExecutor", zkConnManagerCallbackExecutor);
- }
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
} catch (InterruptedException e) {
ParWork.propegateInterrupt(e);
- try (ParWork worker = new ParWork(this, true)) {
- worker.add("zkCallbackExecutor", zkCallbackExecutor);
- worker.add("connectionManager", connManager);
- worker.add("zkCallbackExecutor", zkConnManagerCallbackExecutor);
- }
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
} catch (IOException e) {
- try (ParWork worker = new ParWork(this, true)) {
- worker.add("zkCallbackExecutor", zkCallbackExecutor);
- worker.add("connectionManager", connManager);
- worker.add("zkCallbackExecutor", zkConnManagerCallbackExecutor);
- }
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
}
return this;
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 6265a59..bdcd0686 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -762,8 +762,8 @@ public class ZkStateReader implements SolrCloseable {
private class LazyCollectionRef extends ClusterState.CollectionRef {
private final String collName;
- private long lastUpdateTime;
- private DocCollection cachedDocCollection;
+ private volatile long lastUpdateTime;
+ private volatile DocCollection cachedDocCollection;
public LazyCollectionRef(String collName) {
super(null);
@@ -772,7 +772,7 @@ public class ZkStateReader implements SolrCloseable {
}
@Override
- public synchronized DocCollection get(boolean allowCached) {
+ public DocCollection get(boolean allowCached) {
gets.incrementAndGet();
if (!allowCached || lastUpdateTime < 0 || System.nanoTime() - lastUpdateTime > LAZY_CACHE_TIME) {
boolean shouldFetch = true;