You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/12 05:05:22 UTC

[lucene-solr] branch reference_impl updated: @1463 Stress concurrency fix and tweaks.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl by this push:
     new c42cba5  @1463 Stress concurrency fix and tweaks.
c42cba5 is described below

commit c42cba5981b8b0989146224f06cf46f25da61a9d
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Mar 11 23:04:47 2021 -0600

    @1463 Stress concurrency fix and tweaks.
    
    Took 1 hour 4 minutes
---
 .../java/org/apache/solr/cloud/ZkController.java   |  2 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 46 +++++++++---------
 .../java/org/apache/solr/common/cloud/Replica.java |  4 ++
 .../apache/solr/common/cloud/ZkStateReader.java    | 54 ++++++++++++++--------
 .../apache/solr/cloud/MiniSolrCloudCluster.java    |  4 +-
 5 files changed, 64 insertions(+), 46 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 8d46016..559af67 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1325,7 +1325,7 @@ public class ZkController implements Closeable, Runnable {
           break;
         }
         try {
-          Replica leader = zkStateReader.getLeaderRetry(collection, shardId, 1500, true);
+          Replica leader = zkStateReader.getLeaderRetry(collection, shardId, 10000, true);
           leaderName = leader.getName();
 
         } catch (TimeoutException timeoutException) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 7d5476c..701d367 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -215,14 +215,15 @@ public class ZkStateWriter {
                     continue;
                   } else {
                     if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(entry.getKey()));
-                    String id = entry.getKey();
+                    String fullId = entry.getKey();
+                    String id = fullId.substring(fullId.indexOf("-") + 1);
 
                     String stateString = (String) entry.getValue();
                     if (log.isDebugEnabled()) {
                       log.debug("stateString={}", stateString);
                     }
 
-                    long collectionId = Long.parseLong(id.split("-")[0]);
+                    long collectionId = Long.parseLong(fullId.split("-")[0]);
                     String collection = idToCollection.get(collectionId);
                     if (collection == null) {
                       log.info("collection for id={} is null", collectionId);
@@ -265,7 +266,7 @@ public class ZkStateWriter {
                           dirtyState.add(docColl.getName());
                           blockedNodes.add((String) entry.getValue());
                           StateUpdate update = new StateUpdate();
-                          update.id = replica.getId();
+                          update.id = replica.getInternalId();
                           update.state = Replica.State.getShortState(Replica.State.DOWN);
                           updates.add(update);
 
@@ -287,7 +288,7 @@ public class ZkStateWriter {
                           dirtyState.add(docColl.getName());
                           //   blockedNodes.add((String) entry.getValue());
                           StateUpdate update = new StateUpdate();
-                          update.id = replica.getId();
+                          update.id = replica.getInternalId();
                           update.state = Replica.State.getShortState(Replica.State.RECOVERING);
                           updates.add(update);
                         }
@@ -393,7 +394,7 @@ public class ZkStateWriter {
                           r.getProperties().remove("leader");
                         }
                       }
-                      updates.getProperties().put(replica.getId(), "l");
+                      updates.getProperties().put(replica.getInternalId(), "l");
                       dirtyState.add(collection);
                     } else {
                       Replica.State s = Replica.State.getState(setState);
@@ -401,7 +402,7 @@ public class ZkStateWriter {
                       if (existingLeader != null && existingLeader.getName().equals(replica.getName())) {
                         docColl.getSlice(replica).setLeader(null);
                       }
-                      updates.getProperties().put(replica.getId(), Replica.State.getShortState(s));
+                      updates.getProperties().put(replica.getInternalId(), Replica.State.getShortState(s));
                       log.debug("set state {} {}", state, replica);
                       replica.setState(s);
                       dirtyState.add(collection);
@@ -422,29 +423,26 @@ public class ZkStateWriter {
                 }
               }
 
+              String coll = entry.getKey();
+              dirtyState.add(coll);
+              Integer ver = trackVersions.get(coll);
+              if (ver == null) {
+                ver = 0;
+              }
+              ZkNodeProps updateMap = stateUpdates.get(coll);
+              if (updateMap == null) {
+                updateMap = new ZkNodeProps();
+                stateUpdates.put(coll, updateMap);
+              }
+              updateMap.getProperties().put("_cs_ver_", ver.toString());
+              for (StateUpdate theUpdate : entry.getValue()) {
+                updateMap.getProperties().put(theUpdate.id, theUpdate.state);
+              }
 
             } finally {
               collState.collLock.unlock();
             }
           }
-          for (Map.Entry<String,List<StateUpdate>> update : collStateUpdates.entrySet()) {
-
-            String coll = update.getKey();
-            dirtyState.add(coll);
-            Integer ver = trackVersions.get(coll);
-            if (ver == null) {
-              ver = 0;
-            }
-            ZkNodeProps updateMap = stateUpdates.get(coll);
-            if (updateMap == null) {
-              updateMap = new ZkNodeProps();
-              stateUpdates.put(coll, updateMap);
-            }
-            updateMap.getProperties().put("_cs_ver_", ver.toString());
-            for (StateUpdate theUpdate : update.getValue()) {
-              updateMap.getProperties().put(theUpdate.id, theUpdate.state);
-            }
-          }
         }
 
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 957c361..ae9f960 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -214,6 +214,10 @@ public class Replica extends ZkNodeProps {
     return collectionId + "-" + (id == null ? null : id.toString());
   }
 
+  public String getInternalId() { // replica id without the "<collectionId>-" prefix that getId() adds
+    return String.valueOf(id); // null-safe like getId(): yields "null" (the same suffix getId() produces) instead of an NPE
+  }
+
   public Long getCollectionId() {
     return collectionId;
   }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index b10bda3..d52bb80 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -391,13 +391,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
           log.error("problem fetching update collection state", e);
           return;
         }
-        String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(name);
-        try {
-          newState = getAndProcessStateUpdates(name, stateUpdatesPath, false, newState, null);
-        } catch (Exception e) {
-          log.error("", e);
-          throw new SolrException(ErrorCode.SERVER_ERROR, e);
-        }
+
         if (updateWatchedCollection(name, newState, false)) {
           constructState(newState);
         }
@@ -428,14 +422,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         return;
       }
 
-      String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(name);
-      try {
-        newState = getAndProcessStateUpdates(name, stateUpdatesPath, false, newState, null);
-      } catch (Exception e) {
-        log.error("", e);
-        throw new SolrException(ErrorCode.SERVER_ERROR, e);
-      }
-
       if (updateWatchedCollection(name, newState, false)) {
         updatedCollections.add(newState);
       }
@@ -823,8 +809,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
           try {
             DocCollection cdc = getCollectionLive(ZkStateReader.this, collName);
             if (cdc != null) {
-              String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(collName);
-              cdc = getAndProcessStateUpdates(collName, stateUpdatesPath, true, cdc, null);
               cdc.setCreatedLazy();
               lastUpdateTime = System.nanoTime();
               cachedDocCollection = cdc;
@@ -1828,8 +1812,28 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     DocCollection result = null;
     try {
       log.debug("get and process state updates for {}", coll);
+
+      Stat stat;
+      try {
+        stat = getZkClient().exists(stateUpdatesPath, null,true);
+        if (stat == null) {
+          return docCollection;
+        }
+      } catch (NoNodeException e) {
+        log.info("No node found for {}", stateUpdatesPath);
+        return docCollection;
+      }
+
+      if (docCollection != null && docCollection.hasStateUpdates()) {
+        int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
+        if (stat.getVersion() < oldVersion) {
+          if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older set of updates {}, ours is now {}", stat.getVersion(), oldVersion);
+          return docCollection;
+        }
+      }
+
       byte[] data = null;
-      Stat stat = new Stat();
+      stat = new Stat();
       try {
         data = getZkClient().getData(stateUpdatesPath, null, stat, true, false);
       } catch (NoNodeException e) {
@@ -1879,7 +1883,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
               state = Replica.State.shortStateToState((String) entry.getValue());
             }
 
-            Replica replica = docCollection.getReplicaById(id);
+            Replica replica = docCollection.getReplicaById(docCollection.getId() + "-" + id);
             if (log.isDebugEnabled()) log.debug("Got additional state update {} replica={} id={} ids={} {}", state == null ? "leader" : state, replica, id, docCollection.getReplicaByIds());
 
             if (replica != null) {
@@ -2013,6 +2017,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         }
 
       }
+
     } else {
       return null;
     }
@@ -2028,6 +2033,17 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     ClusterState state = ClusterState.createFromJson(this, stat.getVersion(), data);
     ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
     DocCollection docCollection = collectionRef == null ? null : collectionRef.get();
+
+    if (docCollection != null) {
+      String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(coll);
+      try {
+        docCollection = getAndProcessStateUpdates(coll, stateUpdatesPath, true, docCollection, null);
+      } catch (Exception e) {
+        log.error("", e);
+        throw new SolrException(ErrorCode.SERVER_ERROR, e);
+      }
+    }
+
     return docCollection;
   }
 
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index 6011adb..d47ffc6 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -858,12 +858,12 @@ public class MiniSolrCloudCluster {
 
   public void waitForActiveCollection(String collection, int shards, int totalReplicas) {
     if (collection == null) throw new IllegalArgumentException("null collection");
-    waitForActiveCollection(collection,  10, TimeUnit.SECONDS, shards, totalReplicas);
+    waitForActiveCollection(collection,  120, TimeUnit.SECONDS, shards, totalReplicas); // NOTE(review): default wait raised 10s -> 120s; literal duplicated in the overload below, consider a shared constant
   }
 
   public void waitForActiveCollection(String collection, int shards, int totalReplicas, boolean exact) {
 
-    waitForActiveCollection(collection,  10, TimeUnit.SECONDS, shards, totalReplicas, exact);
+    waitForActiveCollection(collection,  120, TimeUnit.SECONDS, shards, totalReplicas, exact); // NOTE(review): keep in sync with the 120s default in the 3-arg overload; unlike it, no null-collection check here
   }
 
   public void waitForJettyToStop(JettySolrRunner runner) throws TimeoutException {