You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/31 16:07:22 UTC
[lucene-solr] branch reference_impl_dev updated: @1100 Harden.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new f02c615 @1100 Harden.
f02c615 is described below
commit f02c61538cbee3a4de1fc9fa195f98deb5663114
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Oct 31 11:06:47 2020 -0500
@1100 Harden.
---
.../src/java/org/apache/solr/cloud/Overseer.java | 104 ++-----------------
.../apache/solr/cloud/OverseerTaskProcessor.java | 8 +-
.../org/apache/solr/cloud/OverseerTaskQueue.java | 3 +-
.../org/apache/solr/cloud/ZkDistributedQueue.java | 20 +++-
.../java/org/apache/solr/core/CoreContainer.java | 18 +---
.../src/java/org/apache/solr/core/CoreSorter.java | 4 +
.../src/java/org/apache/solr/core/SolrCore.java | 12 ++-
.../org/apache/solr/cloud/DeleteReplicaTest.java | 2 +-
.../org/apache/solr/cloud/DeleteShardTest.java | 4 -
.../org/apache/solr/cloud/MoveReplicaTest.java | 2 -
.../test/org/apache/solr/cloud/OverseerTest.java | 112 ---------------------
.../org/apache/solr/cloud/TestPrepRecovery.java | 6 +-
.../org/apache/solr/request/SimpleFacetsTest.java | 2 +
.../apache/solr/schema/CurrencyFieldTypeTest.java | 2 +
.../org/apache/solr/util/OrderedExecutorTest.java | 24 +++--
.../org/apache/solr/common/cloud/SolrZkClient.java | 4 +-
.../apache/solr/common/cloud/SolrZooKeeper.java | 59 ++++++-----
.../apache/solr/client/solrj/io/sql/JdbcTest.java | 1 +
.../java/org/apache/solr/cloud/ZkTestServer.java | 12 +--
19 files changed, 103 insertions(+), 296 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index c5582cd..45f4ebe 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -200,17 +200,12 @@ public class Overseer implements SolrCloseable {
private final String myId;
//queue where everybody can throw tasks
private final ZkDistributedQueue stateUpdateQueue;
- //TODO remove in 9.0, we do not push message into this queue anymore
- //Internal queue where overseer stores events that have not yet been published into cloudstate
- //If Overseer dies while extracting the main queue a new overseer will start from this queue
- private final ZkDistributedQueue workQueue;
private volatile boolean isClosed = false;
public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
this.zkClient = reader.getZkClient();
this.stateUpdateQueue = getStateUpdateQueue(zkStats);
- this.workQueue = getInternalWorkQueue(zkClient, zkStats);
this.myId = myId;
this.reader = reader;
}
@@ -226,7 +221,7 @@ public class Overseer implements SolrCloseable {
log.info("Starting to work on the main queue : {}", LeaderElector.getNodeName(myId));
- ZkStateWriter zkStateWriter = null;
+ ZkStateWriter zkStateWriter = new ZkStateWriter(reader, stats);;
try {
reader.forciblyRefreshAllClusterStateSlow();
} catch (KeeperException e) {
@@ -239,83 +234,15 @@ public class Overseer implements SolrCloseable {
ClusterState clusterState = reader.getClusterState();
assert clusterState != null;
- // we write updates in batch, but if an exception is thrown when writing new clusterstate,
- // we are not sure which message is the bad message, therefore we will re-process nodes one by one
- int fallbackQueueSize = Integer.MAX_VALUE;
- ZkDistributedQueue fallbackQueue = workQueue;
+
while (!checkClosed()) {
if (log.isDebugEnabled()) log.debug("Start of Overseer loop ...");
- if (zkStateWriter == null) {
- try {
- zkStateWriter = new ZkStateWriter(reader, stats);
- // clusterState = reader.getClusterState();
- // if there were any errors while processing
- // the state queue, items would have been left in the
- // work queue so let's process those first
- byte[] data = fallbackQueue.peek(null);
- // TODO: can we do this with a bulk call instead?
- while (fallbackQueueSize > 0 && data != null) {
- final ZkNodeProps message = ZkNodeProps.load(data);
- if (log.isDebugEnabled()) log.debug("processMessage: fallbackQueueSize: {}, message = {}", fallbackQueue.getZkStats().getQueueLength(), message);
- // force flush to ZK after each message because there is no fallback if workQueue items
- // are removed from workQueue but fail to be written to ZK
- try {
- processQueueItem(message, reader.getClusterState(), zkStateWriter, false, null);
- } catch (InterruptedException | AlreadyClosedException e) {
- ParWork.propagateInterrupt(e);
- return;
- } catch (KeeperException.SessionExpiredException e) {
- log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
- return;
- } catch (Exception e) {
- SolrException exp = new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- try {
- if (isBadMessage(e)) {
- log.warn(
- "Exception when process message = {}, consider as bad message and poll out from the queue",
- message);
- fallbackQueue.poll(null);
- }
- } catch (InterruptedException e1) {
- ParWork.propagateInterrupt(e);
- return;
- } catch (Exception e1) {
- exp.addSuppressed(e1);
- }
- throw exp;
- }
- fallbackQueue.poll(null); // poll-ing removes the element we got by peek-ing
- data = fallbackQueue.peek(null);
- fallbackQueueSize--;
- }
- // force flush at the end of the loop, if there are no pending updates, this is a no op call
- clusterState = zkStateWriter.writePendingUpdates(clusterState, null);
- // the workQueue is empty now, use stateUpdateQueue as fallback queue
- fallbackQueue = stateUpdateQueue;
- fallbackQueueSize = 0;
- } catch (KeeperException.SessionExpiredException e) {
- log.error("run()", e);
-
- log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
- return;
- } catch (InterruptedException | AlreadyClosedException e) {
- ParWork.propagateInterrupt(e, true);
- return;
- } catch (Exception e) {
- log.error("Unexpected error in Overseer state update loop", e);
- return;
-// if (!isClosed()) {
-// continue;
-// }
- }
- }
-
LinkedList<Pair<String, byte[]>> queue = null;
try {
// We do not need to filter any nodes here cause all processed nodes are removed once we flush clusterstate
- long wait = 5000;
+ long wait = 1000;
queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, (x) -> x.startsWith(OverseerTaskQueue.RESPONSE_PREFIX)));
} catch (AlreadyClosedException e) {
if (isClosed()) {
@@ -373,9 +300,7 @@ public class Overseer implements SolrCloseable {
if (log.isDebugEnabled()) log.debug("going to peekElements processedNodes={}", processedNodes);
queue = new LinkedList<>(stateUpdateQueue.peekElements(10, wait, node -> processedNodes.contains(node) || node.startsWith(OverseerTaskQueue.RESPONSE_PREFIX)));
}
- fallbackQueueSize = processedNodes.size();
- // we should force write all pending updates because the next iteration might sleep until there
- // are more items in the main queue
+
clusterState = zkStateWriter.writePendingUpdates(clusterState, () -> {
if (log.isDebugEnabled()) log.debug("clear processedNodes={}", processedNodes);
stateUpdateQueue.remove(processedNodes);
@@ -1042,24 +967,9 @@ public class Overseer implements SolrCloseable {
});
}
- /**
- * Internal overseer work queue. This should not be used outside of Overseer.
- * <p>
- * This queue is used to store overseer operations that have been removed from the
- * state update queue but are being executed as part of a batch. Once
- * the result of the batch is persisted to zookeeper, these items are removed from the
- * work queue. If the overseer dies while processing a batch then a new overseer always
- * operates from the work queue first and only then starts processing operations from the
- * state update queue.
- * This method will create the /overseer znode in ZooKeeper if it does not exist already.
- *
- * @param zkClient the {@link SolrZkClient} to be used for reading/writing to the queue
- * @param zkStats a {@link Stats} object which tracks statistics for all zookeeper operations performed by this queue
- * @return a {@link ZkDistributedQueue} object
- */
- static ZkDistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stats zkStats) {
- return new ZkDistributedQueue(zkClient, "/overseer/queue-work", zkStats);
- }
+// static ZkDistributedQueue getInternalWorkQueue(final SolrZkClient zkClient, Stats zkStats) {
+// return new ZkDistributedQueue(zkClient, "/overseer/queue-work", zkStats);
+// }
/* Internal map for failed tasks, not to be used outside of the Overseer */
static DistributedMap getRunningMap(final SolrZkClient zkClient) throws KeeperException {
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 5c0f33c..0b6498d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -197,7 +197,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
while (runningTasksSize() > MAX_PARALLEL_TASKS) {
synchronized (waitLock) {
- waitLock.wait(1000);//wait for 1000 ms or till a task is complete
+ waitLock.wait(250);
}
waited = true;
}
@@ -208,14 +208,12 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
if (log.isDebugEnabled()) log.debug("Add {} blocked tasks to process", blockedTasks.size());
heads.addAll(blockedTasks.values());
blockedTasks.clear(); // clear it now; may get refilled below.
- //If we have enough items in the blocked tasks already, it makes
- // no sense to read more items from the work queue. it makes sense
- // to clear out at least a few items in the queue before we read more items
+
if (heads.size() < MAX_BLOCKED_TASKS) {
//instead of reading MAX_PARALLEL_TASKS items always, we should only fetch as much as we can execute
int toFetch = Math.min(MAX_BLOCKED_TASKS - heads.size(), MAX_PARALLEL_TASKS - runningTasksSize());
if (log.isDebugEnabled()) log.debug("PeekTopN for {} items", toFetch);
- List<QueueEvent> newTasks = workQueue.peekTopN(toFetch, excludedTasks, 5000);
+ List<QueueEvent> newTasks = workQueue.peekTopN(toFetch, excludedTasks, 10);
if (log.isDebugEnabled()) log.debug("Got {} tasks from work-queue : [{}]", newTasks.size(), newTasks);
heads.addAll(newTasks);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 38c019e..eabc34f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -139,7 +139,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
try {
zookeeper.setData(responsePath, event.getBytes(), true);
} catch (KeeperException.NoNodeException ignored) {
- // this will often not exist or have been removed
+ // this will often not exist or have been removed - nocommit - debug this response stuff
if (log.isDebugEnabled()) log.debug("Response ZK path: {} doesn't exist.", responsePath);
}
try {
@@ -150,7 +150,6 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
updateLock.lockInterruptibly();
try {
knownChildren.remove(event.getId());
- knownChildren.put(responseId, event.getBytes());
} finally {
if (updateLock.isHeldByCurrentThread()) {
updateLock.unlock();
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index 06941c5..a413df8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -204,7 +204,7 @@ public class ZkDistributedQueue implements DistributedQueue {
return result;
}
- ChildWatcher watcher = new ChildWatcher();
+ ChildWatcher watcher = new ChildWatcher(acceptFilter);
TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, null);
if (foundChildren.size() > 0) {
@@ -318,7 +318,7 @@ public class ZkDistributedQueue implements DistributedQueue {
return result;
}
- ChildWatcher watcher = new ChildWatcher();
+ ChildWatcher watcher = new ChildWatcher(acceptFilter);
TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, acceptFilter);
TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
@@ -555,7 +555,7 @@ public class ZkDistributedQueue implements DistributedQueue {
public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
if (log.isDebugEnabled()) log.debug("peekElements {} {}", max, acceptFilter);
List<Pair<String,byte[]>> result = null;
- ChildWatcher watcher = new ChildWatcher();
+ ChildWatcher watcher = new ChildWatcher(acceptFilter);
TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, acceptFilter);
long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
@@ -793,6 +793,12 @@ public class ZkDistributedQueue implements DistributedQueue {
@VisibleForTesting class ChildWatcher implements Watcher {
+ private final Predicate<String> acceptFilter;
+
+ public ChildWatcher(Predicate<String> acceptFilter) {
+ this.acceptFilter = acceptFilter;
+ }
+
@Override
public void process(WatchedEvent event) {
// session events are not change events, and do not remove the watcher; except for Expired
@@ -805,8 +811,12 @@ public class ZkDistributedQueue implements DistributedQueue {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
updateLock.lock();
try {
- fetchZkChildren(null, null);
- changed.signalAll();
+ TreeMap<String,byte[]> found = fetchZkChildren(null, acceptFilter);
+ if (!found.isEmpty()) {
+ changed.signalAll();
+ } else {
+ fetchZkChildren(this, acceptFilter);
+ }
} catch (KeeperException | InterruptedException e) {
log.error("", e);
} finally {
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index b77b20c..cb7ed71 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -352,8 +353,9 @@ public class CoreContainer implements Closeable {
this.asyncSolrCoreLoad = asyncSolrCoreLoad;
- this.replayUpdatesExecutor = new OrderedExecutor( cfg.getReplayUpdatesThreads(),
- ParWork.getExecutorService(cfg.getReplayUpdatesThreads()));
+ this.replayUpdatesExecutor = new OrderedExecutor(cfg.getReplayUpdatesThreads(),
+ ParWork.getParExecutorService("replayUpdatesExecutor", cfg.getReplayUpdatesThreads(), cfg.getReplayUpdatesThreads(),
+ 0, new LinkedBlockingQueue<>(cfg.getReplayUpdatesThreads())));
metricManager = new SolrMetricManager(loader, cfg.getMetricsConfig());
String registryName = SolrMetricManager.getRegistryName(SolrInfoBean.Group.node);
@@ -394,18 +396,6 @@ public class CoreContainer implements Closeable {
solrCoreLoadExecutor = new PerThreadExecService(ParWork.getRootSharedExecutor(), Math.max(32, Runtime.getRuntime().availableProcessors()),
false, false);
- // if (solrCoreLoadExecutor == null) {
- // synchronized (CoreContainer.class) {
- // if (solrCoreLoadExecutor == null) {
- //// solrCoreLoadExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0, Math.max(3, Runtime.getRuntime().availableProcessors() / 2),
- //// 2, TimeUnit.SECOND,
- //// new BlockingArrayQueue<>(100, 10),
- //// new SolrNamedThreadFactory("SolrCoreLoader"));
- // solrCoreLoadExecutor = new ParWorkExecutor("SolrCoreLoader", Math.max(3, Runtime.getRuntime().availableProcessors() / 2));
- // }
- // }
- // }
-
}
@SuppressWarnings({"unchecked"})
diff --git a/solr/core/src/java/org/apache/solr/core/CoreSorter.java b/solr/core/src/java/org/apache/solr/core/CoreSorter.java
index 9797b93..618855f 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreSorter.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreSorter.java
@@ -94,6 +94,10 @@ public final class CoreSorter implements Comparator<CoreDescriptor> {
private final Map<String, CountsForEachShard> shardsVsReplicaCounts = new HashMap<>();
CoreSorter init(ZkController zkController, Collection<CoreDescriptor> coreDescriptors) {
+
+ assert zkController != null;
+ assert zkController.getCoreContainer() != null;
+ assert zkController.getCoreContainer().getNodeConfig() != null;
String myNodeName = zkController.getCoreContainer().getNodeConfig().getNodeName();
ClusterState state = zkController.getCoreContainer().getZkController().getClusterState();
for (CoreDescriptor coreDescriptor : coreDescriptors) {
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index d694b2d..3328138 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -28,6 +28,7 @@ import java.io.Writer;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Constructor;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -35,6 +36,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -3107,16 +3109,18 @@ public final class SolrCore implements SolrInfoBean, Closeable {
public static void deleteUnloadedCore(CoreDescriptor cd, boolean deleteDataDir, boolean deleteInstanceDir) {
if (deleteDataDir) {
log.info("Removing SolrCore dataDir on unload {}", cd.getInstanceDir().resolve(cd.getDataDir()));
- File dataDir = cd.getInstanceDir().resolve(cd.getDataDir()).toFile();
+ Path dataDir = cd.getInstanceDir().resolve(cd.getDataDir());
try {
- FileUtils.deleteDirectory(dataDir);
+
+ Files.walk(dataDir).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
+
} catch (Exception e) {
- log.error("Failed to delete data dir for unloaded core: {} dir: {}", cd.getName(), dataDir.getAbsolutePath(), e);
+ log.error("Failed to delete data dir for unloaded core: {} dir: {}", cd.getName(), dataDir, e);
}
}
if (deleteInstanceDir) {
try {
- FileUtils.deleteDirectory(cd.getInstanceDir().toFile());
+ Files.walk(cd.getInstanceDir()).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
} catch (Exception e) {
log.error("Failed to delete instance dir for unloaded core: {} dir: {}", cd.getName(), cd.getInstanceDir(), e);
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index f85408e..6c3716b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -391,7 +391,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
final String collectionName = "deleteReplicaOnIndexing";
CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
.process(cluster.getSolrClient());
- cluster.waitForActiveCollection(collectionName, 10, TimeUnit.SECONDS, 1, 2);
+
AtomicBoolean closed = new AtomicBoolean(false);
List<Future> futures = new ArrayList<>(TEST_NIGHTLY ? 50 : 5);
Thread[] threads = new Thread[TEST_NIGHTLY ? 50 : 5];
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
index f8dcac0..62ce22e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
@@ -86,10 +86,6 @@ public class DeleteShardTest extends SolrCloudTestCase {
});
CollectionAdminRequest.deleteShard(collection, "shard2").process(cluster.getSolrClient());
- waitForState("Expected 'shard2' to be removed", collection, (n, c) -> {
- return c != null && c.getSlice("shard2") == null;
- });
-
}
protected void setSliceState(String collection, String slice, State state) throws Exception {
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
index de5f808..145c32c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
@@ -187,8 +187,6 @@ public class MoveReplicaTest extends SolrCloudTestCase {
moveReplica.setInPlaceMove(inPlaceMove);
moveReplica.process(cloudClient);
- cluster.waitForActiveCollection(coll, create.getNumShards(), create.getNumShards() * (create.getNumNrtReplicas() + create.getNumPullReplicas() + create.getNumTlogReplicas()));
-
assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
checkNumOfCores(cloudClient, replica.getNodeName(), coll, sourceNumCores);
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 0f7e2e0..adf885d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -813,49 +813,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
}
}
- @Test
- public void testExceptionWhenFlushClusterState() throws Exception {
-
- SolrZkClient overseerClient = null;
- ZkStateReader reader = null;
-
- try {
-
- ZkController.createClusterZkNodes(zkClient);
-
- reader = new ZkStateReader(zkClient);
- reader.createClusterStateWatchersAndUpdate();
-
- // We did not create /collections/collection1 -> this message will cause exception when Overseer tries to flush
- // the collection state
- ZkNodeProps badMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
- "name", "collection1",
- ZkStateReader.REPLICATION_FACTOR, "1",
- ZkStateReader.NUM_SHARDS_PROP, "1",
- "createNodeSet", "");
- ZkDistributedQueue workQueue = Overseer.getInternalWorkQueue(zkClient, new Stats());
- workQueue.offer(Utils.toJSON(badMessage));
- overseerClient = electNewOverseer(server.getZkAddress());
-
- ZkDistributedQueue q = getOpenOverseer().getStateUpdateQueue();
- q.offer(Utils.toJSON(badMessage));
-
- TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
- while(!timeOut.hasTimedOut()) {
- if (q.peek(null) == null) {
- break;
- }
- Thread.sleep(50);
- }
-
- assertTrue(showQpeek(workQueue), workQueue.peek(null) == null);
- assertTrue(showQpeek(q), q.peek(null) == null);
- } finally {
- close(overseerClient);
- close(reader);
- }
- }
-
private String showQpeek(ZkDistributedQueue q) throws KeeperException, InterruptedException {
if (q == null) {
return "";
@@ -1184,75 +1141,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
}
}
-
- @Test
- public void testReplay() throws Exception{
-
- SolrZkClient overseerClient = null;
- ZkStateReader reader = null;
-
- try {
-
- ZkController.createClusterZkNodes(zkClient);
-
- reader = new ZkStateReader(zkClient);
- reader.createClusterStateWatchersAndUpdate();
- //prepopulate work queue with some items to emulate previous overseer died before persisting state
- DistributedQueue queue = Overseer.getInternalWorkQueue(zkClient, new Stats());
-
- zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + COLLECTION, false, true);
-
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
- "name", COLLECTION,
- ZkStateReader.REPLICATION_FACTOR, "1",
- ZkStateReader.NUM_SHARDS_PROP, "1",
- "createNodeSet", "");
- queue.offer(Utils.toJSON(m));
- m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
- ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
- ZkStateReader.NODE_NAME_PROP, "node1",
- ZkStateReader.SHARD_ID_PROP, "shard1",
- ZkStateReader.COLLECTION_PROP, COLLECTION,
- ZkStateReader.CORE_NAME_PROP, "core1",
- ZkStateReader.ROLES_PROP, "",
- ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
- queue.offer(Utils.toJSON(m));
- m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
- ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
- ZkStateReader.NODE_NAME_PROP, "node1",
- ZkStateReader.SHARD_ID_PROP, "shard1",
- ZkStateReader.COLLECTION_PROP, COLLECTION,
- ZkStateReader.CORE_NAME_PROP, "core2",
- ZkStateReader.ROLES_PROP, "",
- ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
- queue.offer(Utils.toJSON(m));
-
- overseerClient = electNewOverseer(server.getZkAddress());
-
- //submit to proper queue
- queue = overseers.get(0).getStateUpdateQueue();
- m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
- ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
- ZkStateReader.NODE_NAME_PROP, "node1",
- ZkStateReader.SHARD_ID_PROP, "shard1",
- ZkStateReader.COLLECTION_PROP, COLLECTION,
- ZkStateReader.CORE_NAME_PROP, "core3",
- ZkStateReader.ROLES_PROP, "",
- ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
- queue.offer(Utils.toJSON(m));
-
- reader.waitForState(COLLECTION, 1000, TimeUnit.MILLISECONDS,
- (liveNodes, collectionState) -> collectionState != null && collectionState.getSlice("shard1") != null
- && collectionState.getSlice("shard1").getReplicas().size() == 3);
-
- assertNotNull(reader.getClusterState().getCollection(COLLECTION).getSlice("shard1"));
- assertEquals(3, reader.getClusterState().getCollection(COLLECTION).getSlice("shard1").getReplicasMap().size());
- } finally {
- close(overseerClient);
- close(reader);
- }
- }
-
@Test
public void testExternalClusterStateChangeBehavior() throws Exception {
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java b/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java
index 8f90b77..6160b9f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java
@@ -70,6 +70,8 @@ public class TestPrepRecovery extends SolrCloudTestCase {
.setNode(newNodeName)
.process(solrClient);
+ cluster.waitForActiveCollection(collectionName, 1, 3);
+
// now delete the leader
Replica leader = solrClient.getZkStateReader().getLeaderRetry(collectionName, "shard1");
CollectionAdminRequest.deleteReplica(collectionName, "shard1", leader.getName())
@@ -80,12 +82,10 @@ public class TestPrepRecovery extends SolrCloudTestCase {
CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
.setNode(newNodeName)
.process(solrClient);
-
- // in the absence of the fixes made in SOLR-10914, this statement will timeout after 90s
- cluster.waitForActiveCollection(collectionName, 1, 3);
}
@Test
+ @Nightly
public void testLeaderNotResponding() throws Exception {
CloudHttp2SolrClient solrClient = cluster.getSolrClient();
diff --git a/solr/core/src/test/org/apache/solr/request/SimpleFacetsTest.java b/solr/core/src/test/org/apache/solr/request/SimpleFacetsTest.java
index b7cdbb2..9e851e9 100644
--- a/solr/core/src/test/org/apache/solr/request/SimpleFacetsTest.java
+++ b/solr/core/src/test/org/apache/solr/request/SimpleFacetsTest.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
+import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -47,6 +48,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.common.util.Utils.fromJSONString;
+@LuceneTestCase.Nightly
public class SimpleFacetsTest extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/solr/core/src/test/org/apache/solr/schema/CurrencyFieldTypeTest.java b/solr/core/src/test/org/apache/solr/schema/CurrencyFieldTypeTest.java
index 8da34d5..7b9c723 100644
--- a/solr/core/src/test/org/apache/solr/schema/CurrencyFieldTypeTest.java
+++ b/solr/core/src/test/org/apache/solr/schema/CurrencyFieldTypeTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.SolrParams;
@@ -37,6 +38,7 @@ import org.junit.Ignore;
import org.junit.Test;
/** Tests CurrencyField and CurrencyFieldType. */
+@LuceneTestCase.Nightly
public class CurrencyFieldTypeTest extends SolrTestCaseJ4 {
private final String fieldName;
private final Class<? extends ExchangeRateProvider> expectedProviderClass;
diff --git a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
index 389df5b..f47dfa0 100644
--- a/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
+++ b/solr/core/src/test/org/apache/solr/util/OrderedExecutorTest.java
@@ -32,12 +32,14 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.SolrTestCase;
import org.apache.solr.common.ParWork;
+import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.OrderedExecutor;
import org.junit.Test;
@@ -50,8 +52,9 @@ public class OrderedExecutorTest extends SolrTestCase {
@Test
public void testExecutionInOrder() {
IntBox intBox = new IntBox();
- OrderedExecutor orderedExecutor = new OrderedExecutor(TEST_NIGHTLY ? 10 : 3,
- ParWork.getExecutorService(TEST_NIGHTLY ? 10 : 3));
+ OrderedExecutor orderedExecutor = new OrderedExecutor(10,
+ ParWork.getParExecutorService("replayUpdatesExecutor", 10, 10,
+ 0, new LinkedBlockingQueue<>(10)));
for (int i = 0; i < 100; i++) {
orderedExecutor.submit(1, () -> intBox.value.incrementAndGet());
@@ -62,8 +65,9 @@ public class OrderedExecutorTest extends SolrTestCase {
@Test
public void testLockWhenQueueIsFull() throws ExecutionException {
- final OrderedExecutor orderedExecutor = new OrderedExecutor
- (TEST_NIGHTLY ? 10 : 3, ParWork.getExecutorService(TEST_NIGHTLY ? 10 : 3));
+ OrderedExecutor orderedExecutor = new OrderedExecutor(10,
+ ParWork.getParExecutorService("replayUpdatesExecutor", 10, 10,
+ 0, new LinkedBlockingQueue<>(10)));
try {
// AAA and BBB events will both depend on the use of the same lockId
@@ -108,8 +112,9 @@ public class OrderedExecutorTest extends SolrTestCase {
public void testRunInParallel() throws ExecutionException, InterruptedException {
final int parallelism = atLeast(3);
- final OrderedExecutor orderedExecutor = new OrderedExecutor
- (parallelism, ParWork.getExecutorService(parallelism));
+ OrderedExecutor orderedExecutor = new OrderedExecutor(parallelism,
+ ParWork.getParExecutorService("replayUpdatesExecutor", parallelism, parallelism,
+ 0, new LinkedBlockingQueue<>(parallelism)));
try {
// distinct lockIds should be able to be used in parallel, up to the size of the executor,
@@ -121,7 +126,7 @@ public class OrderedExecutorTest extends SolrTestCase {
List<Future> futures = new ArrayList<>();
for (int i = 0; i < parallelism; i++) {
final int lockId = i;
- futures.add(testExecutor.submit(() -> {
+ futures.add(ParWork.getRootSharedExecutor().submit(() -> {
orderedExecutor.submit(lockId, () -> {
try {
log.info("Worker #{} starting", lockId);
@@ -212,8 +217,9 @@ public class OrderedExecutorTest extends SolrTestCase {
base.put(i, i);
run.put(i, i);
}
- OrderedExecutor orderedExecutor = new OrderedExecutor(TEST_NIGHTLY ? 10 : 3,
- ParWork.getExecutorService(TEST_NIGHTLY ? 10 : 3, true, true));
+ OrderedExecutor orderedExecutor = new OrderedExecutor(10,
+ ParWork.getParExecutorService("replayUpdatesExecutor", 10, 10,
+ 0, new LinkedBlockingQueue<>(10)));
try {
for (int i = 0; i < (TEST_NIGHTLY ? 1000 : 55); i++) {
int key = random().nextInt(N);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 88c9bbc..3298204 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -742,9 +742,9 @@ public class SolrZkClient implements Closeable {
if (keCode == KeeperException.Code.NONODE) {
if (log.isDebugEnabled()) log.debug("No node found for {}", path1);
}
+ } else {
+ dataMap.put(path1, data);
}
-
- dataMap.put(path1, data);
latch.countDown();
}, null);
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
index 5e7d5d7..faf1d61 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
@@ -17,6 +17,7 @@
package org.apache.solr.common.cloud;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -32,21 +33,20 @@ import org.apache.solr.common.util.SuppressForbidden;
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
// we use this class to expose nasty stuff for tests
@SuppressWarnings({"try"})
public class SolrZooKeeper extends ZooKeeper {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
final Set<Thread> spawnedThreads = ConcurrentHashMap.newKeySet();
private CloseTracker closeTracker;
- // for test debug
- //static Map<SolrZooKeeper,Exception> clients = new ConcurrentHashMap<SolrZooKeeper,Exception>();
-
- public SolrZooKeeper(String connectString, int sessionTimeout,
- Watcher watcher) throws IOException {
+ public SolrZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
super(connectString, sessionTimeout, watcher);
assert (closeTracker = new CloseTracker()) != null;
- //clients.put(this, new RuntimeException());
}
public ClientCnxn getConnection() {
@@ -62,8 +62,7 @@ public class SolrZooKeeper extends ZooKeeper {
@Override
public void run() {
try {
- AccessController.doPrivileged(
- (PrivilegedAction<Void>) this::closeZookeeperChannel);
+ AccessController.doPrivileged((PrivilegedAction<Void>) this::closeZookeeperChannel);
} finally {
spawnedThreads.remove(this);
}
@@ -75,13 +74,11 @@ public class SolrZooKeeper extends ZooKeeper {
synchronized (cnxn) {
try {
- final Field sendThreadFld = cnxn.getClass()
- .getDeclaredField("sendThread");
+ final Field sendThreadFld = cnxn.getClass().getDeclaredField("sendThread");
sendThreadFld.setAccessible(true);
Object sendThread = sendThreadFld.get(cnxn);
if (sendThread != null) {
- Method method = sendThread.getClass()
- .getDeclaredMethod("testableCloseSocket");
+ Method method = sendThread.getClass().getDeclaredMethod("testableCloseSocket");
method.setAccessible(true);
try {
method.invoke(sendThread);
@@ -91,8 +88,7 @@ public class SolrZooKeeper extends ZooKeeper {
}
} catch (Exception e) {
ParWork.propagateInterrupt(e);
- throw new RuntimeException("Closing Zookeeper send channel failed.",
- e);
+ throw new RuntimeException("Closing Zookeeper send channel failed.", e);
}
}
return null; // Void
@@ -105,23 +101,26 @@ public class SolrZooKeeper extends ZooKeeper {
@Override
public void close() {
assert closeTracker.close();
- try (ParWork closer = new ParWork(this)) {
- closer.collect("zookeeper", ()->{
- try {
- SolrZooKeeper.super.close();
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- }
- });
-// closer.collect("keep send thread from sleeping", ()->{
-// // ZooKeeperExposed zk = new ZooKeeperExposed(this, cnxn);
-//
-// // zk.interruptSendThread();
-// // zk.interruptEventThread();
-// });
+ // try (ParWork closer = new ParWork(this)) {
+ // closer.collect("zookeeper", ()->{
+ // try {
+ // SolrZooKeeper.super.close();
+ // } catch (InterruptedException e) {
+ // ParWork.propagateInterrupt(e);
+ // }
+ // });
+ //// closer.collect("keep send thread from sleeping", ()->{
+ //// // ZooKeeperExposed zk = new ZooKeeperExposed(this, cnxn);
+ ////
+ //// // zk.interruptSendThread();
+ //// // zk.interruptEventThread();
+ //// });
+ try {
+ SolrZooKeeper.super.close();
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e, true);
}
-
-
}
}
+
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
index 8ea1562..d684930 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/sql/JdbcTest.java
@@ -54,6 +54,7 @@ import org.junit.Test;
@Slow
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40", "Lucene41", "Lucene42", "Lucene45"})
+@LuceneTestCase.Nightly
public class JdbcTest extends SolrCloudTestCase {
private static final String COLLECTIONORALIAS = "collection1";
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index 6330a91..8650193 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -627,7 +627,6 @@ public class ZkTestServer implements Closeable {
}
}
- // zooThread.interrupt();
timer.cancel();
ParWork.close(chRootClient);
@@ -635,13 +634,14 @@ public class ZkTestServer implements Closeable {
startupWait = new CountDownLatch(1);
if (zooThread != null) {
+ // zooThread.interrupt();
+ zooThread.join(10000);
+ if (zooThread.isAlive()) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Zookeeper thread still running");
+ }
assert ObjectReleaseTracker.release(zooThread);
}
- // zooThread.interrupt();
- zooThread.join(10000);
- if (zooThread.isAlive()) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Zookeeper thread still running");
- }
+
zooThread = null;
assert ObjectReleaseTracker.release(this);