You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mk...@apache.org on 2021/11/15 11:40:49 UTC
[lucene-solr] branch branch_8x updated: SOLR-15635: avoid redundant closeHooks invocation by MDCThreadPool (#2609)
This is an automated email from the ASF dual-hosted git repository.
mkhl pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8x by this push:
new 66e2c28 SOLR-15635: avoid redundant closeHooks invocation by MDCThreadPool (#2609)
66e2c28 is described below
commit 66e2c28e57b1584382ddc183596534357326a0c9
Author: Mikhail Khludnev <mk...@users.noreply.github.com>
AuthorDate: Mon Nov 15 14:40:32 2021 +0300
SOLR-15635: avoid redundant closeHooks invocation by MDCThreadPool (#2609)
---
solr/CHANGES.txt | 3 +
.../org/apache/solr/request/SolrRequestInfo.java | 70 +++++++++++++------
.../test/org/apache/solr/TestCrossCoreJoin.java | 27 +++++---
.../apache/solr/request/TestSolrRequestInfo.java | 80 ++++++++++++++++++++++
4 files changed, 149 insertions(+), 31 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index dcea00e..6ef0705 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -46,6 +46,9 @@ Bug Fixes
* SOLR-15696: Incremental backups no longer fail on collections where a 'SPLITSHARD' operation previously occurred. (Jason Gerlowski)
+* SOLR-15635: Don't close hooks twice when SolrRequestInfo is cleared twice; or /export with classic join
+ closed fromCore if provided (Mikhail Khludnev, David Smiley)
+
* SOLR-15628: The SolrException.log() helper method has been fixed to correctly passes the Throwable to the Logger w/o stringification (hossman)
* SOLR-15722: Delete Replica does not delete the Per replica state (noble)
diff --git a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
index a83f0b8..ecb1081 100644
--- a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
+++ b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
@@ -40,19 +40,21 @@ import org.slf4j.LoggerFactory;
/** Information about the Solr request/response held in a {@link ThreadLocal}. */
public class SolrRequestInfo {
- protected static final int MAX_STACK_SIZE = 10;
+ private static final int MAX_STACK_SIZE = 10;
- protected static final ThreadLocal<Deque<SolrRequestInfo>> threadLocal = ThreadLocal.withInitial(LinkedList::new);
+ private static final ThreadLocal<Deque<SolrRequestInfo>> threadLocal = ThreadLocal.withInitial(LinkedList::new);
- protected SolrQueryRequest req;
- protected SolrQueryResponse rsp;
- protected Date now;
+ private int refCount = 1; // prevent closing when still used
+
+ private SolrQueryRequest req;
+ private SolrQueryResponse rsp;
+ private Date now;
protected HttpServletRequest httpRequest;
- protected TimeZone tz;
- protected ResponseBuilder rb;
- protected List<Closeable> closeHooks;
- protected SolrDispatchFilter.Action action;
- protected boolean useServerToken = false;
+ private TimeZone tz;
+ private ResponseBuilder rb;
+ private List<Closeable> closeHooks;
+ private SolrDispatchFilter.Action action;
+ private boolean useServerToken = false;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -70,23 +72,25 @@ public class SolrRequestInfo {
Deque<SolrRequestInfo> stack = threadLocal.get();
if (info == null) {
throw new IllegalArgumentException("SolrRequestInfo is null");
- } else if (stack.size() <= MAX_STACK_SIZE) {
- stack.push(info);
- } else {
+ } else if (stack.size() > MAX_STACK_SIZE) {
assert false : "SolrRequestInfo Stack is full";
log.error("SolrRequestInfo Stack is full");
}
+ log.trace("{} {}", info, "setRequestInfo()");
+ assert !info.isClosed() : "SRI is already closed (odd).";
+ stack.push(info);
}
- /** Removes the most recent SolrRequestInfo from the stack */
+ /** Removes the most recent SolrRequestInfo from the stack. Close hooks are called. */
public static void clearRequestInfo() {
+ log.trace("clearRequestInfo()");
Deque<SolrRequestInfo> stack = threadLocal.get();
if (stack.isEmpty()) {
assert false : "clearRequestInfo called too many times";
log.error("clearRequestInfo called too many times");
} else {
SolrRequestInfo info = stack.pop();
- closeHooks(info);
+ info.close();
}
}
@@ -95,18 +99,25 @@ public class SolrRequestInfo {
* we expect it to be empty by now because all "set" calls need to be balanced with a "clear".
*/
public static void reset() {
+ log.trace("reset()");
Deque<SolrRequestInfo> stack = threadLocal.get();
- boolean isEmpty = stack.isEmpty();
+ assert stack.isEmpty() : "SolrRequestInfo Stack should have been cleared.";
while (!stack.isEmpty()) {
SolrRequestInfo info = stack.pop();
- closeHooks(info);
+ info.close();
}
- assert isEmpty : "SolrRequestInfo Stack should have been cleared.";
}
- private static void closeHooks(SolrRequestInfo info) {
- if (info.closeHooks != null) {
- for (Closeable hook : info.closeHooks) {
+ private synchronized void close() {
+ log.trace("{} {}", this, "close()");
+
+ if (--refCount > 0) {
+ log.trace("{} {}", this, "not closing; still referenced");
+ return;
+ }
+
+ if (closeHooks != null) {
+ for (Closeable hook : closeHooks) {
try {
hook.close();
} catch (Exception e) {
@@ -114,6 +125,7 @@ public class SolrRequestInfo {
}
}
}
+ closeHooks = null;
}
public SolrRequestInfo(SolrQueryRequest req, SolrQueryResponse rsp) {
@@ -186,6 +198,9 @@ public class SolrRequestInfo {
public void addCloseHook(Closeable hook) {
// is this better here, or on SolrQueryRequest?
synchronized (this) {
+ if (isClosed()) {
+ throw new IllegalStateException("Already closed!");
+ }
if (closeHooks == null) {
closeHooks = new LinkedList<>();
}
@@ -213,13 +228,24 @@ public class SolrRequestInfo {
this.useServerToken = use;
}
+ private synchronized boolean isClosed() {
+ return refCount <= 0;
+ }
+
public static ExecutorUtil.InheritableThreadLocalProvider getInheritableThreadLocalProvider() {
return new ExecutorUtil.InheritableThreadLocalProvider() {
@Override
@SuppressWarnings({"unchecked"})
public void store(@SuppressWarnings({"rawtypes"})AtomicReference ctx) {
SolrRequestInfo me = SolrRequestInfo.getRequestInfo();
- if (me != null) ctx.set(me);
+ if (me != null) {
+ // increase refCount in store(), while we're still in the thread of the provider to avoid
+ // a race if this thread finishes its work before the pool'ed thread runs
+ synchronized (me) {
+ me.refCount++;
+ }
+ ctx.set(me);
+ }
}
@Override
diff --git a/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java b/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java
index 77f4afa..d21b5db 100644
--- a/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java
+++ b/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java
@@ -53,17 +53,17 @@ public class TestCrossCoreJoin extends SolrTestCaseJ4 {
fromCore = coreContainer.create("fromCore", ImmutableMap.of("configSet", "minimal"));
- assertU(add(doc("id", "1", "name", "john", "title", "Director", "dept_s", "Engineering")));
- assertU(add(doc("id", "2", "name", "mark", "title", "VP", "dept_s", "Marketing")));
- assertU(add(doc("id", "3", "name", "nancy", "title", "MTS", "dept_s", "Sales")));
- assertU(add(doc("id", "4", "name", "dave", "title", "MTS", "dept_s", "Support", "dept_s", "Engineering")));
- assertU(add(doc("id", "5", "name", "tina", "title", "VP", "dept_s", "Engineering")));
+ assertU(add(doc("id", "1", "id_s_dv", "1", "name", "john", "title", "Director", "dept_s", "Engineering")));
+ assertU(add(doc("id", "2", "id_s_dv", "2", "name", "mark", "title", "VP", "dept_s", "Marketing")));
+ assertU(add(doc("id", "3", "id_s_dv", "3", "name", "nancy", "title", "MTS", "dept_s", "Sales")));
+ assertU(add(doc("id", "4", "id_s_dv", "4", "name", "dave", "title", "MTS", "dept_s", "Support", "dept_s", "Engineering")));
+ assertU(add(doc("id", "5", "id_s_dv", "5", "name", "tina", "title", "VP", "dept_s", "Engineering")));
assertU(commit());
- update(fromCore, add(doc("id", "10", "dept_id_s", "Engineering", "text", "These guys develop stuff", "cat", "dev")));
- update(fromCore, add(doc("id", "11", "dept_id_s", "Marketing", "text", "These guys make you look good")));
- update(fromCore, add(doc("id", "12", "dept_id_s", "Sales", "text", "These guys sell stuff")));
- update(fromCore, add(doc("id", "13", "dept_id_s", "Support", "text", "These guys help customers")));
+ update(fromCore, add(doc("id", "10", "id_s_dv", "10", "dept_id_s", "Engineering", "text", "These guys develop stuff", "cat", "dev")));
+ update(fromCore, add(doc("id", "11", "id_s_dv", "11", "dept_id_s", "Marketing", "text", "These guys make you look good")));
+ update(fromCore, add(doc("id", "12", "id_s_dv", "12", "dept_id_s", "Sales", "text", "These guys sell stuff")));
+ update(fromCore, add(doc("id", "13", "id_s_dv", "13", "dept_id_s", "Support", "text", "These guys help customers")));
update(fromCore, commit());
}
@@ -91,6 +91,15 @@ public class TestCrossCoreJoin extends SolrTestCaseJ4 {
, "/response=={'numFound':3,'start':0,'numFoundExact':true,'docs':[{'id':'1'},{'id':'4'},{'id':'5'}]}"
);
+ assertJQ(req( "qt", "/export",
+ "q", joinPrefix + " from=dept_id_s to=dept_s fromIndex=fromCore}cat:dev", "fl", "id_s_dv",
+ "sort", "id_s_dv asc",
+ "debugQuery", random().nextBoolean() ? "true":"false")
+ , "/response=={'numFound':3,'docs':[{'id_s_dv':'1'},{'id_s_dv':'4'},{'id_s_dv':'5'}]}"
+ );
+ assertFalse(fromCore.isClosed());
+ assertFalse(h.getCore().isClosed());
+
// find people that develop stuff - but limit via filter query to a name of "john"
// this tests filters being pushed down to queries (SOLR-3062)
assertJQ(req("q", joinPrefix + " from=dept_id_s to=dept_s fromIndex=fromCore}cat:dev", "fl", "id", "fq", "name:john",
diff --git a/solr/core/src/test/org/apache/solr/request/TestSolrRequestInfo.java b/solr/core/src/test/org/apache/solr/request/TestSolrRequestInfo.java
new file mode 100644
index 0000000..0b1573b
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/request/TestSolrRequestInfo.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.request;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.response.SolrQueryResponse;
+import org.junit.BeforeClass;
+
+public class TestSolrRequestInfo extends SolrTestCaseJ4 {
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ initCore("solrconfig.xml","schema11.xml");
+ }
+
+ public void testCloseHookTwice(){
+ final SolrRequestInfo info = new SolrRequestInfo(
+ new LocalSolrQueryRequest(h.getCore(), params()),
+ new SolrQueryResponse());
+ AtomicInteger counter = new AtomicInteger();
+ info.addCloseHook(counter::incrementAndGet);
+ SolrRequestInfo.setRequestInfo(info);
+ SolrRequestInfo.setRequestInfo(info);
+ SolrRequestInfo.clearRequestInfo();
+ assertNotNull(SolrRequestInfo.getRequestInfo());
+ SolrRequestInfo.clearRequestInfo();
+ assertEquals("hook should be closed only once", 1, counter.get());
+ assertNull(SolrRequestInfo.getRequestInfo());
+ }
+
+ public void testThreadPool() throws InterruptedException {
+ final SolrRequestInfo info = new SolrRequestInfo(
+ new LocalSolrQueryRequest(h.getCore(), params()),
+ new SolrQueryResponse());
+ AtomicInteger counter = new AtomicInteger();
+
+ SolrRequestInfo.setRequestInfo(info);
+ ExecutorUtil.MDCAwareThreadPoolExecutor pool = new ExecutorUtil.MDCAwareThreadPoolExecutor(1, 1, 1,
+ TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
+ AtomicBoolean run = new AtomicBoolean(false);
+ pool.execute(() -> {
+ final SolrRequestInfo poolInfo = SolrRequestInfo.getRequestInfo();
+ assertSame(info, poolInfo);
+ info.addCloseHook(counter::incrementAndGet);
+ run.set(true);
+ });
+ if (random().nextBoolean()) {
+ pool.shutdown();
+ } else {
+ pool.shutdownNow();
+ }
+ SolrRequestInfo.clearRequestInfo();
+ SolrRequestInfo.reset();
+
+ pool.awaitTermination(1, TimeUnit.MINUTES);
+ assertTrue(run.get());
+ assertEquals("hook should be closed only once", 1, counter.get());
+ assertNull(SolrRequestInfo.getRequestInfo());
+ }
+}