Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/18 23:06:15 UTC

[lucene-solr] branch reference_impl updated: @236 - Start putting some things in order

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl by this push:
     new 490eaf4  @236 - Start putting some things in order
490eaf4 is described below

commit 490eaf4d9825cc84e5e02b01ce8b7975658e30d8
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Jul 18 18:05:56 2020 -0500

    @236 - Start putting some things in order
---
 solr/core/src/java/org/apache/solr/api/ApiBag.java |  2 +-
 .../src/java/org/apache/solr/cloud/Overseer.java   | 60 ++++++++++++----------
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 55 ++++++++++----------
 .../java/org/apache/solr/update/CommitTracker.java |  8 ++-
 4 files changed, 68 insertions(+), 57 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/api/ApiBag.java b/solr/core/src/java/org/apache/solr/api/ApiBag.java
index 172e03e..0577228 100644
--- a/solr/core/src/java/org/apache/solr/api/ApiBag.java
+++ b/solr/core/src/java/org/apache/solr/api/ApiBag.java
@@ -69,7 +69,7 @@ public class ApiBag {
   /**Register a POJO annotated with {@link EndPoint}
    * @param o the instance to be used for invocations
    */
-  public synchronized List<Api> registerObject(Object o) {
+  public List<Api> registerObject(Object o) {
     List<Api> l = AnnotatedApi.getApis(o);
     for (Api api : l) {
       register(api, Collections.EMPTY_MAP);
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 4cfa32f..5aa96ae 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -198,6 +198,7 @@ public class Overseer implements SolrCloseable {
     private final ZkDistributedQueue workQueue;
 
     private volatile boolean isClosed = false;
+    private int lastVersion;
 
     public ClusterStateUpdater(final ZkStateReader reader, final String myId, Stats zkStats) {
       this.zkClient = reader.getZkClient();
@@ -317,7 +318,7 @@ public class Overseer implements SolrCloseable {
           LinkedList<Pair<String, byte[]>> queue = null;
           try {
             // We do not need to filter any nodes here cause all processed nodes are removed once we flush clusterstate
-            queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, 10000L, (x) -> true));
+            queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, 2000L, (x) -> true));
           } catch (InterruptedException | AlreadyClosedException e) {
             ParWork.propegateInterrupt(e);
             return;
@@ -373,13 +374,15 @@ public class Overseer implements SolrCloseable {
             return;
           } catch (Exception e) {
             log.error("Unexpected error in Overseer state update loop", e);
-            try {
-              Thread.sleep(1000);
-            } catch (InterruptedException interruptedException) {
-              ParWork.propegateInterrupt(e);
-              return;
+            if (!isClosed()) {
+              try {
+                Thread.sleep(1000);
+              } catch (InterruptedException interruptedException) {
+                ParWork.propegateInterrupt(e);
+                return;
+              }
+              continue;
             }
-            continue;
           }
         }
       } finally {
@@ -416,33 +419,29 @@ public class Overseer implements SolrCloseable {
       return false;
     }
 
-    private ClusterState processQueueItem(ZkNodeProps message, final ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception {
+    private ClusterState processQueueItem(ZkNodeProps message, ClusterState clusterState, ZkStateWriter zkStateWriter, boolean enableBatching, ZkStateWriter.ZkWriteCallback callback) throws Exception {
       if (log.isDebugEnabled()) log.debug("Consume state update from queue {}", message);
-      assert clusterState != null;
-      AtomicReference<ClusterState> state = new AtomicReference<>();
+     // assert clusterState != null;
 
-      final String operation = message.getStr(QUEUE_OPERATION);
-      if (operation == null) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Message missing " + QUEUE_OPERATION + ":" + message);
-      }
-      AtomicBoolean stop = new AtomicBoolean(false);
+      ClusterState cs = null;
+    //  if (clusterState.getZNodeVersion() == 0 || clusterState.getZNodeVersion() > lastVersion) {
 
-      // ### expert use
-      ParWork.getExecutor().invokeAll(Collections.singleton(() -> {
+
+        final String operation = message.getStr(QUEUE_OPERATION);
+        if (operation == null) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Message missing " + QUEUE_OPERATION + ":" + message);
+        }
 
         List<ZkWriteCommand> zkWriteOps = processMessage(clusterState, message, operation);
         ZkStateWriter zkStateWriter1 = new ZkStateWriter(zkController.getZkStateReader(), new Stats());
-        ClusterState cs = zkStateWriter1.enqueueUpdate(clusterState, zkWriteOps,
+        cs = zkStateWriter1.enqueueUpdate(clusterState, zkWriteOps,
                 () -> {
                   // log.info("on write callback");
                 });
-        state.set(cs);
-        return null;
+        lastVersion = cs.getZNodeVersion();
+    //  }
 
-
-      }));
-
-      return (state.get() != null ? state.get() : clusterState);
+      return cs;
     }
 
     private List<ZkWriteCommand> processMessage(ClusterState clusterState,
@@ -563,6 +562,11 @@ public class Overseer implements SolrCloseable {
     @Override
     public void close() throws IOException {
       thread.close();
+      try {
+        join(10000);
+      } catch (InterruptedException e) {
+        throw new RuntimeException("Interrupted waiting to close");
+      }
       this.isClosed = true;
     }
 
@@ -839,7 +843,7 @@ public class Overseer implements SolrCloseable {
 
   @Override
   public boolean isClosed() {
-    return closed;
+    return closed || zkController.getCoreContainer().isShutDown();
   }
 
   void doClose() {
@@ -849,18 +853,18 @@ public class Overseer implements SolrCloseable {
     try (ParWork closer = new ParWork(this, true)) {
 
       closer.collect(() -> {
-        IOUtils.closeQuietly(ccThread);
         ccThread.interrupt();
+        IOUtils.closeQuietly(ccThread);
       });
 
       closer.collect(() -> {
-        IOUtils.closeQuietly(updaterThread);
         updaterThread.interrupt();
+        IOUtils.closeQuietly(updaterThread);
       });
 
       closer.collect(() -> {
-        IOUtils.closeQuietly(triggerThread);
         triggerThread.interrupt();
+        IOUtils.closeQuietly(triggerThread);
       });
 
       closer.addCollect("OverseerInternals");
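
A minimal, self-contained sketch (not part of this commit, and not Solr API; class and field names here are illustrative) of the shutdown pattern the Overseer changes above move toward: flag the loop as closed, interrupt the worker so it drops out of blocking calls, then bound the wait for it to exit, mirroring the interrupt-before-close ordering in doClose() and the 10 second join added to close().

import java.io.Closeable;

class WorkerLoop implements Runnable, Closeable {
  private volatile boolean closed = false;
  private final Thread thread = new Thread(this, "worker-loop");

  void start() { thread.start(); }

  @Override public void run() {
    while (!closed && !Thread.currentThread().isInterrupted()) {
      try {
        Thread.sleep(250); // stand-in for blocking queue polling
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt and exit
        return;
      }
    }
  }

  @Override public void close() {
    closed = true;          // let isClosed()-style checks observe shutdown first
    thread.interrupt();     // interrupt before waiting, as in doClose() above
    try {
      thread.join(10_000);  // bounded join, mirroring the 10s join in close()
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
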
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 9a6a75b..dac61f4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -194,13 +194,13 @@ public class ZkStateWriter {
                       prevState.getZNodeVersion());
             }
 
-           // assert c.getStateFormat() > 1;
+            // assert c.getStateFormat() > 1;
             // stat = reader.getZkClient().getCurator().checkExists().forPath(path);
 
             prevVersion = prevState.getCollection(c.getName()).getZNodeVersion();
-            Map<String,Slice> existingSlices = prevState.getCollection(c.getName()).getSlicesMap();
+            Map<String, Slice> existingSlices = prevState.getCollection(c.getName()).getSlicesMap();
 
-            Map<String,Slice> newSliceMap = new HashMap<>(existingSlices.size() + 1);
+            Map<String, Slice> newSliceMap = new HashMap<>(existingSlices.size() + 1);
 
             if (log.isDebugEnabled()) {
               log.debug("Existing slices {}", existingSlices);
@@ -214,12 +214,13 @@ public class ZkStateWriter {
               log.debug("Add collection {}", c);
             }
 
+            DocCollection finalC = c;
             prevState.getCollection(c.getName()).getSlicesMap().forEach((sliceId, slice) -> {
               Collection<Replica> replicas = slice.getReplicas();
 
-              Map<String,Replica> newReplicas = new HashMap<>();
+              Map<String, Replica> newReplicas = new HashMap<>();
 
-              Map<String,Object> newProps = new HashMap<>();
+              Map<String, Object> newProps = new HashMap<>();
 
               newProps.putAll(slice.getProperties());
 
@@ -232,11 +233,11 @@ public class ZkStateWriter {
 
               replicas.forEach((replica) -> newReplicas.put(replica.getName(), replica));
 
-              c.getSlice(sliceId).getReplicas().forEach((replica) -> {
+              finalC.getSlice(sliceId).getReplicas().forEach((replica) -> {
                 newReplicas.put(replica.getName(), replica);
               });
 
-              Slice newSlice = new Slice(sliceId, newReplicas, newProps, c.getName());
+              Slice newSlice = new Slice(sliceId, newReplicas, newProps, finalC.getName());
               newSliceMap.put(sliceId, newSlice);
 
             });
@@ -250,19 +251,19 @@ public class ZkStateWriter {
             LinkedHashMap collStates = new LinkedHashMap<>(prevState.getCollectionsMap());
             collStates.put(name, new ClusterState.CollectionRef(newCollection));
             newClusterState = new ClusterState(prevState.getLiveNodes(), collStates, prevVersion);
-
+            c = newClusterState.getCollection(name);
             byte[] data = Utils.toJSON(singletonMap(c.getName(), newCollection));
 
             if (log.isDebugEnabled()) {
               log.debug("Write state.json bytes={} cs={}", data.length, newClusterState);
             }
-           // stat = reader.getZkClient().getCurator().setData().withVersion(prevVersion).forPath(path, data);
-            stat =  reader.getZkClient().setData(path, data, prevVersion, true);
+            // stat = reader.getZkClient().getCurator().setData().withVersion(prevVersion).forPath(path, data);
+            stat = reader.getZkClient().setData(path, data, prevVersion, true);
           } else {
             if (log.isDebugEnabled()) {
               log.debug("writePendingUpdates() - going to create_collection {}", path);
             }
-         //   assert c.getStateFormat() > 1;
+            //   assert c.getStateFormat() > 1;
             DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(),
                     0, path);
 
@@ -279,15 +280,16 @@ public class ZkStateWriter {
             try {
               prevVersion = 0;
               reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
-            } catch(KeeperException.NodeExistsException e) {
-              stat =  reader.getZkClient().setData(path, data, -1, true);
+            } catch (KeeperException.NodeExistsException e) {
+              stat = reader.getZkClient().setData(path, data, -1, true);
             }
           }
 
         } catch (Exception e) {
           if (e instanceof KeeperException.BadVersionException) {
             // nocommit invalidState = true;
-            if (log.isDebugEnabled()) log.debug("Tried to update the cluster state using version={} but we where rejected, currently at {}", prevVersion, ((KeeperException.BadVersionException) e).getMessage(), e);
+            if (log.isDebugEnabled())
+              log.debug("Tried to update the cluster state using version={} but we where rejected, currently at {}", prevVersion, ((KeeperException.BadVersionException) e).getMessage(), e);
             throw (KeeperException.BadVersionException) e;
           }
           ParWork.propegateInterrupt(e);
@@ -298,19 +300,20 @@ public class ZkStateWriter {
 
         updates.clear();
         // numUpdates = 0;
-        try {
-          reader.waitForState(c.getName(), 5, TimeUnit.SECONDS,
-                  (l, col) -> {
-                    if (col != null && col.getZNodeVersion() > prevState.getZNodeVersion()) {
-                      if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion());
-                      return true;
-                    }
-                    return false;
-                  });
-        } catch (TimeoutException e) {
-          throw new RuntimeException(e);
+        if (c != null) {
+          try {
+            reader.waitForState(c.getName(), 5, TimeUnit.SECONDS,
+                    (l, col) -> {
+                      if (col != null && col.getZNodeVersion() > prevState.getZNodeVersion()) {
+                        if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion());
+                        return true;
+                      }
+                      return false;
+                    });
+          } catch (TimeoutException e) {
+            throw new RuntimeException(e);
+          }
         }
-
       }
 
       // assert newClusterState.getZNodeVersion() >= 0;
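
An illustrative sketch (plain Java, not Solr code) of why the "DocCollection finalC = c" alias above becomes necessary: a lambda can only capture local variables that are effectively final, so once c is reassigned later in the method the forEach body has to capture a stable copy instead.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CaptureExample {
  static void process(List<String> items, Consumer<String> sink) {
    items.forEach(sink);
  }

  public static void main(String[] args) {
    String label = "first";
    String finalLabel = label;            // stable alias, like finalC above
    List<String> out = new ArrayList<>();
    process(List.of("a", "b"), item -> out.add(finalLabel + ":" + item));
    label = "second";                     // this reassignment is what forces the alias
    System.out.println(out);              // prints [first:a, first:b]
  }
}
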
diff --git a/solr/core/src/java/org/apache/solr/update/CommitTracker.java b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
index 71f4079..d673a72 100644
--- a/solr/core/src/java/org/apache/solr/update/CommitTracker.java
+++ b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
@@ -80,7 +80,8 @@ public final class CommitTracker implements Runnable, Closeable {
   private static final boolean WAIT_SEARCHER = true;
 
   private String name;
-  
+  private volatile boolean closed;
+
   public CommitTracker(String name, SolrCore core, int docsUpperBound, int timeUpperBound, long tLogFileSizeUpperBound,
                        boolean openSearcher, boolean softCommit) {
     this.core = core;
@@ -103,6 +104,7 @@ public final class CommitTracker implements Runnable, Closeable {
   }
   
   public synchronized void close() {
+    this.closed = true;
     if (pending != null) {
       pending.cancel(false);
       pending = null;
@@ -164,7 +166,9 @@ public final class CommitTracker implements Runnable, Closeable {
       // log.info("###scheduling for " + commitMaxTime);
 
       // schedule our new commit
-      pending = scheduler.schedule(this, commitMaxTime, TimeUnit.MILLISECONDS);
+      if (!closed) {
+        pending = scheduler.schedule(this, commitMaxTime, TimeUnit.MILLISECONDS);
+      }
     }
   }
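
A rough sketch (hypothetical names, not the CommitTracker API) of the pattern the CommitTracker change above applies: a volatile closed flag set in close() so later calls stop handing new work to the scheduler once shutdown has begun.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class AutoCommitSketch implements Runnable {
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private volatile boolean closed = false;
  private volatile ScheduledFuture<?> pending;

  void scheduleIfNeeded(long delayMs) {
    if (!closed) {                       // skip scheduling once close() has run
      pending = scheduler.schedule(this, delayMs, TimeUnit.MILLISECONDS);
    }
  }

  @Override public void run() {
    // stand-in for performing the commit
  }

  void close() {
    closed = true;                       // flip the flag before cancelling pending work
    ScheduledFuture<?> p = pending;
    if (p != null) p.cancel(false);
    scheduler.shutdown();
  }
}
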