You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/09 06:36:22 UTC
[lucene-solr] branch reference_impl_dev updated: @1108 Harden writing response zk node.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new b00adcd @1108 Harden writing response zk node.
b00adcd is described below
commit b00adcd5e4a0504e8cfdc4a7aca7034540803ace
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Nov 9 00:35:30 2020 -0600
@1108 Harden writing response zk node.
---
.../src/java/org/apache/solr/cloud/Overseer.java | 264 +++++++++++----------
.../org/apache/solr/cloud/OverseerTaskQueue.java | 7 +-
2 files changed, 147 insertions(+), 124 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index c39335f..d921019 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import org.apache.lucene.util.Version;
@@ -156,7 +157,7 @@ public class Overseer implements SolrCloseable {
private volatile boolean closeAndDone;
private volatile boolean initedHttpClient = false;
private volatile QueueWatcher queueWatcher;
- private volatile CollectionWorkQueueWatcher collectionQueueWatcher;
+ private volatile WorkQueueWatcher.CollectionWorkQueueWatcher collectionQueueWatcher;
public boolean isDone() {
return closeAndDone;
@@ -334,7 +335,7 @@ public class Overseer implements SolrCloseable {
//systemCollectionCompatCheck(new StringBiConsumer());
queueWatcher = new WorkQueueWatcher(getCoreContainer());
- collectionQueueWatcher = new CollectionWorkQueueWatcher(getCoreContainer(), id, overseerLbClient, adminPath, stats, Overseer.this);
+ collectionQueueWatcher = new WorkQueueWatcher.CollectionWorkQueueWatcher(getCoreContainer(), id, overseerLbClient, adminPath, stats, Overseer.this);
// TODO: don't track for a moment, can leak out of collection api tests
// assert ObjectReleaseTracker.track(this);
@@ -798,6 +799,7 @@ public class Overseer implements SolrCloseable {
protected final String path;
protected final Overseer overseer;
protected volatile boolean closed;
+ protected final ReentrantLock ourLock = new ReentrantLock(true);
public QueueWatcher(CoreContainer cc, String path) throws KeeperException {
this.cc = cc;
@@ -833,7 +835,7 @@ public class Overseer implements SolrCloseable {
}
@Override
- public synchronized void process(WatchedEvent event) {
+ public void process(WatchedEvent event) {
if (Event.EventType.None.equals(event.getType())) {
return;
}
@@ -841,14 +843,19 @@ public class Overseer implements SolrCloseable {
return;
}
- log.info("Overseer work queue has changed, processing...");
-
+ ourLock.lock();
try {
- List<String> items = setWatch();
+ log.info("Overseer work queue has changed, processing...");
- processQueueItems(items);
- } catch (Exception e) {
- log.error("Exception during overseer queue queue processing", e);
+ try {
+ List<String> items = setWatch();
+
+ processQueueItems(items);
+ } catch (Exception e) {
+ log.error("Exception during overseer queue queue processing", e);
+ }
+ } finally {
+ ourLock.unlock();
}
}
@@ -857,11 +864,16 @@ public class Overseer implements SolrCloseable {
@Override
public void close() {
- this.closed = true;
+ ourLock.lock();
try {
- zkController.getZkClient().getSolrZooKeeper().removeWatches(path, this, WatcherType.Data, true);
- } catch (Exception e) {
- log.info("", e.getMessage());
+ this.closed = true;
+ try {
+ zkController.getZkClient().getSolrZooKeeper().removeWatches(path, this, WatcherType.Data, true);
+ } catch (Exception e) {
+ log.info("", e.getMessage());
+ }
+ } finally {
+ ourLock.unlock();
}
}
}
@@ -873,153 +885,161 @@ public class Overseer implements SolrCloseable {
}
@Override
- protected synchronized void processQueueItems(List<String> items) {
- log.info("Found state update queue items {}", items);
- List<String> fullPaths = new ArrayList<>(items.size());
- for (String item : items) {
- fullPaths.add(path + "/" + item);
- }
+ protected void processQueueItems(List<String> items) {
+ ourLock.lock();
+ try {
+ log.info("Found state update queue items {}", items);
+ List<String> fullPaths = new ArrayList<>(items.size());
+ for (String item : items) {
+ fullPaths.add(path + "/" + item);
+ }
- Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
+ Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
+
+ for (byte[] item : data.values()) {
+ final ZkNodeProps message = ZkNodeProps.load(item);
+ try {
+ boolean success = overseer.processQueueItem(message);
+ } catch (InterruptedException e) {
+ log.error("Overseer state update queue processing interrupted");
+ return;
+ }
+ }
- for (byte[] item : data.values()) {
- final ZkNodeProps message = ZkNodeProps.load(item);
try {
- boolean success = overseer.processQueueItem(message);
+ overseer.writePendingUpdates();
} catch (InterruptedException e) {
log.error("Overseer state update queue processing interrupted");
return;
}
- }
- try {
- overseer.writePendingUpdates();
- } catch (InterruptedException e) {
- log.error("Overseer state update queue processing interrupted");
- return;
- }
+ zkController.getZkClient().delete(fullPaths, true);
- zkController.getZkClient().delete(fullPaths, true);
- }
- }
+ } finally {
- private static class CollectionWorkQueueWatcher extends QueueWatcher {
+ ourLock.unlock();
+ }
+ }
- private final OverseerCollectionMessageHandler collMessageHandler;
- private final OverseerConfigSetMessageHandler configMessageHandler;
- private final DistributedMap failureMap;
- private final DistributedMap runningMap;
+ private static class CollectionWorkQueueWatcher extends QueueWatcher {
- private final DistributedMap completedMap;
+ private final OverseerCollectionMessageHandler collMessageHandler;
+ private final OverseerConfigSetMessageHandler configMessageHandler;
+ private final DistributedMap failureMap;
+ private final DistributedMap runningMap;
- public CollectionWorkQueueWatcher(CoreContainer cc, String myId, LBHttp2SolrClient overseerLbClient, String adminPath, Stats stats, Overseer overseer) throws KeeperException {
- super(cc, Overseer.OVERSEER_COLLECTION_QUEUE_WORK);
- collMessageHandler = new OverseerCollectionMessageHandler(cc, myId, overseerLbClient, adminPath, stats, overseer);
- configMessageHandler = new OverseerConfigSetMessageHandler(cc);
- failureMap = Overseer.getFailureMap(cc.getZkController().getZkStateReader().getZkClient());
- runningMap = Overseer.getRunningMap(cc.getZkController().getZkStateReader().getZkClient());
- completedMap = Overseer.getCompletedMap(cc.getZkController().getZkStateReader().getZkClient());
- }
+ private final DistributedMap completedMap;
- @Override
- public void close() {
- super.close();
- IOUtils.closeQuietly(collMessageHandler);
- IOUtils.closeQuietly(configMessageHandler);
- }
+ public CollectionWorkQueueWatcher(CoreContainer cc, String myId, LBHttp2SolrClient overseerLbClient, String adminPath, Stats stats, Overseer overseer) throws KeeperException {
+ super(cc, Overseer.OVERSEER_COLLECTION_QUEUE_WORK);
+ collMessageHandler = new OverseerCollectionMessageHandler(cc, myId, overseerLbClient, adminPath, stats, overseer);
+ configMessageHandler = new OverseerConfigSetMessageHandler(cc);
+ failureMap = Overseer.getFailureMap(cc.getZkController().getZkStateReader().getZkClient());
+ runningMap = Overseer.getRunningMap(cc.getZkController().getZkStateReader().getZkClient());
+ completedMap = Overseer.getCompletedMap(cc.getZkController().getZkStateReader().getZkClient());
+ }
- @Override
- protected synchronized void processQueueItems(List<String> items) throws KeeperException {
- log.info("Found collection queue items {}", items);
- List<String> fullPaths = new ArrayList<>(items.size());
- for (String item : items) {
- fullPaths.add(path + "/" + item);
+ @Override
+ public void close() {
+ super.close();
+ IOUtils.closeQuietly(collMessageHandler);
+ IOUtils.closeQuietly(configMessageHandler);
}
- Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
+ @Override
+ protected void processQueueItems(List<String> items) {
- ParWork.getRootSharedExecutor().submit(()->{
+ ourLock.lock();
try {
- runAsync(items, fullPaths, data);
- } catch (Exception e) {
- log.error("failed processing collection queue items " + items);
- }
- });
+ log.info("Found collection queue items {}", items);
+ List<String> fullPaths = new ArrayList<>(items.size());
+ for (String item : items) {
+ fullPaths.add(path + "/" + item);
+ }
- }
+ Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
- private void runAsync(List<String> items, List<String> fullPaths, Map<String,byte[]> data) throws KeeperException {
- for (Map.Entry<String,byte[]> entry : data.entrySet()) {
- byte[] item = entry.getValue();
- if (item == null) {
- log.error("empty item {}", entry.getKey());
- continue;
+ ParWork.getRootSharedExecutor().submit(() -> {
+ try {
+ runAsync(items, fullPaths, data);
+ } catch (Exception e) {
+ log.error("failed processing collection queue items " + items);
+ }
+ });
+ } finally {
+ ourLock.unlock();
}
- final ZkNodeProps message = ZkNodeProps.load(item);
- try {
- String operation = message.getStr(Overseer.QUEUE_OPERATION);
- if (operation == null) {
- log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message);
- continue;
- }
-
- final String asyncId = message.getStr(ASYNC);
+ }
- OverseerSolrResponse response;
- if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
- response = configMessageHandler.processMessage(message, operation);
- } else {
- response = collMessageHandler.processMessage(message, operation);
+ private void runAsync(List<String> items, List<String> fullPaths, Map<String,byte[]> data) throws KeeperException {
+ for (Map.Entry<String,byte[]> entry : data.entrySet()) {
+ byte[] item = entry.getValue();
+ if (item == null) {
+ log.error("empty item {}", entry.getKey());
+ continue;
}
+ final ZkNodeProps message = ZkNodeProps.load(item);
+ try {
+ String operation = message.getStr(Overseer.QUEUE_OPERATION);
+ if (operation == null) {
+ log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message);
+ continue;
+ }
-// try {
-// overseer.writePendingUpdates();
-// } catch (InterruptedException e) {
-// log.error("Overseer state update queue processing interrupted");
-// return;
-// }
+ final String asyncId = message.getStr(ASYNC);
- log.info("response {}", response);
+ OverseerSolrResponse response;
+ if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
+ response = configMessageHandler.processMessage(message, operation);
+ } else {
+ response = collMessageHandler.processMessage(message, operation);
+ }
+ // try {
+ // overseer.writePendingUpdates();
+ // } catch (InterruptedException e) {
+ // log.error("Overseer state update queue processing interrupted");
+ // return;
+ // }
+
+ log.info("response {}", response);
+
+ if (asyncId != null) {
+ if (response != null && (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Updated failed map for task with id:[{}]", asyncId);
+ }
+ failureMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Updated completed map for task with zkid:[{}]", asyncId);
+ }
+ completedMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
- if (asyncId != null) {
- if (response != null && (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)) {
- if (log.isDebugEnabled()) {
- log.debug("Updated failed map for task with id:[{}]", asyncId);
}
- failureMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
} else {
- if (log.isDebugEnabled()) {
- log.debug("Updated completed map for task with zkid:[{}]", asyncId);
- }
- completedMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
-
+ byte[] sdata = OverseerSolrResponseSerializer.serialize(response);
+ String responsePath = Overseer.OVERSEER_COLLECTION_MAP_COMPLETED + "/" + OverseerTaskQueue.RESPONSE_PREFIX + entry.getKey().substring(entry.getKey().lastIndexOf("-") + 1);
+ zkController.getZkClient().setData(responsePath, sdata, true);
+ log.debug("Completed task:[{}] {}", message, response.getResponse());
}
- } else {
- byte[] sdata = OverseerSolrResponseSerializer.serialize(response);
- String responsePath = Overseer.OVERSEER_COLLECTION_MAP_COMPLETED + "/" + OverseerTaskQueue.RESPONSE_PREFIX
- + entry.getKey().substring(entry.getKey().lastIndexOf("-") + 1);
- zkController.getZkClient().setData( responsePath, sdata, true);
- log.debug("Completed task:[{}] {}", message, response.getResponse());
- }
-
- } catch (InterruptedException e) {
- log.error("Overseer state update queue processing interrupted");
- return;
+ } catch (InterruptedException e) {
+ log.error("Overseer state update queue processing interrupted");
+ return;
+ }
}
- }
- for (String item : items) {
- if (item.startsWith("qnr-")) {
- fullPaths.remove(path + "/" + item);
+ for (String item : items) {
+ if (item.startsWith("qnr-")) {
+ fullPaths.remove(path + "/" + item);
+ }
}
- }
- zkController.getZkClient().delete(fullPaths, true);
+ zkController.getZkClient().delete(fullPaths, true);
+ }
}
}
-
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 9c0f6f9..be4668f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -201,9 +201,12 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
for (;;) {
try {
return zookeeper.create(path, data, mode, true);
- } catch (KeeperException.NoNodeException e) {
+ } catch (KeeperException.NodeExistsException e) {
+ log.warn("Found request node already, waiting to see if it frees up ...");
+ // TODO: use a watch?
+ Thread.sleep(250);
try {
- zookeeper.create(dir, BYTES, CreateMode.PERSISTENT, true);
+ return zookeeper.create(path, data, mode, true);
} catch (KeeperException.NodeExistsException ne) {
// someone created it
}