Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/10 18:52:10 UTC

[lucene-solr] branch reference_impl_dev updated: @1139 Only write out the collections that have been updated.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new d2fa7c2  @1139 Only write out the collections that have been updated.
d2fa7c2 is described below

commit d2fa7c2eb00db74431c901e115cf80033604fe26
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Nov 10 12:51:43 2020 -0600

    @1139 Only write out the collections that have been updated.
---
 settings.gradle                                    |   3 +
 .../client/solrj/embedded/JettySolrRunner.java     |   2 +-
 .../src/java/org/apache/solr/cloud/Overseer.java   |   5 +-
 .../solr/cloud/api/collections/AddReplicaCmd.java  |   2 +-
 .../cloud/api/collections/CreateCollectionCmd.java |   2 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 143 +++++++++++----------
 .../org/apache/solr/common/cloud/SolrZkClient.java |   4 +-
 7 files changed, 86 insertions(+), 75 deletions(-)
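
The heart of this commit is in ZkStateWriter.java below: instead of flushing every collection's state.json on each write, the writer now records which collections were touched (collectionsToWrite), writes only those, and clears the set once the pending updates are flushed. A minimal, self-contained sketch of that pattern follows; the class, method, and interface names here are illustrative only and are not Solr APIs.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class DirtyCollectionTracker {

  /** Names of collections whose state changed since the last flush. */
  private final Set<String> collectionsToWrite = new HashSet<>();

  /** Record every collection touched by an incoming state update. */
  void recordUpdate(Set<String> updatedCollectionNames) {
    collectionsToWrite.addAll(updatedCollectionNames);
  }

  /** Flush pending state, skipping collections that have not changed. */
  void writePendingUpdates(Map<String, byte[]> stateJsonByCollection, ZkWriter writer) {
    for (Map.Entry<String, byte[]> entry : stateJsonByCollection.entrySet()) {
      if (!collectionsToWrite.contains(entry.getKey())) {
        continue; // unchanged collection: no ZooKeeper write at all
      }
      writer.write(entry.getKey(), entry.getValue());
    }
    collectionsToWrite.clear(); // everything pending has been written
  }

  /** Stand-in for the ZooKeeper write the real code performs. */
  interface ZkWriter {
    void write(String collectionName, byte[] stateJson);
  }
}

In the actual ZkStateWriter the set is filled from the incoming ClusterState's collection map and consulted both before the wait-for-last-published-version check and before the state.json write, so unchanged collections skip both the ZooKeeper read and the write.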

diff --git a/settings.gradle b/settings.gradle
index e6f1795..4f39831 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -86,3 +86,6 @@ include "solr:benchmark"
 
 include "solr:benchmark"
 
+
+include "solr:benchmark"
+
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 36aa894..8b911b4 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -596,7 +596,7 @@ public class JettySolrRunner implements Closeable {
               if (!success) {
                 log.warn("Timedout waiting to see {} node in zk", ZkStateReader.COLLECTIONS_ZKNODE);
               }
-              log.info("Done waiting on latch");
+              if (log.isDebugEnabled()) log.debug("Done waiting on latch");
             } catch (InterruptedException e) {
               ParWork.propagateInterrupt(e);
               throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index a74ecee..9335299 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -846,8 +846,9 @@ public class Overseer implements SolrCloseable {
 
         try {
           List<String> items = setWatch();
-
-          processQueueItems(items);
+          if (items.size() > 0) {
+            processQueueItems(items);
+          }
         } catch (Exception e) {
           log.error("Exception during overseer queue queue processing", e);
         }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index 33b987a..679445d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -353,7 +353,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
   public static CreateReplica assignReplicaDetails(DocCollection coll,
                                                  ZkNodeProps message, ReplicaPosition replicaPosition) {
 
-    log.info("assignReplicaDetails {} {} {}", message, replicaPosition, coll);
+    log.info("assignReplicaDetails {} {}", message, replicaPosition);
 
     boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 2cf5a41..a3bfab0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -233,7 +233,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         if (log.isDebugEnabled()) log.debug("Sending state update to populate clusterstate with new replica {}", props);
 
         clusterState = new AddReplicaCmd(ocmh, true).call(clusterState, props, results).clusterState;
-        log.info("CreateCollectionCmd after add replica clusterstate={}", clusterState);
+        // log.info("CreateCollectionCmd after add replica clusterstate={}", clusterState);
 
         //clusterState = new SliceMutator(cloudManager).addReplica(clusterState, props);
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index edcc02f..832c835 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -18,7 +18,9 @@ package org.apache.solr.cloud.overseer;
 
 import java.lang.invoke.MethodHandles;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -35,6 +37,7 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.BoundedTreeSet;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -61,6 +64,7 @@ public class ZkStateWriter {
 
   private volatile ClusterState cs;
   private boolean dirty;
+  private Set<String> collectionsToWrite = new HashSet<>();
 
   public ZkStateWriter(ZkStateReader zkStateReader, Stats stats) {
     assert zkStateReader != null;
@@ -105,6 +109,9 @@ public class ZkStateWriter {
           }
         }
       });
+
+
+      collectionsToWrite.addAll(clusterState.getCollectionsMap().keySet());
       Collection<DocCollection> collections = cs.getCollectionsMap().values();
       for (DocCollection collection : collections) {
         if (clusterState.getCollectionOrNull(collection.getName()) == null) {
@@ -216,7 +223,7 @@ public class ZkStateWriter {
           }
         }
       });
-
+      collectionsToWrite.addAll(clusterState.getCollectionsMap().keySet());
     }
 
     if (stateUpdate) {
@@ -254,89 +261,91 @@ public class ZkStateWriter {
 
     // wait to see our last publish version has propagated
     cs.forEachCollection(collection -> {
-      Integer v = null;
-      try {
-        //System.out.println("waiting to see state " + prevVersion);
-        v = trackVersions.get(collection.getName());
-        if (v == null) v = 0;
-        if (v == 0) return;
-        Integer version = v;
+      if (collectionsToWrite.contains(collection.getName())) {
+        Integer v = null;
         try {
-          log.debug("wait to see last published version for collection {} {}", collection.getName(), v);
-          reader.waitForState(collection.getName(), 5, TimeUnit.SECONDS, (l, col) -> {
-                      if (col == null) {
-                        return true;
-                      }
-//                          if (col != null) {
-//                            log.info("the version " + col.getZNodeVersion());
-//                          }
-            if (col != null && col.getZNodeVersion() >= version) {
-              if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion() + 1);
-              // System.out.println("found the version");
-              return true;
-            }
-            return false;
-          });
-        } catch (InterruptedException e) {
-          ParWork.propagateInterrupt(e);
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+          //System.out.println("waiting to see state " + prevVersion);
+          v = trackVersions.get(collection.getName());
+          if (v == null) v = 0;
+          if (v == 0) return;
+          Integer version = v;
+          try {
+            log.debug("wait to see last published version for collection {} {}", collection.getName(), v);
+            reader.waitForState(collection.getName(), 5, TimeUnit.SECONDS, (l, col) -> {
+              if (col == null) {
+                return true;
+              }
+              //                          if (col != null) {
+              //                            log.info("the version " + col.getZNodeVersion());
+              //                          }
+              if (col != null && col.getZNodeVersion() >= version) {
+                if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion() + 1);
+                // System.out.println("found the version");
+                return true;
+              }
+              return false;
+            });
+          } catch (InterruptedException e) {
+            ParWork.propagateInterrupt(e);
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+          }
+        } catch (TimeoutException e) {
+          log.warn("Timeout waiting to see written cluster state come back " + v);
         }
-      } catch (TimeoutException e) {
-        log.warn("Timeout waiting to see written cluster state come back " + v);
       }
 
     });
 
 
     cs.forEachCollection(collection -> {
+      if (collectionsToWrite.contains(collection.getName())) {
+        String name = collection.getName();
+        String path = ZkStateReader.getCollectionPath(collection.getName());
+        if (log.isDebugEnabled()) log.debug("process {}", collection);
+        Stat stat = new Stat();
+        boolean success = false;
+        try {
 
-      String name = collection.getName();
-      String path = ZkStateReader.getCollectionPath(collection.getName());
-      if (log.isDebugEnabled()) log.debug("process {}", collection);
-      Stat stat = new Stat();
-      boolean success = false;
-      try {
-
+          byte[] data = Utils.toJSON(singletonMap(name, collection));
 
-        byte[] data = Utils.toJSON(singletonMap(name, collection));
+          if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", collection.getZNodeVersion(), data.length, collection);
 
-        if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", collection.getZNodeVersion(), data.length, collection);
+          try {
+            int version = collection.getZNodeVersion();
+            Integer v = trackVersions.get(collection.getName());
+            if (v != null) {
+              version = v;
+            }
 
-        try {
-          int version = collection.getZNodeVersion();
-          Integer v = trackVersions.get(collection.getName());
-          if (v != null) {
-            version = v;
+            reader.getZkClient().setData(path, data, version == 0 ? -1 : version, true);
+
+            trackVersions.put(collection.getName(), version + 1);
+          } catch (KeeperException.NoNodeException e) {
+            if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
+            trackVersions.remove(collection.getName());
+            // likely deleted
+          } catch (KeeperException.BadVersionException bve) {
+            lastFailedException.set(bve);
+            failedUpdates.put(collection.getName(), collection);
+            stat = reader.getZkClient().exists(path, null);
+            // this is a tragic error, we must disallow usage of this instance
+            log.warn("Tried to update the cluster state using version={} but we where rejected, found {}", collection.getZNodeVersion(), stat.getVersion(), bve);
           }
-
-
-          reader.getZkClient().setData(path, data, version == 0 ? -1 : version, true);
-
-          trackVersions.put(collection.getName(), version + 1);
-        } catch (KeeperException.NoNodeException e) {
-          if (log.isDebugEnabled()) log.debug("No node found for state.json", e);
-          trackVersions.remove(collection.getName());
-          // likely deleted
-        } catch (KeeperException.BadVersionException bve) {
-          lastFailedException.set(bve);
-          failedUpdates.put(collection.getName(), collection);
-          stat = reader.getZkClient().exists(path, null);
-          // this is a tragic error, we must disallow usage of this instance
-          log.warn("Tried to update the cluster state using version={} but we where rejected, found {}", collection.getZNodeVersion(), stat.getVersion(), bve);
+          if (log.isDebugEnabled()) log.debug("Set version for local collection {} to {}", collection.getName(), collection.getZNodeVersion() + 1);
+        } catch (InterruptedException | AlreadyClosedException e) {
+          log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+        } catch (KeeperException.SessionExpiredException e) {
+          log.error("", e);
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+        } catch (Exception e) {
+          log.error("Failed processing update=" + collection, e);
         }
-        if (log.isDebugEnabled()) log.debug("Set version for local collection {} to {}", collection.getName(), collection.getZNodeVersion() + 1);
-      } catch (InterruptedException | AlreadyClosedException e) {
-        log.info("We have been closed or one of our resources has, bailing {}", e.getClass().getSimpleName() + ":" + e.getMessage());
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-      } catch (KeeperException.SessionExpiredException e) {
-        log.error("", e);
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-      } catch (Exception e) {
-        log.error("Failed processing update=" + collection, e);
       }
     });
-
+    
     dirty = false;
+    collectionsToWrite.clear();
 
     // nocommit - harden against failures and exceptions
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index f352a88..b7a607e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -777,7 +777,6 @@ public class SolrZkClient implements Closeable {
     log.info("delete paths {} wait={}", paths, wait);
     CountDownLatch latch = null;
     if (wait) {
-      log.info("setup countdown latch {}", paths.size());
       latch = new CountDownLatch(paths.size());
     }
     for (String path : paths) {
@@ -809,7 +808,6 @@ public class SolrZkClient implements Closeable {
     if (wait) {
       boolean success;
       try {
-        log.info("watch on countdownlatch {}", "15s");
         success = latch.await(15, TimeUnit.SECONDS);
         log.info("done waiting on latch, success={}", success);
       } catch (InterruptedException e) {
@@ -824,7 +822,7 @@ public class SolrZkClient implements Closeable {
         throw e;
       }
     }
-    log.error("done with delete {} {}", paths, wait);
+    if (log.isDebugEnabled()) log.debug("done with delete {} {}", paths, wait);
   }
 
   // Calls setData for a list of existing paths in parallel
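
For context on the state.json write itself: the ZkStateWriter hunk above performs an optimistic, version-checked setData, treating BadVersionException as a write conflict and NoNodeException as a likely deletion. Below is a standalone sketch of that conditional write, assuming a plain org.apache.zookeeper.ZooKeeper handle rather than Solr's SolrZkClient wrapper; the helper name and return convention are illustrative only.

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

class VersionedStateWrite {

  /**
   * Writes data to path only if the znode is still at expectedVersion.
   * Returns the znode's version after a successful write, or -1 if the write
   * was rejected or the znode no longer exists.
   */
  static int writeIfCurrent(ZooKeeper zk, String path, byte[] data, int expectedVersion)
      throws InterruptedException, KeeperException {
    try {
      // -1 means "any version"; the commit uses it when no version has been tracked yet
      Stat stat = zk.setData(path, data, expectedVersion == 0 ? -1 : expectedVersion);
      return stat.getVersion();
    } catch (KeeperException.NoNodeException e) {
      return -1; // znode was deleted out from under us; the caller drops its tracked version
    } catch (KeeperException.BadVersionException e) {
      // someone else updated the znode first; report what is actually there
      Stat current = zk.exists(path, false);
      System.err.printf("rejected write at version %d, znode is at %d%n",
          expectedVersion, current == null ? -1 : current.getVersion());
      return -1;
    }
  }
}

As in the commit, a tracked version of 0 is mapped to -1 ("any version") so the first write after startup is unconditional.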