You are viewing a plain-text version of this content. The canonical link to it (originally attached to the word "here") was not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by da...@apache.org on 2017/12/15 10:47:17 UTC

lucene-solr:jira/solr-11702: SOLR-11702: Make ZKShardTerms immutable

Repository: lucene-solr
Updated Branches:
  refs/heads/jira/solr-11702 b380191f3 -> 27be8a853


SOLR-11702: Make ZKShardTerms immutable


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/27be8a85
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/27be8a85
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/27be8a85

Branch: refs/heads/jira/solr-11702
Commit: 27be8a853b8e9c9a7e667ccace08ec8c9ee463b2
Parents: b380191
Author: Cao Manh Dat <da...@apache.org>
Authored: Fri Dec 15 17:46:59 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Fri Dec 15 17:46:59 2017 +0700

----------------------------------------------------------------------
 .../solr/cloud/RecoveringCoreTermWatcher.java   |   4 +-
 .../org/apache/solr/cloud/ZkController.java     |   2 +-
 .../org/apache/solr/cloud/ZkShardTerms.java     | 208 +++++++++++--------
 .../org/apache/solr/cloud/ZkShardTermsTest.java |  16 +-
 4 files changed, 132 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27be8a85/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
index 82b5cce..0afa1c8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
@@ -33,12 +33,12 @@ public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
   }
 
   @Override
-  public boolean onTermChanged(ZkShardTerms zkShardTerms) {
+  public boolean onTermChanged(ZkShardTerms.Terms terms) {
     if (solrCore.isClosed()) {
       return false;
     }
     String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
-    if (!zkShardTerms.canBecomeLeader(coreNodeName)) {
+    if (!terms.canBecomeLeader(coreNodeName)) {
       log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName);
       solrCore.getUpdateHandler().getSolrCoreState().doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27be8a85/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 3817e04..5d4a9ac 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1433,7 +1433,7 @@ public class ZkController {
   public void unregister(String coreName, CoreDescriptor cd) throws Exception {
     final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
     final String collection = cd.getCloudDescriptor().getCollectionName();
-    getShardTerms(collection, cd.getCloudDescriptor().getShardId()).removeTerm(collection, cd);
+    getShardTerms(collection, cd.getCloudDescriptor().getShardId()).removeTerm(cd);
 
     if (Strings.isNullOrEmpty(collection)) {
       log.error("No collection was specified.");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27be8a85/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index a406983..46ee583 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -17,23 +17,16 @@
 
 package org.apache.solr.cloud;
 
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
-import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
-import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
-import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
-import org.apache.solr.client.solrj.impl.ZkDistribStateManager;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -43,6 +36,7 @@ import org.apache.solr.core.CoreDescriptor;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,46 +49,36 @@ public class ZkShardTerms implements AutoCloseable{
   private final String collection;
   private final String shard;
   private final String znodePath;
-  private final DistribStateManager stateManager;
+  private final SolrZkClient zkClient;
   private final Set<CoreTermWatcher> listeners = new HashSet<>();
 
-  private Map<String, Long> terms = new HashMap<>();
-  private int version = 0;
+  private Terms terms;
 
   interface CoreTermWatcher {
     // return true if the listener wanna to be triggered in the next time
-    boolean onTermChanged(ZkShardTerms zkShardTerms);
+    boolean onTermChanged(Terms terms);
   }
 
-  public ZkShardTerms(String collection, String shard, SolrZkClient client) {
+  public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) {
     this.znodePath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms/" + shard;
     this.collection = collection;
     this.shard = shard;
-    this.stateManager = new ZkDistribStateManager(client);
+    this.zkClient = zkClient;
     ensureTermNodeExist();
     updateTerms();
     ObjectReleaseTracker.track(this);
   }
 
   public boolean ensureTermsIsHigher(String leader, Set<String> replicasInLowerTerms) {
-    while(!isLessThanLeaderTerm(leader, replicasInLowerTerms)) {
-      synchronized (writingLock) {
-        long leaderTerm = terms.get(leader);
-        for (String replica : terms.keySet()) {
-          if (Objects.equals(terms.get(replica), leaderTerm) && !replicasInLowerTerms.contains(replica)) {
-            terms.put(replica, leaderTerm+1);
-          }
-        }
-        if (forceSaveTerms()) return true;
-      }
+    Terms newTerms;
+    while( (newTerms = terms.increaseTerms(leader, replicasInLowerTerms)) != null) {
+      if (forceSaveTerms(newTerms)) return true;
     }
     return false;
   }
 
   public boolean canBecomeLeader(String coreNodeName) {
-    if (terms.isEmpty()) return true;
-    long maxTerm = Collections.max(terms.values());
-    return terms.getOrDefault(coreNodeName, 0L) == maxTerm;
+    return terms.canBecomeLeader(coreNodeName);
   }
 
   public void close() {
@@ -104,8 +88,8 @@ public class ZkShardTerms implements AutoCloseable{
   }
 
   // package private for testing, only used by tests
-  HashMap<String, Long> getTerms() {
-    return new HashMap<>(terms);
+  Map<String, Long> getTerms() {
+    return new HashMap<>(terms.terms);
   }
 
   void addListener(CoreTermWatcher listener) {
@@ -114,44 +98,32 @@ public class ZkShardTerms implements AutoCloseable{
     }
   }
 
-  void removeTerm(String collection, CoreDescriptor cd) {
+  void removeTerm(CoreDescriptor cd) {
     synchronized (listeners) {
       // solrcore already closed
-      listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(this));
+      listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms));
     }
-    while (true) {
-      synchronized (writingLock) {
-        terms.remove(cd.getCloudDescriptor().getCoreNodeName());
-        try {
-          if (saveTerms()) break;
-        } catch (NoSuchElementException e) {
-          return;
-        }
+    Terms newTerms;
+    while ( (newTerms = terms.removeTerm(cd.getCloudDescriptor().getCoreNodeName())) != null) {
+      try {
+        if (saveTerms(newTerms)) break;
+      } catch (NoSuchElementException e) {
+        return;
       }
     }
   }
 
   void registerTerm(String replica) {
-    while (!terms.containsKey(replica)) {
-      synchronized (writingLock) {
-        terms.put(replica, 0L);
-        forceSaveTerms();
-      }
+    Terms newTerms;
+    while ( (newTerms = terms.registerTerm(replica)) != null) {
+      if (forceSaveTerms(newTerms)) break;
     }
   }
 
   void setEqualsToMax(String replica) {
-    while (true){
-      synchronized (writingLock) {
-        long maxTerm;
-        try {
-          maxTerm = Collections.max(terms.values());
-        } catch (NoSuchElementException e){
-          maxTerm = 0;
-        }
-        terms.put(replica, maxTerm);
-        if (forceSaveTerms()) break;
-      }
+    Terms newTerms;
+    while ( (newTerms = terms.setEqualsToMax(replica)) != null) {
+      if (forceSaveTerms(newTerms)) break;
     }
   }
 
@@ -161,22 +133,23 @@ public class ZkShardTerms implements AutoCloseable{
     }
   }
 
-  private boolean forceSaveTerms() {
+  private boolean forceSaveTerms(Terms newTerms) {
     try {
-      return saveTerms();
+      return saveTerms(newTerms);
     } catch (NoSuchElementException e) {
       ensureTermNodeExist();
       return false;
     }
   }
 
-  private boolean saveTerms() throws NoSuchElementException {
-    byte[] znodeData = Utils.toJSON(terms);
+  private boolean saveTerms(Terms newTerms) throws NoSuchElementException {
+    byte[] znodeData = Utils.toJSON(newTerms.terms);
     // must retry on conn loss otherwise future election attempts may assume wrong LIR state
     try {
-      stateManager.setData(znodePath, znodeData, version);
+      Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true);
+      updateTerms(new Terms(newTerms.terms, stat.getVersion()));
       return true;
-    } catch (BadVersionException e) {
+    } catch (KeeperException.BadVersionException e) {
       log.info("Failed to save terms, version is not match, updating local terms");
       updateTerms();
     } catch (NoSuchElementException e) {
@@ -191,26 +164,26 @@ public class ZkShardTerms implements AutoCloseable{
   private void ensureTermNodeExist() {
     String path = "/collections/"+collection+ "/terms";
     try {
-      if (!stateManager.hasData(path)) {
+      if (!zkClient.exists(path, true)) {
         try {
-          stateManager.makePath(path);
-        } catch (AlreadyExistsException e) {
+          zkClient.makePath(path, true);
+        } catch (KeeperException.NodeExistsException e) {
           // it's okay if another beats us creating the node
         }
       }
       path += "/"+shard;
-      if (!stateManager.hasData(path)) {
+      if (!zkClient.exists(path, true)) {
         try {
           Map<String, Long> initialTerms = new HashMap<>();
-          stateManager.createData(path, Utils.toJSON(initialTerms), CreateMode.PERSISTENT);
-        } catch (AlreadyExistsException e) {
+          zkClient.create(path, Utils.toJSON(initialTerms), CreateMode.PERSISTENT, true);
+        } catch (KeeperException.NodeExistsException e) {
           // it's okay if another beats us creating the node
         }
       }
     }  catch (InterruptedException e) {
       Thread.interrupted();
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection:" + collection, e);
-    } catch (IOException | KeeperException e) {
+    } catch (KeeperException e) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection:" + collection, e);
     }
   }
@@ -225,32 +198,105 @@ public class ZkShardTerms implements AutoCloseable{
         };
       }
 
-      VersionedData data = stateManager.getData(znodePath, watcher);
-      version = data.getVersion();
-      terms = (Map<String, Long>) Utils.fromJSON(data.getData());
-      onTermUpdates();
+      Stat stat = new Stat();
+      byte[] data = zkClient.getData(znodePath, watcher, stat, true);
+      Terms newTerms = new Terms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
+      updateTerms(newTerms);
     } catch (InterruptedException e) {
       Thread.interrupted();
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
-    } catch (IOException | KeeperException e) {
+    } catch (KeeperException e) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
     }
   }
 
-  private boolean isLessThanLeaderTerm(String leader, Set<String> replicasInLowerTerms) {
-    for (String replica : replicasInLowerTerms) {
-      if (!terms.containsKey(leader)) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
+  private void updateTerms(Terms newTerms) {
+    boolean isNewer = false;
+    synchronized (writingLock) {
+      if (terms == null || newTerms.version > terms.version) {
+        terms = newTerms;
+        isNewer = true;
       }
-      if (!terms.containsKey(replica)) return false;
-      if (terms.get(leader) <= terms.get(replica)) return false;
     }
-    return true;
+
+    if (isNewer) onTermUpdates(newTerms);
   }
 
-  private void onTermUpdates() {
+  private void onTermUpdates(Terms newTerms) {
     synchronized (listeners) {
-      listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(this));
+      listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(newTerms));
+    }
+  }
+
+  static class Terms {
+    private final Map<String, Long> terms;
+    private final int version;
+
+    public Terms () {
+      this(new HashMap<>(), 0);
+    }
+
+    public Terms(Map<String, Long> terms, int version) {
+      this.terms = terms;
+      this.version = version;
+    }
+
+    boolean canBecomeLeader(String coreNodeName) {
+      if (terms.isEmpty()) return true;
+      long maxTerm = Collections.max(terms.values());
+      return terms.getOrDefault(coreNodeName, 0L) == maxTerm;
+    }
+
+    Terms increaseTerms(String leader, Set<String> replicasInLowerTerms) {
+      if (!terms.containsKey(leader)) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
+      }
+
+      boolean changed = false;
+
+      HashMap<String, Long> newValues = new HashMap<>(terms);
+      long leaderTerm = newValues.get(leader);
+      for (String replica : newValues.keySet()) {
+        if (Objects.equals(newValues.get(replica), leaderTerm)) {
+          if(replicasInLowerTerms.contains(replica)) {
+            changed = true;
+          } else {
+            newValues.put(replica, leaderTerm+1);
+          }
+        }
+      }
+      if (!changed) return null;
+      return new Terms(newValues, version);
+    }
+
+    Terms removeTerm(String coreNodeName) {
+      if (!terms.containsKey(coreNodeName)) return null;
+
+      HashMap<String, Long> newValues = new HashMap<>(terms);
+      newValues.remove(coreNodeName);
+      return new Terms(newValues, version);
+    }
+
+    Terms registerTerm(String coreNodeName) {
+      if (terms.containsKey(coreNodeName)) return null;
+
+      HashMap<String, Long> newValues = new HashMap<>(terms);
+      newValues.put(coreNodeName, 0L);
+      return new Terms(newValues, version);
+    }
+
+    Terms setEqualsToMax(String coreNodeName) {
+      long maxTerm;
+      try {
+        maxTerm = Collections.max(terms.values());
+      } catch (NoSuchElementException e){
+        maxTerm = 0;
+      }
+      if (terms.get(coreNodeName) == maxTerm) return null;
+
+      HashMap<String, Long> newValues = new HashMap<>(terms);
+      newValues.put(coreNodeName, maxTerm);
+      return new Terms(newValues, version);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27be8a85/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
index dc2084a..c6af317 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
@@ -17,37 +17,24 @@
 
 package org.apache.solr.cloud;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
-import org.apache.calcite.rel.core.Collect;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.core.SolrCore;
 import org.apache.solr.util.TimeOut;
-import org.apache.zookeeper.KeeperException;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class ZkShardTermsTest extends SolrCloudTestCase {
 
@@ -80,7 +67,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
     assertArrayEquals(new Long[]{0L, 0L}, terms.values().toArray(new Long[2]));
   }
 
-  public void testRegisterTerm() {
+  public void testRegisterTerm() throws InterruptedException {
     String collection = "registerTerm";
     ZkShardTerms rep1Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
     ZkShardTerms rep2Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
@@ -99,6 +86,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
     rep1Terms.registerTerm("rep1");
     assertEquals(1L, rep1Terms.getTerms().get("rep1").longValue());
 
+    waitFor(1L, () -> rep2Terms.getTerms().get("rep1"));
     rep2Terms.setEqualsToMax("rep2");
     assertEquals(1L, rep2Terms.getTerms().get("rep2").longValue());
     rep2Terms.registerTerm("rep2");