You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by mk...@apache.org on 2021/11/15 11:40:49 UTC

[lucene-solr] branch branch_8x updated: SOLR-15635: avoid redundant closeHooks invocation by MDCThreadPool (#2609)

This is an automated email from the ASF dual-hosted git repository.

mkhl pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new 66e2c28  SOLR-15635: avoid redundant closeHooks invocation by MDCThreadPool (#2609)
66e2c28 is described below

commit 66e2c28e57b1584382ddc183596534357326a0c9
Author: Mikhail Khludnev <mk...@users.noreply.github.com>
AuthorDate: Mon Nov 15 14:40:32 2021 +0300

    SOLR-15635: avoid redundant closeHooks invocation by MDCThreadPool (#2609)
---
 solr/CHANGES.txt                                   |  3 +
 .../org/apache/solr/request/SolrRequestInfo.java   | 70 +++++++++++++------
 .../test/org/apache/solr/TestCrossCoreJoin.java    | 27 +++++---
 .../apache/solr/request/TestSolrRequestInfo.java   | 80 ++++++++++++++++++++++
 4 files changed, 149 insertions(+), 31 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index dcea00e..6ef0705 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -46,6 +46,9 @@ Bug Fixes
 
 * SOLR-15696: Incremental backups no longer fail on collections where a 'SPLITSHARD' operation previously occurred. (Jason Gerlowski)
 
+* SOLR-15635: Don't close hooks twice when SolrRequestInfo is cleared twice; this also stops /export with a classic join
+  from closing fromCore if provided (Mikhail Khludnev, David Smiley)
+
 * SOLR-15628: The SolrException.log() helper method has been fixed to correctly passes the Throwable to the Logger w/o stringification (hossman)
 
 * SOLR-15722: Delete Replica does not delete the Per replica state (noble)
diff --git a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
index a83f0b8..ecb1081 100644
--- a/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
+++ b/solr/core/src/java/org/apache/solr/request/SolrRequestInfo.java
@@ -40,19 +40,21 @@ import org.slf4j.LoggerFactory;
 /** Information about the Solr request/response held in a {@link ThreadLocal}. */
 public class SolrRequestInfo {
 
-  protected static final int MAX_STACK_SIZE = 10;
+  private static final int MAX_STACK_SIZE = 10;
 
-  protected static final ThreadLocal<Deque<SolrRequestInfo>> threadLocal = ThreadLocal.withInitial(LinkedList::new);
+  private static final ThreadLocal<Deque<SolrRequestInfo>> threadLocal = ThreadLocal.withInitial(LinkedList::new);
 
-  protected SolrQueryRequest req;
-  protected SolrQueryResponse rsp;
-  protected Date now;
+  private int refCount = 1; // prevent closing when still used
+
+  private SolrQueryRequest req;
+  private SolrQueryResponse rsp;
+  private Date now;
   protected HttpServletRequest httpRequest;
-  protected TimeZone tz;
-  protected ResponseBuilder rb;
-  protected List<Closeable> closeHooks;
-  protected SolrDispatchFilter.Action action;
-  protected boolean useServerToken = false;
+  private TimeZone tz;
+  private ResponseBuilder rb;
+  private List<Closeable> closeHooks;
+  private SolrDispatchFilter.Action action;
+  private boolean useServerToken = false;
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -70,23 +72,25 @@ public class SolrRequestInfo {
     Deque<SolrRequestInfo> stack = threadLocal.get();
     if (info == null) {
       throw new IllegalArgumentException("SolrRequestInfo is null");
-    } else if (stack.size() <= MAX_STACK_SIZE) {
-      stack.push(info);
-    } else {
+    } else if (stack.size() > MAX_STACK_SIZE) {
       assert false : "SolrRequestInfo Stack is full";
       log.error("SolrRequestInfo Stack is full");
     }
+    log.trace("{} {}", info, "setRequestInfo()");
+    assert !info.isClosed() : "SRI is already closed (odd).";
+    stack.push(info);
   }
 
-  /** Removes the most recent SolrRequestInfo from the stack */
+  /** Removes the most recent SolrRequestInfo from the stack.  Close hooks are called. */
   public static void clearRequestInfo() {
+    log.trace("clearRequestInfo()");
     Deque<SolrRequestInfo> stack = threadLocal.get();
     if (stack.isEmpty()) {
       assert false : "clearRequestInfo called too many times";
       log.error("clearRequestInfo called too many times");
     } else {
       SolrRequestInfo info = stack.pop();
-      closeHooks(info);
+      info.close();
     }
   }
 
@@ -95,18 +99,25 @@ public class SolrRequestInfo {
    * we expect it to be empty by now because all "set" calls need to be balanced with a "clear".
    */
   public static void reset() {
+    log.trace("reset()");
     Deque<SolrRequestInfo> stack = threadLocal.get();
-    boolean isEmpty = stack.isEmpty();
+    assert stack.isEmpty() : "SolrRequestInfo Stack should have been cleared.";
     while (!stack.isEmpty()) {
       SolrRequestInfo info = stack.pop();
-      closeHooks(info);
+      info.close();
     }
-    assert isEmpty : "SolrRequestInfo Stack should have been cleared.";
   }
 
-  private static void closeHooks(SolrRequestInfo info) {
-    if (info.closeHooks != null) {
-      for (Closeable hook : info.closeHooks) {
+  private synchronized void close() {
+    log.trace("{} {}", this, "close()");
+
+    if (--refCount > 0) {
+      log.trace("{} {}", this, "not closing; still referenced");
+      return;
+    }
+
+    if (closeHooks != null) {
+      for (Closeable hook : closeHooks) {
         try {
           hook.close();
         } catch (Exception e) {
@@ -114,6 +125,7 @@ public class SolrRequestInfo {
         }
       }
     }
+    closeHooks = null;
   }
 
   public SolrRequestInfo(SolrQueryRequest req, SolrQueryResponse rsp) {
@@ -186,6 +198,9 @@ public class SolrRequestInfo {
   public void addCloseHook(Closeable hook) {
     // is this better here, or on SolrQueryRequest?
     synchronized (this) {
+      if (isClosed()) {
+        throw new IllegalStateException("Already closed!");
+      }
       if (closeHooks == null) {
         closeHooks = new LinkedList<>();
       }
@@ -213,13 +228,24 @@ public class SolrRequestInfo {
     this.useServerToken = use;
   }
 
+  private synchronized boolean isClosed() {
+    return refCount <= 0;
+  }
+
   public static ExecutorUtil.InheritableThreadLocalProvider getInheritableThreadLocalProvider() {
     return new ExecutorUtil.InheritableThreadLocalProvider() {
       @Override
       @SuppressWarnings({"unchecked"})
       public void store(@SuppressWarnings({"rawtypes"})AtomicReference ctx) {
         SolrRequestInfo me = SolrRequestInfo.getRequestInfo();
-        if (me != null) ctx.set(me);
+        if (me != null) {
+          // increase refCount in store(), while we're still in the thread of the provider to avoid
+          //  a race if this thread finishes its work before the pool'ed thread runs
+          synchronized (me) {
+            me.refCount++;
+          }
+          ctx.set(me);
+        }
       }
 
       @Override
diff --git a/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java b/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java
index 77f4afa..d21b5db 100644
--- a/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java
+++ b/solr/core/src/test/org/apache/solr/TestCrossCoreJoin.java
@@ -53,17 +53,17 @@ public class TestCrossCoreJoin extends SolrTestCaseJ4 {
 
     fromCore = coreContainer.create("fromCore", ImmutableMap.of("configSet", "minimal"));
 
-    assertU(add(doc("id", "1", "name", "john", "title", "Director", "dept_s", "Engineering")));
-    assertU(add(doc("id", "2", "name", "mark", "title", "VP", "dept_s", "Marketing")));
-    assertU(add(doc("id", "3", "name", "nancy", "title", "MTS", "dept_s", "Sales")));
-    assertU(add(doc("id", "4", "name", "dave", "title", "MTS", "dept_s", "Support", "dept_s", "Engineering")));
-    assertU(add(doc("id", "5", "name", "tina", "title", "VP", "dept_s", "Engineering")));
+    assertU(add(doc("id", "1", "id_s_dv", "1", "name", "john", "title", "Director", "dept_s", "Engineering")));
+    assertU(add(doc("id", "2", "id_s_dv", "2", "name", "mark", "title", "VP", "dept_s", "Marketing")));
+    assertU(add(doc("id", "3", "id_s_dv", "3", "name", "nancy", "title", "MTS", "dept_s", "Sales")));
+    assertU(add(doc("id", "4", "id_s_dv", "4", "name", "dave", "title", "MTS", "dept_s", "Support", "dept_s", "Engineering")));
+    assertU(add(doc("id", "5", "id_s_dv", "5", "name", "tina", "title", "VP", "dept_s", "Engineering")));
     assertU(commit());
 
-    update(fromCore, add(doc("id", "10", "dept_id_s", "Engineering", "text", "These guys develop stuff", "cat", "dev")));
-    update(fromCore, add(doc("id", "11", "dept_id_s", "Marketing", "text", "These guys make you look good")));
-    update(fromCore, add(doc("id", "12", "dept_id_s", "Sales", "text", "These guys sell stuff")));
-    update(fromCore, add(doc("id", "13", "dept_id_s", "Support", "text", "These guys help customers")));
+    update(fromCore, add(doc("id", "10", "id_s_dv", "10", "dept_id_s", "Engineering", "text", "These guys develop stuff", "cat", "dev")));
+    update(fromCore, add(doc("id", "11", "id_s_dv", "11", "dept_id_s", "Marketing", "text", "These guys make you look good")));
+    update(fromCore, add(doc("id", "12", "id_s_dv", "12", "dept_id_s", "Sales", "text", "These guys sell stuff")));
+    update(fromCore, add(doc("id", "13", "id_s_dv", "13", "dept_id_s", "Support", "text", "These guys help customers")));
     update(fromCore, commit());
 
   }
@@ -91,6 +91,15 @@ public class TestCrossCoreJoin extends SolrTestCaseJ4 {
         , "/response=={'numFound':3,'start':0,'numFoundExact':true,'docs':[{'id':'1'},{'id':'4'},{'id':'5'}]}"
     );
 
+    assertJQ(req( "qt", "/export", 
+            "q", joinPrefix + " from=dept_id_s to=dept_s fromIndex=fromCore}cat:dev", "fl", "id_s_dv",
+            "sort", "id_s_dv asc",
+            "debugQuery", random().nextBoolean() ? "true":"false")
+            , "/response=={'numFound':3,'docs':[{'id_s_dv':'1'},{'id_s_dv':'4'},{'id_s_dv':'5'}]}"
+    );
+    assertFalse(fromCore.isClosed());
+    assertFalse(h.getCore().isClosed());
+
     // find people that develop stuff - but limit via filter query to a name of "john"
     // this tests filters being pushed down to queries (SOLR-3062)
     assertJQ(req("q", joinPrefix + " from=dept_id_s to=dept_s fromIndex=fromCore}cat:dev", "fl", "id", "fq", "name:john",
diff --git a/solr/core/src/test/org/apache/solr/request/TestSolrRequestInfo.java b/solr/core/src/test/org/apache/solr/request/TestSolrRequestInfo.java
new file mode 100644
index 0000000..0b1573b
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/request/TestSolrRequestInfo.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.request;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.response.SolrQueryResponse;
+import org.junit.BeforeClass;
+
+public class TestSolrRequestInfo extends SolrTestCaseJ4 {
+
+    @BeforeClass
+    public static void beforeClass() throws Exception {
+        initCore("solrconfig.xml","schema11.xml");
+    }
+
+    public void testCloseHookTwice(){
+        final SolrRequestInfo info = new SolrRequestInfo(
+                new LocalSolrQueryRequest(h.getCore(), params()),
+                new SolrQueryResponse());
+        AtomicInteger counter = new AtomicInteger();
+        info.addCloseHook(counter::incrementAndGet);
+        SolrRequestInfo.setRequestInfo(info);
+        SolrRequestInfo.setRequestInfo(info);
+        SolrRequestInfo.clearRequestInfo();
+        assertNotNull(SolrRequestInfo.getRequestInfo());
+        SolrRequestInfo.clearRequestInfo();
+        assertEquals("hook should be closed only once", 1, counter.get());
+        assertNull(SolrRequestInfo.getRequestInfo());
+    }
+
+    public void testThreadPool() throws InterruptedException {
+        final SolrRequestInfo info = new SolrRequestInfo(
+                new LocalSolrQueryRequest(h.getCore(), params()),
+                new SolrQueryResponse());
+        AtomicInteger counter = new AtomicInteger();
+
+        SolrRequestInfo.setRequestInfo(info);
+        ExecutorUtil.MDCAwareThreadPoolExecutor pool = new ExecutorUtil.MDCAwareThreadPoolExecutor(1, 1, 1,
+                TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
+        AtomicBoolean run = new AtomicBoolean(false);
+        pool.execute(() -> {
+            final SolrRequestInfo poolInfo = SolrRequestInfo.getRequestInfo();
+            assertSame(info, poolInfo);
+            info.addCloseHook(counter::incrementAndGet);
+            run.set(true);
+        });
+        if (random().nextBoolean()) {
+            pool.shutdown();
+        } else {
+            pool.shutdownNow();
+        }
+        SolrRequestInfo.clearRequestInfo();
+        SolrRequestInfo.reset();
+
+        pool.awaitTermination(1, TimeUnit.MINUTES);
+        assertTrue(run.get());
+        assertEquals("hook should be closed only once", 1, counter.get());
+        assertNull(SolrRequestInfo.getRequestInfo());
+    }
+}