You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2020/08/25 07:59:49 UTC

[lucene-solr] 02/03: Move tracking to coreDescriptor

This is an automated email from the ASF dual-hosted git repository.

datcm pushed a commit to branch datcm/bloomberg-abdicate-leadership
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 3d058672fdde7b1a637be60cdd5482c45b720ba7
Author: Cao Manh Dat <da...@apache.org>
AuthorDate: Tue Aug 25 14:55:00 2020 +0700

    Move tracking to coreDescriptor
---
 .../java/org/apache/solr/core/CoreContainer.java   | 83 --------------------
 .../java/org/apache/solr/core/CoreDescriptor.java  | 37 +++++++++
 .../solr/handler/admin/CoreAdminOperation.java     | 91 +++++++++++++---------
 .../java/org/apache/solr/servlet/HttpSolrCall.java |  4 +-
 .../apache/solr/common/params/CoreAdminParams.java |  1 +
 5 files changed, 93 insertions(+), 123 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 711b5c7..c537791 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -261,89 +261,6 @@ public class CoreContainer {
   private ExecutorService coreContainerAsyncTaskExecutor = ExecutorUtil.newMDCAwareCachedThreadPool("Core Container Async Task");
 
   private final Object lock = new Object();
-  private Set<String> pausedUpdatesCores = Collections.synchronizedSet(new HashSet<>());
-  private Map<String, AtomicInteger> currentCoreUpdateCounters = new HashMap<>();
-
-  public boolean startTrackUpdateRequest(String coreName) {
-    synchronized (lock) {
-      if (pausedUpdatesCores.contains(coreName)) {
-        return false;
-      }
-      if (!currentCoreUpdateCounters.containsKey(coreName)) {
-        currentCoreUpdateCounters.put(coreName, new AtomicInteger());
-      }
-      currentCoreUpdateCounters.get(coreName).incrementAndGet();
-      return true;
-    }
-  }
-
-  public void endTrackUpdateRequest(String coreName) {
-    synchronized (lock) {
-      currentCoreUpdateCounters.get(coreName).decrementAndGet();
-    }
-  }
-
-  public void pauseUpdateFor(String coreName) {
-    synchronized (lock) {
-      pausedUpdatesCores.add(coreName);
-    }
-  }
-
-  public void unpauseUpdateFor(String coreName) {
-    synchronized (lock) {
-      pausedUpdatesCores.remove(coreName);
-    }
-  }
-
-  public void waitForEmptyUpdates(String coreName) {
-    AtomicInteger counter = currentCoreUpdateCounters.get(coreName);
-    if (counter == null)
-      return;
-
-    while (counter.get() != 0) {
-      log.info("Datcm waiting for finish pending updates:{}", counter.get());
-      try {
-        Thread.sleep(300);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return;
-      }
-    }
-
-  }
-
-  public void abdicateLeadership(String coreName) {
-    CoreDescriptor cd = getCoreDescriptor(coreName);
-    if (!isZooKeeperAware() || cd.getCloudDescriptor() == null || !cd.getCloudDescriptor().isLeader())
-      return;
-
-    try {
-      synchronized (lock) {
-        pausedUpdatesCores.add(coreName);
-      }
-      AtomicInteger counter = currentCoreUpdateCounters.get(coreName);
-      while (counter != null && counter.get() != 0) {
-        try {
-          Thread.sleep(300);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          return;
-        }
-      }
-      try {
-        zkSys.zkController.rejoinShardLeaderElection(cd, false);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        log.error("Exception on abdicate leadership of core:{}", coreName, e);
-      } catch (Exception e) {
-        log.error("Exception on abdicate leadership of core:{}", coreName, e);
-      }
-    } finally {
-      synchronized (lock) {
-        pausedUpdatesCores.remove(coreName);
-      }
-    }
-  }
 
   private enum CoreInitFailedAction {fromleader, none}
 
diff --git a/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java b/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
index 7b3428d..a51d979 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -141,6 +142,11 @@ public class CoreDescriptor {
   /** The properties for this core, substitutable by resource loaders */
   protected final Properties substitutableProperties = new Properties();
 
+  /** If true then all update requests will be refused */
+  protected volatile boolean pausedAcceptingIndexing = false;
+
+  protected final AtomicInteger inflightUpdatesCounter = new AtomicInteger();
+
   /** TESTS ONLY */
   public CoreDescriptor(String name, Path instanceDir, CoreContainer coreContainer, String... coreProps) {
     this(name, instanceDir, toMap(coreProps), coreContainer.getContainerProperties(), coreContainer.getZkController());
@@ -334,6 +340,37 @@ public class CoreDescriptor {
     return Boolean.parseBoolean(tmp);
   }
 
+  public void setPausedAcceptingIndexing(boolean pausedAcceptingIndexing) {
+    this.pausedAcceptingIndexing = pausedAcceptingIndexing;
+  }
+
+  public boolean isPausedAcceptingIndexing() {
+    return pausedAcceptingIndexing;
+  }
+
+  public boolean incInflightUpdateCounter() {
+    if (isPausedAcceptingIndexing()) {
+      return false;
+    }
+    inflightUpdatesCounter.incrementAndGet();
+    return true;
+  }
+
+  public void decInFlightUpdateCounter() {
+    inflightUpdatesCounter.decrementAndGet();
+  }
+
+  public void waitForNoInflightUpdates() {
+    while (inflightUpdatesCounter.get() != 0) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
   public boolean isTransient() {
     String tmp = coreProperties.getProperty(CORE_TRANSIENT, "false");
     return PropertiesUtil.toBoolean(tmp);
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
index 2b852a4..6c0021b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
@@ -16,26 +16,14 @@
  */
 package org.apache.solr.handler.admin;
 
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.nio.file.Path;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.solr.cloud.CloudDescriptor;
-import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.ZkController;
-import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
@@ -48,7 +36,7 @@ import org.apache.solr.core.SolrInfoBean;
 import org.apache.solr.core.snapshots.SolrSnapshotManager;
 import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
 import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager.SnapshotMetaData;
-import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminOp;
+import org.apache.solr.handler.admin.CoreAdminHandler.*;
 import org.apache.solr.metrics.SolrMetricManager;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.update.UpdateLog;
@@ -59,6 +47,16 @@ import org.apache.solr.util.TestInjection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
 import static org.apache.solr.common.params.CommonParams.NAME;
 import static org.apache.solr.common.params.CoreAdminParams.COLLECTION;
 import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.*;
@@ -238,38 +236,55 @@ enum CoreAdminOperation implements CoreAdminOp {
       }
     }
   }),
+
   ABDICATE_LEADERSHIP_OP(ABDICATE_LEADERSHIP, it -> {
     CoreContainer cc = it.handler.coreContainer;
     if (!cc.isZooKeeperAware()) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "This api only works in SolrCloud env");
     }
-    for (String coreName: cc.getAllCoreNames()) {
-        try {
-          CoreDescriptor cd = cc.getCoreDescriptor(coreName);
-          if (cd == null || cd.getCloudDescriptor() == null || !cd.getCloudDescriptor().isLeader()) {
-            continue;
-          }
-
-          ClusterState clusterState = cc.getZkController().getClusterState();
-          DocCollection docCollection = clusterState.getCollection(cd.getCollectionName());
-          Slice slice = docCollection.getSlice(cd.getCloudDescriptor().getShardId());
-          if (slice.getReplicas().size() < 2) {
-            log().info("Skipping abdicate leadership of core:{} since it is the only one replicas in the shard", coreName);
-          }
-          if (slice.getReplicas(replica -> replica.isActive(clusterState.getLiveNodes())).size() < 2) {
-            log().info("Skipping abdicate leadership of core:{} since it is the only one active replica", coreName);
-          }
+    String cores = it.req.getParams().get("cores");
+    Set<String> selectedCores = null;
+    if (cores != null) {
+      selectedCores = new HashSet<>(Arrays.asList(cores.split(",")));
+    }
+    for (CoreDescriptor cd: cc.getCoreDescriptors()) {
+      if (cd == null || cd.getCloudDescriptor() == null || !cd.getCloudDescriptor().isLeader()) {
+        continue;
+      }
+      if (selectedCores != null && !selectedCores.contains(cd.getName())) {
+        continue;
+      }
 
-          cc.pauseUpdateFor(coreName);
-          cc.waitForEmptyUpdates(coreName);
-          cc.getZkController().rejoinShardLeaderElection(cd, false);
-        } finally {
-          cc.unpauseUpdateFor(coreName);
-          // after this point this core can still receive more updates since it did not explicitly
-          // unset its leader role from cluster state, but that fine since it already unset its leader role locally
-          // (by setting cloudDescriptor.isLeader() flag) so it gonna keep refusing updates or routing update requests
-          // to a new leader
+      try {
+        ClusterState clusterState = cc.getZkController().getClusterState();
+        DocCollection docCollection = clusterState.getCollection(cd.getCollectionName());
+        Slice slice = docCollection.getSlice(cd.getCloudDescriptor().getShardId());
+        if (slice.getReplicas().size() < 2) {
+          log().info("Skipping abdicate leadership of core:{} since it is the only one replicas in the shard", cd.getName());
+        }
+        if (slice.getReplicas(replica -> replica.isActive(clusterState.getLiveNodes())).size() < 2) {
+          log().info("Skipping abdicate leadership of core:{} since it is the only one active replica", cd.getName());
         }
+
+        cd.setPausedAcceptingIndexing(true);
+        cd.waitForNoInflightUpdates();
+        cc.getZkController().rejoinShardLeaderElection(cd, false);
+      } finally {
+        cd.setPausedAcceptingIndexing(false);
+        // After this point the core can still receive more updates, since it did not explicitly
+        // unset its leader role in the cluster state. That is fine: it has already unset its leader
+        // role locally (via the cloudDescriptor.isLeader() flag), so it will keep refusing updates
+        // or routing update requests to a new leader.
+      }
+    }
+  }),
+
+  UNPAUSE_INDEXING_OP(UNPAUSE_INDEXING, it -> {
+    CoreContainer cc = it.handler.coreContainer;
+    for (CoreDescriptor cd : cc.getCoreDescriptors()) {
+      if (cd != null) {
+        cd.setPausedAcceptingIndexing(false);
+      }
     }
   }),
 
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index 895955d..27fa106 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -572,7 +572,7 @@ public class HttpSolrCall {
           remoteQuery(coreUrl + path, resp);
           return RETURN;
         case PROCESS:
-          if (handler instanceof UpdateRequestHandler && !cores.startTrackUpdateRequest(core.getName())) {
+          if (handler instanceof UpdateRequestHandler && !core.getCoreDescriptor().incInflightUpdateCounter()) {
             throw new SolrException(ErrorCode.SERVER_ERROR, "Updates are temporary paused for core:"+core.getName());
           }
           try {
@@ -611,7 +611,7 @@ public class HttpSolrCall {
             return RETURN;
           } finally {
             if (handler instanceof UpdateRequestHandler) {
-              cores.endTrackUpdateRequest(core.getName());
+              core.getCoreDescriptor().decInFlightUpdateCounter();
             }
           }
         default: return action;
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
index daff1fe..ebee497 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
@@ -154,6 +154,7 @@ public abstract class CoreAdminParams
     REQUESTAPPLYUPDATES,
     OVERSEEROP,
     ABDICATE_LEADERSHIP,
+    UNPAUSE_INDEXING,
     REQUESTSTATUS(true),
     REJOINLEADERELECTION,
     //internal API used by force shard leader election