You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/30 06:09:03 UTC
[lucene-solr] 02/02: @1081 Harden.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 5729f9c0ed0d931129c2336df8b00aed9579c050
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Oct 30 01:08:43 2020 -0500
@1081 Harden.
---
.../org/apache/solr/cloud/overseer/SliceMutator.java | 8 ++++----
.../solr/handler/admin/DaemonStreamApiTest.java | 1 -
.../solr/client/solrj/io/stream/DaemonStream.java | 20 +++++++++++++-------
3 files changed, 17 insertions(+), 12 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 47c32ca..61cb5d0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -164,12 +164,12 @@ public class SliceMutator {
// TODO: this should only be calculated once and cached somewhere?
if (log.isDebugEnabled()) log.debug("examine for setting or unsetting as leader replica={}", replica);
- if (replica == oldLeader && !coreNodeName.equals(replica.getName())) {
- if (log.isDebugEnabled()) log.debug("Unset leader");
- replica = new ReplicaMutator(cloudManager).unsetLeader(replica);
- } else if (coreNodeName.equals(replica.getName())) {
+ if (coreNodeName.equals(replica.getName())) {
if (log.isDebugEnabled()) log.debug("Set leader");
replica = new ReplicaMutator(cloudManager).setLeader(replica);
+ } else if (replica.getBool("leader", false)) {
+ if (log.isDebugEnabled()) log.debug("Unset leader");
+ replica = new ReplicaMutator(cloudManager).unsetLeader(replica);
}
newReplicas.put(replica.getName(), replica);
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/DaemonStreamApiTest.java b/solr/core/src/test/org/apache/solr/handler/admin/DaemonStreamApiTest.java
index 74f7be3..77c0d43 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/DaemonStreamApiTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/DaemonStreamApiTest.java
@@ -42,7 +42,6 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
-@ThreadLeakLingering(linger = 2000) // allow a small linger for daemon streams to stop
//@Ignore // nocommit - need to fix the driver and this test again
public class DaemonStreamApiTest extends SolrTestCaseJ4 {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
index 0692bc2..c5d7a2a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DaemonStream.java
@@ -26,6 +26,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.solr.client.solrj.io.Tuple;
@@ -41,6 +42,8 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.common.util.TimeOut;
+import org.apache.solr.common.util.TimeSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -288,7 +291,7 @@ public class DaemonStream extends TupleStream implements Expressible {
// a reference to the Thread that is executing the stream to track its state
private volatile Thread executingThread;
- private boolean shutdown;
+ private volatile boolean shutdown;
public StreamRunner(long runInterval, String id) {
this.runInterval = runInterval;
@@ -391,12 +394,15 @@ public class DaemonStream extends TupleStream implements Expressible {
}
iterations.incrementAndGet();
- if (sleepMillis > 0) {
- try {
- Thread.sleep(sleepMillis);
- } catch (InterruptedException e) {
- log.error("Error in DaemonStream:{}", id, e);
- break OUTER;
+ if (sleepMillis > 0 && !getShutdown()) {
+ TimeOut timeout = new TimeOut(sleepMillis, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ while (!timeout.hasTimedOut() && !getShutdown()) {
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ log.error("Error in DaemonStream:{}", id, e);
+ break;
+ }
}
}
}