You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/11 18:28:33 UTC
[lucene-solr] 02/02: @1456 Cleanup current proper waiting.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit fbff83681a0baa8c4edf51a61af6c9f66e92addd
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Mar 11 12:28:06 2021 -0600
@1456 Cleanup current proper waiting.
Took 25 minutes
---
.../src/java/org/apache/solr/cloud/Overseer.java | 76 ++++++++++++++++++----
.../CreateCollectionsIndexAndRestartTest.java | 2 +-
.../apache/solr/common/cloud/ZkStateReader.java | 2 +-
3 files changed, 65 insertions(+), 15 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index f200fe7..80bcf42 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -810,15 +810,24 @@ public class Overseer implements SolrCloseable {
public void start() throws KeeperException, InterruptedException {
- zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
- startItems = super.getItems();
- log.info("Overseer found entries on start {}", startItems);
- processQueueItems(startItems, true);
+ if (closed) return;
+ ourLock.lock();
+ try {
+ if (closed) return;
+ zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
+ startItems = super.getItems();
+ log.info("Overseer found entries on start {}", startItems);
+ processQueueItems(startItems, true);
+ } finally {
+ ourLock.unlock();
+ }
}
@Override
protected void processQueueItems(List<String> items, boolean onStart) {
+ if (closed) return;
List<String> fullPaths = new ArrayList<>(items.size());
+ ourLock.lock();
try {
if (log.isDebugEnabled()) log.debug("Found state update queue items {}", items);
for (String item : items) {
@@ -826,12 +835,13 @@ public class Overseer implements SolrCloseable {
}
Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
+ Set<String> shardStateCollections = null;
List<Future> futures = new ArrayList<>();
for (byte[] item : data.values()) {
final ZkNodeProps message = ZkNodeProps.load(item);
try {
+ String operation = message.getStr(Overseer.QUEUE_OPERATION);
if (onStart) {
- String operation = message.getStr(Overseer.QUEUE_OPERATION);
if (operation.equals("state")) {
message.getProperties().remove(OverseerAction.DOWNNODE);
if (message.getProperties().size() == 1) {
@@ -839,6 +849,15 @@ public class Overseer implements SolrCloseable {
}
}
}
+
+ // hack
+ if (operation.equals("updateshardstate")) {
+ if (shardStateCollections == null) {
+ shardStateCollections = new HashSet<>();
+ }
+ shardStateCollections.add(message.getStr("collection"));
+ }
+
Future future = overseer.processQueueItem(message);
if (future != null) {
futures.add(future);
@@ -854,10 +873,31 @@ public class Overseer implements SolrCloseable {
log.error("failed waiting for enqueued updates", e);
}
}
-
+ futures.clear();
Set<String> collections = overseer.zkStateWriter.getDirtyStateCollections();
for (String collection : collections) {
- overseer.writePendingUpdates(collection);
+ futures.add(overseer.writePendingUpdates(collection));
+ }
+
+ for (Future future : futures) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ log.error("failed waiting for enqueued updates", e);
+ }
+ }
+ futures.clear();
+ if (shardStateCollections != null) {
+ for (String collection : shardStateCollections) {
+ futures.add(overseer.writePendingUpdates(collection));
+ }
+ }
+ for (Future future : futures) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ log.error("failed waiting for enqueued updates", e);
+ }
}
} finally {
@@ -869,7 +909,7 @@ public class Overseer implements SolrCloseable {
log.warn("Failed deleting processed items", e);
}
}
-
+ ourLock.unlock();
}
}
}
@@ -893,23 +933,32 @@ public class Overseer implements SolrCloseable {
@Override
public void close() {
+ super.close();
IOUtils.closeQuietly(collMessageHandler);
IOUtils.closeQuietly(configMessageHandler);
- super.close();
}
@Override
public void start() throws KeeperException, InterruptedException {
- zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
+ if (closed) return;
+ ourLock.lock();
+ try {
+ if (closed) return;
+ zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
- startItems = super.getItems();
+ startItems = super.getItems();
- log.info("Overseer found entries on start {}", startItems);
- processQueueItems(startItems, true);
+ log.info("Overseer found entries on start {}", startItems);
+ processQueueItems(startItems, true);
+ } finally {
+ ourLock.unlock();
+ }
}
@Override
protected void processQueueItems(List<String> items, boolean onStart) {
+ if (closed) return;
+ ourLock.lock();
List<String> fullPaths = new ArrayList<>(items.size());
try {
log.info("Found collection queue items {} onStart={}", items, onStart);
@@ -941,6 +990,7 @@ public class Overseer implements SolrCloseable {
} catch (Exception e) {
log.warn("Delete items failed {}", e.getMessage());
}
+ ourLock.unlock();
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
index 0ba3acf..3e70f81 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
@@ -56,7 +56,7 @@ public class CreateCollectionsIndexAndRestartTest extends SolrCloudTestCase {
@Test
public void start() throws Exception {
- int collectionCnt = 40;
+ int collectionCnt = 2;
List<Future> futures = new ArrayList<>();
List<Future> indexFutures = new ArrayList<>();
for (int i = 0; i < collectionCnt; i ++) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 65448e4..a278b0b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1855,7 +1855,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
Replica replica = docCollection.getReplicaById(id);
- if (log.isDebugEnabled()) log.debug("Got additional state update replica={} id={} ids={} {} {}", replica, id, docCollection.getReplicaByIds(), state == null ? "leader" : state);
+ if (log.isDebugEnabled()) log.debug("Got additional state update {} replica={} id={} ids={} {}", state == null ? "leader" : state, replica, id, docCollection.getReplicaByIds());
if (replica != null) {