You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/17 15:22:37 UTC
[lucene-solr] 03/18: @838 More cleanup.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit c16af3e1c786e73ac1e7b184851c3256fe4509e1
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Sep 16 18:18:37 2020 -0500
@838 More cleanup.
---
.../collector/SchedulerMetricsCollector.java | 5 +-
.../client/solrj/embedded/JettySolrRunner.java | 6 +-
.../java/org/apache/solr/cloud/LeaderElector.java | 10 +-
.../src/java/org/apache/solr/cloud/Overseer.java | 3 +-
.../apache/solr/cloud/OverseerElectionContext.java | 26 +--
.../org/apache/solr/cloud/OverseerTaskQueue.java | 1 -
.../org/apache/solr/cloud/ReplicateFromLeader.java | 2 +-
.../solr/cloud/ShardLeaderElectionContext.java | 2 +-
.../org/apache/solr/cloud/ZkCollectionTerms.java | 2 +-
.../java/org/apache/solr/cloud/ZkController.java | 19 +-
.../org/apache/solr/cloud/ZkDistributedQueue.java | 233 +++++++++++----------
.../java/org/apache/solr/cloud/ZkShardTerms.java | 9 +-
.../OverseerCollectionMessageHandler.java | 5 +-
.../cloud/autoscaling/OverseerTriggerThread.java | 4 +-
.../solr/cloud/autoscaling/ScheduledTriggers.java | 2 +-
.../autoscaling/sim/GenericDistributedQueue.java | 2 +-
.../cloud/autoscaling/sim/SimCloudManager.java | 2 +-
.../solr/metrics/rrd/SolrRrdBackendFactory.java | 2 +-
.../src/java/org/apache/solr/pkg/PackageAPI.java | 2 +-
.../java/org/apache/solr/update/CommitTracker.java | 9 +-
.../apache/solr/update/DirectUpdateHandler2.java | 2 +-
.../java/org/apache/solr/update/UpdateHandler.java | 4 +-
.../src/java/org/apache/solr/update/UpdateLog.java | 6 +-
solr/server/etc/jetty-http.xml | 2 +-
solr/server/etc/jetty-https.xml | 2 +-
.../solr/client/solrj/impl/CloudSolrClient.java | 2 +-
.../solrj/impl/HttpClusterStateProvider.java | 2 +-
.../solr/client/solrj/impl/LBHttpSolrClient.java | 2 +-
.../apache/solr/common/cloud/ZkStateReader.java | 15 +-
.../apache/solr/cloud/MiniSolrCloudCluster.java | 2 +-
.../java/org/apache/solr/cloud/ZkTestServer.java | 6 +-
31 files changed, 212 insertions(+), 179 deletions(-)
diff --git a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java
index e1bdc60..f13bd27 100644
--- a/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java
+++ b/solr/contrib/prometheus-exporter/src/java/org/apache/solr/prometheus/collector/SchedulerMetricsCollector.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import io.prometheus.client.Collector;
@@ -49,7 +50,7 @@ public class SchedulerMetricsCollector implements Closeable {
private final int duration;
private final TimeUnit timeUnit;
- private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(
+ private final ScheduledThreadPoolExecutor scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(
1,
new SolrNamedThreadFactory("scheduled-metrics-collector"));
@@ -71,6 +72,8 @@ public class SchedulerMetricsCollector implements Closeable {
this.metricCollectors = metricCollectors;
this.duration = duration;
this.timeUnit = timeUnit;
+ scheduler.setRemoveOnCancelPolicy(true);
+ scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
}
public void start() {
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 361bd76..9e535a3 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -339,7 +339,7 @@ public class JettySolrRunner implements Closeable {
HTTP2ServerConnectionFactory http2ConnectionFactory = new HTTP2ServerConnectionFactory(configuration);
- http2ConnectionFactory.setMaxConcurrentStreams(64);
+ http2ConnectionFactory.setMaxConcurrentStreams(256);
http2ConnectionFactory.setInputBufferSize(4096);
ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory(
@@ -951,8 +951,10 @@ public class JettySolrRunner implements Closeable {
}
} catch (KeeperException e) {
SolrException.log(log, e);
+ return;
} catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
+ log.info("interrupted");
+ return;
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index b938728..192ef39 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -338,15 +338,14 @@ public class LeaderElector {
try {
zkClient.delete(myNode, -1);
} catch (AlreadyClosedException | InterruptedException e) {
- ParWork.propagateInterrupt(e);
log.info("Already shutting down");
return;
} catch (KeeperException.NoNodeException nne) {
log.info("No znode found to delete at {}", myNode);
// expected . don't do anything
} catch (Exception e) {
- ParWork.propagateInterrupt(e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception canceling election", e);
+ log.error("Exception canceling election", e);
+ return;
}
return;
}
@@ -356,8 +355,9 @@ public class LeaderElector {
} catch (AlreadyClosedException | InterruptedException e) {
log.info("Already shutting down");
return;
- } catch (Exception e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception canceling election", e);
+ } catch (Exception e) {
+ log.error("Exception canceling election", e);
+ return;
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index b6698f9..0e9b89d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -715,10 +715,9 @@ public class Overseer implements SolrCloseable {
log.warn("ZooKeeper session expired");
return;
} catch (InterruptedException | AlreadyClosedException e) {
- ParWork.propagateInterrupt(e);
+ log.info("Already closed");
return;
} catch (Exception e) {
- ParWork.propagateInterrupt(e);
log.error("Unexpected error in Overseer state update loop", e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
index 1ec1e23..73f8c7e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
@@ -56,19 +56,19 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
// TODO: the idea here is that we could clear the Overseer queue
// if we knew we are the first Overseer in a cluster startup
// needs more testing in real world vs tests
- if (!weAreReplacement) {
- // kills the queues
- ZkDistributedQueue queue = new ZkDistributedQueue(
- overseer.getZkController().getZkStateReader().getZkClient(),
- "/overseer/queue", new Stats(), 0, new ConnectionManager.IsClosed() {
- public boolean isClosed() {
- return overseer.isClosed() || overseer.getZkController()
- .getCoreContainer().isShutDown();
- }
- });
- clearQueue(queue);
- clearQueue(Overseer.getInternalWorkQueue(zkClient, new Stats()));
- }
+// if (!weAreReplacement) {
+// // kills the queues
+// ZkDistributedQueue queue = new ZkDistributedQueue(
+// overseer.getZkController().getZkStateReader().getZkClient(),
+// "/overseer/queue", new Stats(), 0, new ConnectionManager.IsClosed() {
+// public boolean isClosed() {
+// return overseer.isClosed() || overseer.getZkController()
+// .getCoreContainer().isShutDown();
+// }
+// });
+// clearQueue(queue);
+// clearQueue(Overseer.getInternalWorkQueue(zkClient, new Stats()));
+// }
super.runLeaderProcess(context, weAreReplacement, pauseBeforeStartMs);
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 980388b..aa65dc4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -167,7 +167,6 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
return;
}
try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
index 78be024..adb0e23 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -150,6 +150,6 @@ public class ReplicateFromLeader implements Closeable {
} catch (Exception e) {
ParWork.propagateInterrupt(e);
}
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index 8144e56..a88a1cc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -87,7 +87,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
super.close();
IOUtils.closeQuietly(syncStrategy);
this.isClosed = true;
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
index d90b062..c50ea48 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
@@ -78,7 +78,7 @@ class ZkCollectionTerms implements AutoCloseable {
this.closed = true;
terms.values().forEach(ZkShardTerms::close);
}
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index bfd9135..db48baa 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -964,17 +964,19 @@ public class ZkController implements Closeable {
});
} catch (KeeperException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ log.error("Zk Exception", e);
+ return;
} catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ log.info("interrupted");
+ return;
}
try {
zkClient.mkdirs("/cluster/cluster_lock");
} catch (KeeperException.NodeExistsException e) {
// okay
} catch (KeeperException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ log.error("Zk Exception", e);
+ return;
}
boolean createdClusterNodes = false;
try {
@@ -1000,7 +1002,7 @@ public class ZkController implements Closeable {
} catch (Exception e) {
ParWork.propagateInterrupt(e);
log.error("Failed creating initial zk layout", e);
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ return;
}
createdClusterNodes = true;
} else {
@@ -1520,11 +1522,11 @@ public class ZkController implements Closeable {
throw new AlreadyClosedException();
}
- getZkStateReader().waitForState(collection, 10, TimeUnit.SECONDS, (n,c) -> c != null && c.getLeader(shardId) != null && c.getLeader(shardId).getState().equals(
+ getZkStateReader().waitForState(collection, 30, TimeUnit.SECONDS, (n,c) -> c != null && c.getLeader(shardId) != null && c.getLeader(shardId).getState().equals(
Replica.State.ACTIVE));
// there should be no stale leader state at this point, dont hit zk directly
- String leaderUrl = zkStateReader.getLeaderUrl(collection, shardId, 5000);
+ String leaderUrl = zkStateReader.getLeaderUrl(collection, shardId, 15000);
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
log.debug("We are {} and leader is {}", ourUrl, leaderUrl);
@@ -2682,7 +2684,8 @@ public class ZkController implements Closeable {
private final Map<String, Set<Runnable>> confDirectoryListeners = new HashMap<>();
- private class WatcherImpl implements Watcher {
+ private class
+ WatcherImpl implements Watcher {
private final String zkDir;
private WatcherImpl(String dir) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index c4a27ac..2fd8d9d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -329,7 +329,7 @@ public class ZkDistributedQueue implements DistributedQueue {
* will be immediately visible when this method returns.
*/
@Override
- public void offer(byte[] data) throws KeeperException, InterruptedException {
+ public void offer(byte[] data) throws KeeperException {
Timer.Context time = stats.time(dir + "_offer");
try {
try {
@@ -353,12 +353,13 @@ public class ZkDistributedQueue implements DistributedQueue {
// Explicitly set isDirty here so that synchronous same-thread calls behave as expected.
// This will get set again when the watcher actually fires, but that's ok.
- zookeeper
- .create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL,
- true);
+ zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
return;
} catch (KeeperException.NoNodeException e) {
// someone created it
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ throw new AlreadyClosedException(e);
}
} finally {
@@ -401,45 +402,53 @@ public class ZkDistributedQueue implements DistributedQueue {
* The caller must double check that the actual node still exists, since the in-memory
* list is inherently stale.
*/
- private String firstChild(boolean remove) throws KeeperException, InterruptedException {
- updateLock.lockInterruptibly();
+ private String firstChild(boolean remove) {
try {
- // We always return from cache first, the cache will be cleared if the node is not exist
- if (!knownChildren.isEmpty()) {
- return remove ? knownChildren.pollFirst() : knownChildren.first();
- }
-
- if (knownChildren.isEmpty()) {
- return null;
- }
+ updateLock.lockInterruptibly();
+ try {
+ // We always return from cache first, the cache will be cleared if the node is not exist
+ if (!knownChildren.isEmpty()) {
+ return remove ? knownChildren.pollFirst() : knownChildren.first();
+ }
+ if (knownChildren.isEmpty()) {
+ return null;
+ }
- return remove ? knownChildren.pollFirst() : knownChildren.first();
- } finally {
- if (updateLock.isHeldByCurrentThread()) {
- updateLock.unlock();
+ return remove ? knownChildren.pollFirst() : knownChildren.first();
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
}
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ throw new AlreadyClosedException(e);
}
}
/**
* Return the current set of children from ZK; does not change internal state.
*/
- TreeSet<String> fetchZkChildren(Watcher watcher) throws InterruptedException, KeeperException {
+ TreeSet<String> fetchZkChildren(Watcher watcher) throws KeeperException {
TreeSet<String> orderedChildren = new TreeSet<>();
-
- List<String> childNames = zookeeper.getChildren(dir, watcher, true);
- stats.setQueueLength(childNames.size());
- for (String childName : childNames) {
- // Check format
- if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
- // responses can be written to same queue with different naming scheme
- if (log.isDebugEnabled()) log.debug("Found child node with improper name: {}", childName);
- continue;
+ try {
+ List<String> childNames = zookeeper.getChildren(dir, watcher, true);
+ stats.setQueueLength(childNames.size());
+ for (String childName : childNames) {
+ // Check format
+ if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
+ // responses can be written to same queue with different naming scheme
+ if (log.isDebugEnabled()) log.debug("Found child node with improper name: {}", childName);
+ continue;
+ }
+ orderedChildren.add(childName);
}
- orderedChildren.add(childName);
+ return orderedChildren;
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ throw new AlreadyClosedException(e);
}
- return orderedChildren;
}
/**
@@ -449,70 +458,74 @@ public class ZkDistributedQueue implements DistributedQueue {
* Package-private to support {@link OverseerTaskQueue} specifically.</p>
*/
@Override
- public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
+ public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException {
List<String> foundChildren = new ArrayList<>();
long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
+ try {
+ while (true) {
+ // Trigger a refresh, but only force it if this is not the first iteration.
+ //firstChild(false, !first);
+
+ updateLock.lockInterruptibly();
+ try {
+ for (String child : knownChildren) {
+ if (acceptFilter.test(child)) {
+ foundChildren.add(child);
+ }
+ }
+ if (!foundChildren.isEmpty()) {
+ break;
+ }
+ if (waitNanos <= 0) {
+ break;
+ }
- while (true && !Thread.currentThread().isInterrupted()) {
- // Trigger a refresh, but only force it if this is not the first iteration.
- //firstChild(false, !first);
+ TreeSet<String> existingChildren = knownChildren;
- updateLock.lockInterruptibly();
- try {
- for (String child : knownChildren) {
- if (acceptFilter.test(child)) {
- foundChildren.add(child);
+ while (existingChildren == knownChildren) {
+ try {
+ changed.await(5000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ throw new AlreadyClosedException();
+ }
+ if (timeout.hasTimedOut()) {
+ //throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout");
+ return Collections.emptyList();
+ }
+ }
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
}
}
+
if (!foundChildren.isEmpty()) {
break;
}
- if (waitNanos <= 0) {
- break;
- }
+ }
- TreeSet<String> existingChildren = knownChildren;
-
- while (existingChildren == knownChildren) {
- try {
- changed.await(500, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- throw new AlreadyClosedException();
- }
- if (timeout.hasTimedOut()) {
- //throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout");
- return Collections.emptyList();
- }
+ // Technically we could restart the method if we fail to actually obtain any valid children
+ // from ZK, but this is a super rare case, and the latency of the ZK fetches would require
+ // much more sophisticated waitNanos tracking.
+ List<Pair<String,byte[]>> result = new ArrayList<>(foundChildren.size());
+ for (String child : foundChildren) {
+ if (result.size() >= max) {
+ break;
}
- } finally {
- if (updateLock.isHeldByCurrentThread()) {
- updateLock.unlock();
+ try {
+ byte[] data = zookeeper.getData(dir + "/" + child, null, null);
+ result.add(new Pair<>(child, data));
+ } catch (KeeperException.NoNodeException e) {
+ continue;
}
}
-
- if (!foundChildren.isEmpty()) {
- break;
- }
- }
-
- // Technically we could restart the method if we fail to actually obtain any valid children
- // from ZK, but this is a super rare case, and the latency of the ZK fetches would require
- // much more sophisticated waitNanos tracking.
- List<Pair<String, byte[]>> result = new ArrayList<>();
- for (String child : foundChildren) {
- if (result.size() >= max) {
- break;
- }
- try {
- byte[] data = zookeeper.getData(dir + "/" + child, null, null);
- result.add(new Pair<>(child, data));
- } catch (KeeperException.NoNodeException e) {
- continue;
- }
+ return result;
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ throw new AlreadyClosedException(e);
}
- return result;
}
/**
@@ -520,36 +533,46 @@ public class ZkDistributedQueue implements DistributedQueue {
*
* @return the data at the head of the queue.
*/
- private byte[] firstElement() throws KeeperException, InterruptedException {
- while (true && !Thread.currentThread().isInterrupted()) {
- String firstChild = firstChild(false);
- if (firstChild == null) {
- return null;
- }
- try {
- return zookeeper.getData(dir + "/" + firstChild, null, null);
- } catch (KeeperException.NoNodeException e) {
- return null;
+ private byte[] firstElement() throws KeeperException {
+ try {
+ while (true) {
+ String firstChild = null;
+ firstChild = firstChild(false);
+ if (firstChild == null) {
+ return null;
+ }
+ try {
+ return zookeeper.getData(dir + "/" + firstChild, null, null);
+ } catch (KeeperException.NoNodeException e) {
+ return null;
+ }
}
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ throw new AlreadyClosedException(e);
}
- return null;
}
- private byte[] removeFirst() throws KeeperException, InterruptedException {
- while (true) {
- String firstChild = firstChild(true);
- if (firstChild == null) {
- return null;
- }
- try {
- String path = dir + "/" + firstChild;
- byte[] result = zookeeper.getData(path, null, null);
- zookeeper.delete(path, -1);
- // stats.setQueueLength(knownChildren.size());
- return result;
- } catch (KeeperException.NoNodeException e) {
- return null;
+ private byte[] removeFirst() throws KeeperException {
+ try {
+ while (true) {
+ String firstChild = firstChild(true);
+ if (firstChild == null) {
+ return null;
+ }
+ try {
+ String path = dir + "/" + firstChild;
+ byte[] result = zookeeper.getData(path, null, null);
+ zookeeper.delete(path, -1);
+ // stats.setQueueLength(knownChildren.size());
+ return result;
+ } catch (KeeperException.NoNodeException e) {
+ return null;
+ }
}
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ throw new AlreadyClosedException(e);
}
}
@@ -575,8 +598,6 @@ public class ZkDistributedQueue implements DistributedQueue {
changed.signalAll();
} catch (KeeperException e) {
log.error("", e);
- } catch (InterruptedException e) {
- log.info("ZkDistributedQueue ChildWatcher interrupted");
} finally {
updateLock.unlock();
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index 99ac563..44c425d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -159,7 +159,7 @@ public class ZkShardTerms implements AutoCloseable{
listeners.clear();
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
// package private for testing, only used by tests
@@ -357,7 +357,12 @@ public class ZkShardTerms implements AutoCloseable{
}
// Some events may be missed during register a watcher, so it is safer to refresh terms after registering watcher
- refreshTerms(this);
+ try {
+ refreshTerms(this);
+ } catch (Exception e) {
+ log.error("", e);
+ return;
+ }
}
};
refreshTerms(watcher);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index d6cb547..b7e591e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -879,8 +879,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
rstats2 = zkStateReader.getZkClient().exists(asyncPathToWaitOn, this);
} catch (KeeperException e) {
log.error("ZooKeeper exception", e);
+ return;
} catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
+ log.info("interrupted");
return;
}
if (rstats2 != null) {
@@ -987,7 +988,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
} catch (NullPointerException e) {
// okay
}
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index 42b68d3..b14056d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -115,7 +115,7 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
activeTriggers.clear();
if (log.isDebugEnabled()) log.debug("OverseerTriggerThread has been closed explicitly");
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
/**
@@ -313,7 +313,7 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
} catch (IOException e) {
log.warn("IO Error: [{}]", e);
} catch (InterruptedException | AlreadyClosedException e) {
- ParWork.propagateInterrupt(e);
+ log.info("interrupted");
} catch (Exception e) {
log.error("Unexpected exception", e);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 3a4e6bf..8dc233e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -551,7 +551,7 @@ public class ScheduledTriggers implements Closeable {
if (log.isDebugEnabled()) log.debug("ScheduledTriggers closed completely");
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
/**
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueue.java
index 8e5fe03..dae73cf 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/GenericDistributedQueue.java
@@ -98,7 +98,7 @@ public class GenericDistributedQueue implements DistributedQueue {
*/
private final Condition changed = updateLock.newCondition();
- private boolean isDirty = true;
+ private volatile boolean isDirty = true;
private int watcherCount = 0;
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
index 5d3c8ff..96408c0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -1011,7 +1011,7 @@ public class SimCloudManager implements SolrCloudManager {
closer.collect(simCloudManagerPool);
}
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
/**
diff --git a/solr/core/src/java/org/apache/solr/metrics/rrd/SolrRrdBackendFactory.java b/solr/core/src/java/org/apache/solr/metrics/rrd/SolrRrdBackendFactory.java
index f13c508..b6439b0 100644
--- a/solr/core/src/java/org/apache/solr/metrics/rrd/SolrRrdBackendFactory.java
+++ b/solr/core/src/java/org/apache/solr/metrics/rrd/SolrRrdBackendFactory.java
@@ -102,7 +102,7 @@ public class SolrRrdBackendFactory extends RrdBackendFactory implements SolrClos
* @param timeSource time source
*/
public SolrRrdBackendFactory(SolrClient solrClient, String collection, int syncPeriod, TimeSource timeSource) {
- assert ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.track(this);
this.solrClient = solrClient;
this.timeSource = timeSource;
this.collection = collection;
diff --git a/solr/core/src/java/org/apache/solr/pkg/PackageAPI.java b/solr/core/src/java/org/apache/solr/pkg/PackageAPI.java
index bb8d5cd..42eebb6 100644
--- a/solr/core/src/java/org/apache/solr/pkg/PackageAPI.java
+++ b/solr/core/src/java/org/apache/solr/pkg/PackageAPI.java
@@ -121,7 +121,7 @@ public class PackageAPI {
} catch (KeeperException e) {
log.error("A ZK error has occurred", e);
} catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
+ log.info("interrupted");
}
}
diff --git a/solr/core/src/java/org/apache/solr/update/CommitTracker.java b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
index 0184184..df22dad 100644
--- a/solr/core/src/java/org/apache/solr/update/CommitTracker.java
+++ b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
@@ -34,6 +34,7 @@ import java.util.Locale;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -63,8 +64,7 @@ public final class CommitTracker implements Runnable, Closeable {
// note: can't use ExecutorsUtil because it doesn't have a *scheduled* ExecutorService.
// Not a big deal but it means we must take care of MDC logging here.
- private final ScheduledExecutorService scheduler =
- Executors.newScheduledThreadPool(1, new SolrNamedThreadFactory("commitScheduler"));
+ private final ScheduledThreadPoolExecutor scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, new SolrNamedThreadFactory("commitScheduler"));
@SuppressWarnings({"rawtypes"})
private volatile ScheduledFuture pending;
@@ -94,6 +94,8 @@ public final class CommitTracker implements Runnable, Closeable {
this.softCommit = softCommit;
this.openSearcher = openSearcher;
+ scheduler.setRemoveOnCancelPolicy(true);
+ scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
log.info("{} AutoCommit: {}", name, this);
assert ObjectReleaseTracker.track(this);
}
@@ -110,9 +112,8 @@ public final class CommitTracker implements Runnable, Closeable {
// okay
}
pending = null;
- scheduler.shutdownNow();
ParWork.close(scheduler);
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
/** schedule individual commits */
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index 742c838..c6624475 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -813,7 +813,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
}
super.close();
numDocsPending.reset();
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
// IndexWriterCloser interface method - called from solrCoreState.decref(this)
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
index 2143f8f..83631af 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
@@ -101,7 +101,7 @@ UpdateHandler implements SolrInfoBean, Closeable {
public void close() throws IOException {
this.closed = true;
if (ulog != null) ulog.close();
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
protected void callPostCommitCallbacks() {
@@ -165,7 +165,7 @@ UpdateHandler implements SolrInfoBean, Closeable {
}
} catch (Throwable e) {
IOUtils.closeQuietly(ourUpdateLog);
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
if (e instanceof Error) {
throw e;
}
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index a838423..4cfc303 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -377,7 +377,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
throw new AlreadyClosedException();
}
if (dataDir != null) {
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
assert ObjectReleaseTracker.track(this);
try {
@@ -459,7 +459,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
core.getCoreMetricManager().registerMetricProducer(SolrInfoBean.Category.TLOG.toString(), this);
} catch (Throwable e) {
ParWork.propagateInterrupt(e);
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
if (e instanceof Error) {
throw e;
}
@@ -1522,7 +1522,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
}
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
diff --git a/solr/server/etc/jetty-http.xml b/solr/server/etc/jetty-http.xml
index bcc0a49..009b10f 100644
--- a/solr/server/etc/jetty-http.xml
+++ b/solr/server/etc/jetty-http.xml
@@ -34,7 +34,7 @@
<Item>
<New class="org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory">
<Arg name="config"><Ref refid="httpConfig" /></Arg>
- <Set name="maxConcurrentStreams">64</Set>
+ <Set name="maxConcurrentStreams">256</Set>
<Set name="inputBufferSize">4096</Set>
</New>
</Item>
diff --git a/solr/server/etc/jetty-https.xml b/solr/server/etc/jetty-https.xml
index 555090f..f49b6ce 100644
--- a/solr/server/etc/jetty-https.xml
+++ b/solr/server/etc/jetty-https.xml
@@ -54,7 +54,7 @@
<Item>
<New class="org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory">
<Arg name="config"><Ref refid="sslHttpConfig"/></Arg>
- <Set name="maxConcurrentStreams">64</Set>
+ <Set name="maxConcurrentStreams">256</Set>
<Set name="inputBufferSize">4096</Set>
</New>
</Item>
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index e457074..e4872c7 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -197,7 +197,7 @@ public class CloudSolrClient extends BaseCloudSolrClient {
}
super.close();
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
public LBHttpSolrClient getLbClient() {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
index 102d269..167fa8a 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClusterStateProvider.java
@@ -51,6 +51,6 @@ public class HttpClusterStateProvider extends BaseHttpClusterStateProvider {
if (this.clientIsInternal) {
ParWork.close(httpClient);
}
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
index 3d9a36d..90018e8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
@@ -313,7 +313,7 @@ public class LBHttpSolrClient extends LBSolrClient {
closer.collect(urlToClient.values());
}
urlToClient.clear();
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
/**
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 655a9ae..3163add 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1127,11 +1127,8 @@ public class ZkStateReader implements SolrCloseable {
}
} catch (KeeperException e) {
log.error("Error reading cluster properties from zookeeper", SolrZkClient.checkInterrupted(e));
- if (e instanceof KeeperException.SessionExpiredException) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
} catch (InterruptedException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted", e);
+ log.info("interrupted");
}
}
@@ -1472,7 +1469,12 @@ public class ZkStateReader implements SolrCloseable {
return;
}
log.debug("A collections change: [{}], has occurred - updating...", event);
- refreshAndWatch();
+ try {
+ refreshAndWatch();
+ } catch (Exception e) {
+ log.error("An error has occurred", e);
+ return;
+ }
synchronized (getUpdateLock()) {
constructState(Collections.emptySet());
}
@@ -1517,10 +1519,7 @@ public class ZkStateReader implements SolrCloseable {
refreshLiveNodes(this);
} catch (KeeperException e) {
log.error("A ZK error has occurred", e);
- throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
} catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
log.warn("Interrupted", e);
}
}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index 488924d..5c7d863 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -719,7 +719,7 @@ public class MiniSolrCloudCluster {
} finally {
System.clearProperty("zkHost");
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index c4f88cd..f94e246 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -403,7 +403,7 @@ public class ZkTestServer implements Closeable {
});
}
} finally {
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
}
}
@@ -633,7 +633,7 @@ public class ZkTestServer implements Closeable {
startupWait = new CountDownLatch(1);
if (zooThread != null) {
- ObjectReleaseTracker.release(zooThread);
+ assert ObjectReleaseTracker.release(zooThread);
}
// zooThread.interrupt();
zooThread.join(10000);
@@ -641,7 +641,7 @@ public class ZkTestServer implements Closeable {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Zookeeper thread still running");
}
zooThread = null;
- ObjectReleaseTracker.release(this);
+ assert ObjectReleaseTracker.release(this);
// if (cleaupDir) {