You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/30 06:09:19 UTC

[lucene-solr] 01/01: @1082 Harden.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 5c173acdeed31ac7d0532cc2fbc9965210e3fdd8
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Oct 30 01:08:43 2020 -0500

    @1082 Harden.
---
 .../org/apache/solr/cloud/overseer/SliceMutator.java |  8 ++++----
 .../solr/handler/admin/DaemonStreamApiTest.java      |  1 -
 .../solr/client/solrj/io/stream/DaemonStream.java    | 20 +++++++++++++-------
 3 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 47c32ca..61cb5d0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -164,12 +164,12 @@ public class SliceMutator {
       // TODO: this should only be calculated once and cached somewhere?
       if (log.isDebugEnabled()) log.debug("examine for setting or unsetting as leader replica={}", replica);
 
-      if (replica == oldLeader && !coreNodeName.equals(replica.getName())) {
-        if (log.isDebugEnabled()) log.debug("Unset leader");
-        replica = new ReplicaMutator(cloudManager).unsetLeader(replica);
-      } else if (coreNodeName.equals(replica.getName())) {
+      if (coreNodeName.equals(replica.getName())) {
         if (log.isDebugEnabled()) log.debug("Set leader");
         replica = new ReplicaMutator(cloudManager).setLeader(replica);
+      } else if (replica.getBool("leader", false)) {
+        if (log.isDebugEnabled()) log.debug("Unset leader");
+        replica = new ReplicaMutator(cloudManager).unsetLeader(replica);
       }
 
       newReplicas.put(replica.getName(), replica);
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/DaemonStreamApiTest.java b/solr/core/src/test/org/apache/solr/handler/admin/DaemonStreamApiTest.java
index 74f7be3..77c0d43 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/DaemonStreamApiTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/DaemonStreamApiTest.java
@@ -42,7 +42,6 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-@ThreadLeakLingering(linger = 2000) // allow a small linger for daemon streams to stop
 //@Ignore // nocommit - need to fix the driver and this test again
 public class DaemonStreamApiTest extends SolrTestCaseJ4 {
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index 0692bc2..c5d7a2a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -26,6 +26,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.solr.client.solrj.io.Tuple;
@@ -41,6 +42,8 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.common.util.TimeOut;
+import org.apache.solr.common.util.TimeSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -288,7 +291,7 @@ public class DaemonStream extends TupleStream implements Expressible {
 
     // a reference to the Thread that is executing the stream to track its state
     private volatile Thread executingThread;
-    private boolean shutdown;
+    private volatile boolean shutdown;
 
     public StreamRunner(long runInterval, String id) {
       this.runInterval = runInterval;
@@ -391,12 +394,15 @@ public class DaemonStream extends TupleStream implements Expressible {
         }
         iterations.incrementAndGet();
 
-        if (sleepMillis > 0) {
-          try {
-            Thread.sleep(sleepMillis);
-          } catch (InterruptedException e) {
-            log.error("Error in DaemonStream:{}", id, e);
-            break OUTER;
+        if (sleepMillis > 0 && !getShutdown()) {
+          TimeOut timeout = new TimeOut(sleepMillis, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+          while (!timeout.hasTimedOut() && !getShutdown()) {
+            try {
+              Thread.sleep(250);
+            } catch (InterruptedException e) {
+              log.error("Error in DaemonStream:{}", id, e);
+              break;
+            }
           }
         }
       }