You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/05 01:45:25 UTC

[lucene-solr] branch reference_impl updated: @1424 Return a few things that were tmp out but needed.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl by this push:
     new 5d16e5e  @1424 Return a few things that were tmp out but needed.
5d16e5e is described below

commit 5d16e5e6adef34694ab5b5ca9ba34b5e9e95a945
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Mar 4 19:44:44 2021 -0600

    @1424 Return a few things that were tmp out but needed.
---
 .../src/java/org/apache/solr/cloud/Overseer.java   | 33 ++++++-----
 .../org/apache/solr/cloud/OverseerTaskQueue.java   | 22 +++++--
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |  8 +--
 .../java/org/apache/solr/core/CoreContainer.java   | 69 +++++++++++-----------
 .../solr/handler/admin/RequestSyncShardOp.java     |  1 -
 .../org/apache/solr/common/cloud/SolrZkClient.java | 11 ++--
 .../apache/solr/common/cloud/ZkCmdExecutor.java    | 14 ++---
 .../src/java/org/apache/solr/SolrTestCase.java     |  6 +-
 .../src/java/org/apache/solr/SolrTestCaseJ4.java   |  4 --
 9 files changed, 91 insertions(+), 77 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 1a45047..6b6535f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -52,6 +52,7 @@ import org.apache.solr.core.CoreContainer;
 import org.apache.solr.handler.admin.CollectionsHandler;
 import org.apache.solr.logging.MDCLoggingContext;
 import org.apache.solr.update.UpdateShardHandler;
+import org.apache.zookeeper.AddWatchMode;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -354,8 +355,12 @@ public class Overseer implements SolrCloseable {
 
     queueWatcher = new WorkQueueWatcher(getCoreContainer());
     collectionQueueWatcher = new WorkQueueWatcher.CollectionWorkQueueWatcher(getCoreContainer(), id, overseerLbClient, adminPath, stats, Overseer.this);
-    queueWatcher.start();
-    collectionQueueWatcher.start();
+    try {
+      queueWatcher.start();
+      collectionQueueWatcher.start();
+    } catch (InterruptedException e) {
+      log.warn("interrupted", e);
+    }
 
 
     closed = false;
@@ -764,29 +769,26 @@ public class Overseer implements SolrCloseable {
       this.path = path;
     }
 
-    public abstract void start();
+    public abstract void start() throws KeeperException, InterruptedException;
 
-    private List<String> setWatch() {
+    private List<String> getItems() {
       try {
 
         if (log.isDebugEnabled()) log.debug("set watch on Overseer work queue {}", path);
-        closeWatcher();
-        List<String> children = zkController.getZkClient().getChildren(path, this, true);
+
+        List<String> children = zkController.getZkClient().getChildren(path, null, null, true, true);
 
         List<String> items = new ArrayList<>(children);
         Collections.sort(items);
         return items;
       } catch (KeeperException.SessionExpiredException e) {
         log.warn("ZooKeeper session expired");
-        overseer.close();
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       } catch (InterruptedException | AlreadyClosedException e) {
         log.info("Already closed");
-        overseer.close();
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       } catch (Exception e) {
         log.error("Unexpected error in Overseer state update loop", e);
-        overseer.close();
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       }
     }
@@ -804,7 +806,7 @@ public class Overseer implements SolrCloseable {
       ourLock.lock();
       try {
         try {
-          List<String> items = setWatch();
+          List<String> items = getItems();
           if (items.size() > 0) {
             processQueueItems(items, false);
           }
@@ -845,8 +847,9 @@ public class Overseer implements SolrCloseable {
     }
 
 
-    public void start() {
-      startItems = super.setWatch();
+    public void start() throws KeeperException, InterruptedException {
+      zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
+      startItems = super.getItems();
       log.info("Overseer found entries on start {}", startItems);
       processQueueItems(startItems, true);
     }
@@ -915,8 +918,10 @@ public class Overseer implements SolrCloseable {
       }
 
       @Override
-      public void start() {
-        startItems = super.setWatch();
+      public void start() throws KeeperException, InterruptedException {
+        zkController.getZkClient().addWatch(path, this, AddWatchMode.PERSISTENT);
+
+        startItems = super.getItems();
 
         log.info("Overseer found entries on start {}", startItems);
         processQueueItems(startItems, true);
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index e920dd3..8a20352 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -29,6 +29,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -150,13 +151,22 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
 
       if (log.isDebugEnabled()) log.debug("{} fired on path {} state {} latchEventType {}", event.getType(), event.getPath(), event.getState());
 
-      this.event = event;
-
-      lock.lock();
+      Stat stat = null;
       try {
-        eventReceived.signalAll();
-      } finally {
-        lock.unlock();
+        stat = zkClient.exists(path, null, true);
+      } catch (KeeperException e) {
+        log.error("exists failed", e);
+      } catch (InterruptedException e) {
+        log.error("interrupted", e);
+      }
+      if (stat != null && stat.getDataLength() > 0) {
+        lock.lock();
+        try {
+          this.event = event;
+          eventReceived.signalAll();
+        } finally {
+          lock.unlock();
+        }
       }
     }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 732330f..06470f4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -475,12 +475,12 @@ public class ZkStateWriter {
               lastVersion.set(version);
               if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", version, data.length, collection);
 
-              reader.getZkClient().setData(path, data, version, true, false);
+              reader.getZkClient().setData(path, data, version, true, true);
               trackVersions.put(collection.getName(), version + 1);
               if (dirtyStructure.contains(collection.getName())) {
                 if (log.isDebugEnabled()) log.debug("structure change in {}", collection.getName());
                 dirtyStructure.remove(collection.getName());
-                reader.getZkClient().setData(pathSCN, null, -1, true, false);
+                reader.getZkClient().setData(pathSCN, null, -1, true, true);
 
                 ZkNodeProps updates = stateUpdates.get(collection.getName());
                 if (updates != null) {
@@ -488,7 +488,7 @@ public class ZkStateWriter {
                  String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(collection.getName());
                  if (log.isDebugEnabled()) log.debug("write state updates for collection {} {}", collection.getName(), updates);
                   try {
-                    reader.getZkClient().setData(stateUpdatesPath, Utils.toJSON(updates), -1, true, false);
+                    reader.getZkClient().setData(stateUpdatesPath, Utils.toJSON(updates), -1, true, true);
                   } catch (KeeperException.NoNodeException e) {
                     if (log.isDebugEnabled()) log.debug("No node found for " + stateUpdatesPath, e);
                     lastVersion.set(-1);
@@ -567,7 +567,7 @@ public class ZkStateWriter {
     if (log.isDebugEnabled()) log.debug("write state updates for collection {} {}", collection.getName(), updates);
     dirtyState.remove(collection.getName());
     try {
-      reader.getZkClient().setData(stateUpdatesPath, Utils.toJSON(updates), -1, true, false);
+      reader.getZkClient().setData(stateUpdatesPath, Utils.toJSON(updates), -1, true, true);
     } catch (KeeperException.NoNodeException e) {
       if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
       lastVersion.set(-1);
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index af5ae29..4118149 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -139,6 +139,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
@@ -859,40 +860,40 @@ public class CoreContainer implements Closeable {
     status |= CORE_DISCOVERY_COMPLETE;
     startedLoadingCores = true;
 
-//    if (isZooKeeperAware()) {
-//
-//      log.info("Waiting to see RECOVERY states for node on startup ...");
-//      for (final CoreDescriptor cd : cds) {
-//        String collection = cd.getCollectionName();
-//        try {
-//          getZkController().getZkStateReader().waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
-//            if (c == null) {
-//              if (log.isDebugEnabled()) log.debug("Found  incorrect state c={}", c);
-//              return false;
-//            }
-//            String nodeName = getZkController().getNodeName();
-//            List<Replica> replicas = c.getReplicas();
-//            for (Replica replica : replicas) {
-//              if (replica.getNodeName().equals(nodeName)) {
-//                if (!replica.getState().equals(Replica.State.RECOVERING)) {
-//                  if (log.isDebugEnabled()) log.debug("Found  incorrect state {} {} ourNodeName={}", replica.getState(), replica.getNodeName(), nodeName);
-//                  return false;
-//                }
-//              } else {
-//                if (log.isDebugEnabled()) log.debug("Found  incorrect state {} {} ourNodeName={}", replica.getState(), replica.getNodeName(), nodeName);
-//              }
-//            }
-//
-//            return true;
-//          });
-//        } catch (InterruptedException e) {
-//          ParWork.propagateInterrupt(e);
-//          return;
-//        } catch (TimeoutException e) {
-//          log.error("Timeout", e);
-//        }
-//      }
-//    }
+    if (isZooKeeperAware()) {
+
+      log.info("Waiting to see RECOVERY states for node on startup ...");
+      for (final CoreDescriptor cd : cds) {
+        String collection = cd.getCollectionName();
+        try {
+          getZkController().getZkStateReader().waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
+            if (c == null) {
+              if (log.isDebugEnabled()) log.debug("Found  incorrect state c={}", c);
+              return false;
+            }
+            String nodeName = getZkController().getNodeName();
+            List<Replica> replicas = c.getReplicas();
+            for (Replica replica : replicas) {
+              if (replica.getNodeName().equals(nodeName)) {
+                if (!replica.getState().equals(Replica.State.RECOVERING)) {
+                  if (log.isDebugEnabled()) log.debug("Found  incorrect state {} {} ourNodeName={}", replica.getState(), replica.getNodeName(), nodeName);
+                  return false;
+                }
+              } else {
+                if (log.isDebugEnabled()) log.debug("Found  incorrect state {} {} ourNodeName={}", replica.getState(), replica.getNodeName(), nodeName);
+              }
+            }
+
+            return true;
+          });
+        } catch (InterruptedException e) {
+          ParWork.propagateInterrupt(e);
+          return;
+        } catch (TimeoutException e) {
+          log.error("Timeout", e);
+        }
+      }
+    }
 
     for (final CoreDescriptor cd : cds) {
       if (!cd.isTransient() && cd.isLoadOnStartup()) {
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RequestSyncShardOp.java b/solr/core/src/java/org/apache/solr/handler/admin/RequestSyncShardOp.java
index 1374559..0da037e 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/RequestSyncShardOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/RequestSyncShardOp.java
@@ -46,7 +46,6 @@ class RequestSyncShardOp implements CoreAdminHandler.CoreAdminOp {
     log.info("I have been requested to sync up my shard");
 
     String cname = params.required().get(CoreAdminParams.CORE);
-    String id = params.required().get("id");
 
     ZkController zkController = it.handler.coreContainer.getZkController();
     if (zkController == null) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index cfcc550..c71babd 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -172,13 +172,12 @@ public class SolrZkClient implements Closeable {
       @Override
       public boolean isClosed() {
         try {
-          if (higherLevelIsClosed != null) {
-            return SolrZkClient.this.higherLevelIsClosed.isClosed() || isClosed;
-          } else {
-            return  isClosed;
-          }
-        } catch (NullPointerException e) {
+
           return isClosed;
+
+        } catch (NullPointerException e) {
+          log.error("ZkClient is null", e);
+          throw e;
         }
       }
     });
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
index 4fbc22e..084a4b3 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkCmdExecutor.java
@@ -59,9 +59,9 @@ public class ZkCmdExecutor {
   @SuppressWarnings("unchecked")
   public static <T> T retryOperation(ZkCmdExecutor zkCmdExecutor, ZkOperation operation, boolean retryOnSessionExp)
       throws KeeperException, InterruptedException {
-//    if (zkCmdExecutor.solrZkClient.isClosed()) {
-//      throw new AlreadyClosedException("SolrZkClient is already closed");
-//    }
+    if (zkCmdExecutor.solrZkClient.isClosed()) {
+      throw new AlreadyClosedException("SolrZkClient is already closed");
+    }
     KeeperException exception = null;
     int tryCnt = 0;
     while (tryCnt < zkCmdExecutor.retryCount) {
@@ -71,13 +71,13 @@ public class ZkCmdExecutor {
         if (!retryOnSessionExp && e instanceof KeeperException.SessionExpiredException) {
           throw e;
         }
-        log.warn(e.getClass().getSimpleName());
+        log.warn("retryOperation", e);
         if (exception == null) {
           exception = e;
         }
-        if (zkCmdExecutor.solrZkClient.isClosed()) {
-          throw e;
-        }
+//        if (zkCmdExecutor.solrZkClient.isClosed()) {
+//          throw e;
+//        }
         zkCmdExecutor.retryDelay(tryCnt);
       }
       tryCnt++;
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index c172366..bfb382f 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -302,6 +302,10 @@ public class SolrTestCase extends Assert {
       SysStats.reStartSysStats();
     }
 
+    // other  methods like starting a jetty instance need these too
+    System.setProperty("solr.test.sys.prop1", "propone");
+    System.setProperty("solr.test.sys.prop2", "proptwo");
+
     // random is expensive, you are supposed to cache it
     random = LuceneTestCase.random();
 
@@ -366,7 +370,7 @@ public class SolrTestCase extends Assert {
     System.setProperty("solr.tests.EnumFieldType", "org.apache.solr.schema.EnumFieldType");
     System.setProperty("solr.tests.numeric.dv", "true");
 
-    System.setProperty("managed.schema.mutable", "false");
+    //System.setProperty("managed.schema.mutable", "false");
 
     if (!LuceneTestCase.TEST_NIGHTLY) {
       //TestInjection.randomDelayMaxInCoreCreationInSec = 2;
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
index c632c2c..d11471c 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
@@ -577,10 +577,6 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
       System.setProperty("solr.directoryFactory","solr.RAMDirectoryFactory");
     }
 
-    // other  methods like starting a jetty instance need these too
-    System.setProperty("solr.test.sys.prop1", "propone");
-    System.setProperty("solr.test.sys.prop2", "proptwo");
-
     String configFile = getSolrConfigFile();
     if (configFile != null) {
       createCore();