You are viewing a plain-text version of this content. The canonical link to it (a hyperlink labeled "here" in the original page) is not preserved in this plain-text rendering.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/09 06:36:22 UTC

[lucene-solr] branch reference_impl_dev updated: @1108 Harden writing response zk node.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new b00adcd  @1108 Harden writing response zk node.
b00adcd is described below

commit b00adcd5e4a0504e8cfdc4a7aca7034540803ace
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Nov 9 00:35:30 2020 -0600

    @1108 Harden writing response zk node.
---
 .../src/java/org/apache/solr/cloud/Overseer.java   | 264 +++++++++++----------
 .../org/apache/solr/cloud/OverseerTaskQueue.java   |   7 +-
 2 files changed, 147 insertions(+), 124 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index c39335f..d921019 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
 
 import org.apache.lucene.util.Version;
@@ -156,7 +157,7 @@ public class Overseer implements SolrCloseable {
   private volatile boolean closeAndDone;
   private volatile boolean initedHttpClient = false;
   private volatile QueueWatcher queueWatcher;
-  private volatile CollectionWorkQueueWatcher collectionQueueWatcher;
+  private volatile WorkQueueWatcher.CollectionWorkQueueWatcher collectionQueueWatcher;
 
   public boolean isDone() {
     return closeAndDone;
@@ -334,7 +335,7 @@ public class Overseer implements SolrCloseable {
     //systemCollectionCompatCheck(new StringBiConsumer());
 
     queueWatcher = new WorkQueueWatcher(getCoreContainer());
-    collectionQueueWatcher = new CollectionWorkQueueWatcher(getCoreContainer(), id, overseerLbClient, adminPath, stats, Overseer.this);
+    collectionQueueWatcher = new WorkQueueWatcher.CollectionWorkQueueWatcher(getCoreContainer(), id, overseerLbClient, adminPath, stats, Overseer.this);
 
     // TODO: don't track for a moment, can leak out of collection api tests
     // assert ObjectReleaseTracker.track(this);
@@ -798,6 +799,7 @@ public class Overseer implements SolrCloseable {
     protected final String path;
     protected final Overseer overseer;
     protected volatile boolean closed;
+    protected final ReentrantLock ourLock = new ReentrantLock(true);
 
     public QueueWatcher(CoreContainer cc, String path) throws KeeperException {
       this.cc = cc;
@@ -833,7 +835,7 @@ public class Overseer implements SolrCloseable {
     }
 
     @Override
-    public synchronized void process(WatchedEvent event) {
+    public void process(WatchedEvent event) {
       if (Event.EventType.None.equals(event.getType())) {
         return;
       }
@@ -841,14 +843,19 @@ public class Overseer implements SolrCloseable {
         return;
       }
 
-      log.info("Overseer work queue has changed, processing...");
-
+      ourLock.lock();
       try {
-        List<String> items = setWatch();
+        log.info("Overseer work queue has changed, processing...");
 
-        processQueueItems(items);
-      } catch (Exception e) {
-        log.error("Exception during overseer queue queue processing", e);
+        try {
+          List<String> items = setWatch();
+
+          processQueueItems(items);
+        } catch (Exception e) {
+          log.error("Exception during overseer queue queue processing", e);
+        }
+      } finally {
+        ourLock.unlock();
       }
 
     }
@@ -857,11 +864,16 @@ public class Overseer implements SolrCloseable {
 
     @Override
     public void close() {
-      this.closed = true;
+      ourLock.lock();
       try {
-        zkController.getZkClient().getSolrZooKeeper().removeWatches(path, this, WatcherType.Data, true);
-      } catch (Exception e) {
-        log.info("", e.getMessage());
+        this.closed = true;
+        try {
+          zkController.getZkClient().getSolrZooKeeper().removeWatches(path, this, WatcherType.Data, true);
+        } catch (Exception e) {
+          log.info("", e.getMessage());
+        }
+      } finally {
+        ourLock.unlock();
       }
     }
   }
@@ -873,153 +885,161 @@ public class Overseer implements SolrCloseable {
     }
 
     @Override
-    protected synchronized void processQueueItems(List<String> items) {
-      log.info("Found state update queue items {}", items);
-      List<String> fullPaths = new ArrayList<>(items.size());
-      for (String item : items) {
-        fullPaths.add(path + "/" + item);
-      }
+    protected void processQueueItems(List<String> items) {
+      ourLock.lock();
+      try {
+        log.info("Found state update queue items {}", items);
+        List<String> fullPaths = new ArrayList<>(items.size());
+        for (String item : items) {
+          fullPaths.add(path + "/" + item);
+        }
 
-      Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
+        Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
+
+        for (byte[] item : data.values()) {
+          final ZkNodeProps message = ZkNodeProps.load(item);
+          try {
+            boolean success = overseer.processQueueItem(message);
+          } catch (InterruptedException e) {
+            log.error("Overseer state update queue processing interrupted");
+            return;
+          }
+        }
 
-      for (byte[] item : data.values()) {
-        final ZkNodeProps message = ZkNodeProps.load(item);
         try {
-          boolean success = overseer.processQueueItem(message);
+          overseer.writePendingUpdates();
         } catch (InterruptedException e) {
           log.error("Overseer state update queue processing interrupted");
           return;
         }
-      }
 
-      try {
-        overseer.writePendingUpdates();
-      } catch (InterruptedException e) {
-        log.error("Overseer state update queue processing interrupted");
-        return;
-      }
+        zkController.getZkClient().delete(fullPaths, true);
 
-      zkController.getZkClient().delete(fullPaths, true);
-    }
-  }
+      } finally {
 
-  private static class CollectionWorkQueueWatcher extends QueueWatcher {
+        ourLock.unlock();
+      }
+    }
 
-    private final OverseerCollectionMessageHandler collMessageHandler;
-    private final OverseerConfigSetMessageHandler configMessageHandler;
-    private final DistributedMap failureMap;
-    private final DistributedMap runningMap;
+    private static class CollectionWorkQueueWatcher extends QueueWatcher {
 
-    private final DistributedMap completedMap;
+      private final OverseerCollectionMessageHandler collMessageHandler;
+      private final OverseerConfigSetMessageHandler configMessageHandler;
+      private final DistributedMap failureMap;
+      private final DistributedMap runningMap;
 
-    public CollectionWorkQueueWatcher(CoreContainer cc, String myId, LBHttp2SolrClient overseerLbClient, String adminPath, Stats stats, Overseer overseer) throws KeeperException {
-      super(cc, Overseer.OVERSEER_COLLECTION_QUEUE_WORK);
-      collMessageHandler = new OverseerCollectionMessageHandler(cc, myId, overseerLbClient, adminPath, stats, overseer);
-      configMessageHandler = new OverseerConfigSetMessageHandler(cc);
-      failureMap = Overseer.getFailureMap(cc.getZkController().getZkStateReader().getZkClient());
-      runningMap = Overseer.getRunningMap(cc.getZkController().getZkStateReader().getZkClient());
-      completedMap = Overseer.getCompletedMap(cc.getZkController().getZkStateReader().getZkClient());
-    }
+      private final DistributedMap completedMap;
 
-    @Override
-    public void close() {
-      super.close();
-      IOUtils.closeQuietly(collMessageHandler);
-      IOUtils.closeQuietly(configMessageHandler);
-    }
+      public CollectionWorkQueueWatcher(CoreContainer cc, String myId, LBHttp2SolrClient overseerLbClient, String adminPath, Stats stats, Overseer overseer) throws KeeperException {
+        super(cc, Overseer.OVERSEER_COLLECTION_QUEUE_WORK);
+        collMessageHandler = new OverseerCollectionMessageHandler(cc, myId, overseerLbClient, adminPath, stats, overseer);
+        configMessageHandler = new OverseerConfigSetMessageHandler(cc);
+        failureMap = Overseer.getFailureMap(cc.getZkController().getZkStateReader().getZkClient());
+        runningMap = Overseer.getRunningMap(cc.getZkController().getZkStateReader().getZkClient());
+        completedMap = Overseer.getCompletedMap(cc.getZkController().getZkStateReader().getZkClient());
+      }
 
-    @Override
-    protected synchronized void processQueueItems(List<String> items) throws KeeperException {
-      log.info("Found collection queue items {}", items);
-      List<String> fullPaths = new ArrayList<>(items.size());
-      for (String item : items) {
-        fullPaths.add(path + "/" + item);
+      @Override
+      public void close() {
+        super.close();
+        IOUtils.closeQuietly(collMessageHandler);
+        IOUtils.closeQuietly(configMessageHandler);
       }
 
-      Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
+      @Override
+      protected void processQueueItems(List<String> items) {
 
-      ParWork.getRootSharedExecutor().submit(()->{
+        ourLock.lock();
         try {
-          runAsync(items, fullPaths, data);
-        } catch (Exception e) {
-          log.error("failed processing collection queue items " + items);
-        }
-      });
+          log.info("Found collection queue items {}", items);
+          List<String> fullPaths = new ArrayList<>(items.size());
+          for (String item : items) {
+            fullPaths.add(path + "/" + item);
+          }
 
-    }
+          Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
 
-    private void runAsync(List<String> items, List<String> fullPaths, Map<String,byte[]> data) throws KeeperException {
-      for (Map.Entry<String,byte[]> entry : data.entrySet()) {
-        byte[] item = entry.getValue();
-        if (item == null) {
-          log.error("empty item {}", entry.getKey());
-          continue;
+          ParWork.getRootSharedExecutor().submit(() -> {
+            try {
+              runAsync(items, fullPaths, data);
+            } catch (Exception e) {
+              log.error("failed processing collection queue items " + items);
+            }
+          });
+        } finally {
+          ourLock.unlock();
         }
 
-        final ZkNodeProps message = ZkNodeProps.load(item);
-        try {
-          String operation = message.getStr(Overseer.QUEUE_OPERATION);
-          if (operation == null) {
-            log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message);
-            continue;
-          }
-
-          final String asyncId = message.getStr(ASYNC);
+      }
 
-          OverseerSolrResponse response;
-          if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
-            response = configMessageHandler.processMessage(message, operation);
-          } else {
-            response = collMessageHandler.processMessage(message, operation);
+      private void runAsync(List<String> items, List<String> fullPaths, Map<String,byte[]> data) throws KeeperException {
+        for (Map.Entry<String,byte[]> entry : data.entrySet()) {
+          byte[] item = entry.getValue();
+          if (item == null) {
+            log.error("empty item {}", entry.getKey());
+            continue;
           }
 
+          final ZkNodeProps message = ZkNodeProps.load(item);
+          try {
+            String operation = message.getStr(Overseer.QUEUE_OPERATION);
+            if (operation == null) {
+              log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message);
+              continue;
+            }
 
-//          try {
-//            overseer.writePendingUpdates();
-//          } catch (InterruptedException e) {
-//            log.error("Overseer state update queue processing interrupted");
-//            return;
-//          }
+            final String asyncId = message.getStr(ASYNC);
 
-          log.info("response {}", response);
+            OverseerSolrResponse response;
+            if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
+              response = configMessageHandler.processMessage(message, operation);
+            } else {
+              response = collMessageHandler.processMessage(message, operation);
+            }
 
+            //          try {
+            //            overseer.writePendingUpdates();
+            //          } catch (InterruptedException e) {
+            //            log.error("Overseer state update queue processing interrupted");
+            //            return;
+            //          }
+
+            log.info("response {}", response);
+
+            if (asyncId != null) {
+              if (response != null && (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)) {
+                if (log.isDebugEnabled()) {
+                  log.debug("Updated failed map for task with id:[{}]", asyncId);
+                }
+                failureMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
+              } else {
+                if (log.isDebugEnabled()) {
+                  log.debug("Updated completed map for task with zkid:[{}]", asyncId);
+                }
+                completedMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
 
-          if (asyncId != null) {
-            if (response != null && (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)) {
-              if (log.isDebugEnabled()) {
-                log.debug("Updated failed map for task with id:[{}]", asyncId);
               }
-              failureMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
             } else {
-              if (log.isDebugEnabled()) {
-                log.debug("Updated completed map for task with zkid:[{}]", asyncId);
-              }
-              completedMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
-
+              byte[] sdata = OverseerSolrResponseSerializer.serialize(response);
+              String responsePath = Overseer.OVERSEER_COLLECTION_MAP_COMPLETED + "/" + OverseerTaskQueue.RESPONSE_PREFIX + entry.getKey().substring(entry.getKey().lastIndexOf("-") + 1);
+              zkController.getZkClient().setData(responsePath, sdata, true);
+              log.debug("Completed task:[{}] {}", message, response.getResponse());
             }
-          } else {
-            byte[] sdata = OverseerSolrResponseSerializer.serialize(response);
-            String responsePath = Overseer.OVERSEER_COLLECTION_MAP_COMPLETED + "/" + OverseerTaskQueue.RESPONSE_PREFIX
-                + entry.getKey().substring(entry.getKey().lastIndexOf("-") + 1);
-            zkController.getZkClient().setData( responsePath, sdata, true);
-            log.debug("Completed task:[{}] {}", message, response.getResponse());
-          }
 
-
-        } catch (InterruptedException e) {
-          log.error("Overseer state update queue processing interrupted");
-          return;
+          } catch (InterruptedException e) {
+            log.error("Overseer state update queue processing interrupted");
+            return;
+          }
         }
-      }
 
-      for (String item : items) {
-        if (item.startsWith("qnr-")) {
-          fullPaths.remove(path + "/" + item);
+        for (String item : items) {
+          if (item.startsWith("qnr-")) {
+            fullPaths.remove(path + "/" + item);
+          }
         }
-      }
 
-      zkController.getZkClient().delete(fullPaths, true);
+        zkController.getZkClient().delete(fullPaths, true);
+      }
     }
   }
-
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 9c0f6f9..be4668f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -201,9 +201,12 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
     for (;;) {
       try {
         return zookeeper.create(path, data, mode, true);
-      } catch (KeeperException.NoNodeException e) {
+      } catch (KeeperException.NodeExistsException e) {
+        log.warn("Found request node already, waiting to see if it frees up ...");
+        // TODO: use a watch?
+        Thread.sleep(250);
         try {
-          zookeeper.create(dir, BYTES, CreateMode.PERSISTENT, true);
+          return zookeeper.create(path, data, mode, true);
         } catch (KeeperException.NodeExistsException ne) {
           // someone created it
         }