You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@lucene.apache.org by da...@apache.org on 2017/12/15 10:47:17 UTC
lucene-solr:jira/solr-11702: SOLR-11702: Make ZKShardTerms immutable
Repository: lucene-solr
Updated Branches:
refs/heads/jira/solr-11702 b380191f3 -> 27be8a853
SOLR-11702: Make ZKShardTerms immutable
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/27be8a85
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/27be8a85
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/27be8a85
Branch: refs/heads/jira/solr-11702
Commit: 27be8a853b8e9c9a7e667ccace08ec8c9ee463b2
Parents: b380191
Author: Cao Manh Dat <da...@apache.org>
Authored: Fri Dec 15 17:46:59 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Fri Dec 15 17:46:59 2017 +0700
----------------------------------------------------------------------
.../solr/cloud/RecoveringCoreTermWatcher.java | 4 +-
.../org/apache/solr/cloud/ZkController.java | 2 +-
.../org/apache/solr/cloud/ZkShardTerms.java | 208 +++++++++++--------
.../org/apache/solr/cloud/ZkShardTermsTest.java | 16 +-
4 files changed, 132 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27be8a85/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
index 82b5cce..0afa1c8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
@@ -33,12 +33,12 @@ public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
}
@Override
- public boolean onTermChanged(ZkShardTerms zkShardTerms) {
+ public boolean onTermChanged(ZkShardTerms.Terms terms) {
if (solrCore.isClosed()) {
return false;
}
String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
- if (!zkShardTerms.canBecomeLeader(coreNodeName)) {
+ if (!terms.canBecomeLeader(coreNodeName)) {
log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName);
solrCore.getUpdateHandler().getSolrCoreState().doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27be8a85/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 3817e04..5d4a9ac 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1433,7 +1433,7 @@ public class ZkController {
public void unregister(String coreName, CoreDescriptor cd) throws Exception {
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
final String collection = cd.getCloudDescriptor().getCollectionName();
- getShardTerms(collection, cd.getCloudDescriptor().getShardId()).removeTerm(collection, cd);
+ getShardTerms(collection, cd.getCloudDescriptor().getShardId()).removeTerm(cd);
if (Strings.isNullOrEmpty(collection)) {
log.error("No collection was specified.");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27be8a85/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index a406983..46ee583 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -17,23 +17,16 @@
package org.apache.solr.cloud;
-import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
-import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
-import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
-import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
-import org.apache.solr.client.solrj.impl.ZkDistribStateManager;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -43,6 +36,7 @@ import org.apache.solr.core.CoreDescriptor;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,46 +49,36 @@ public class ZkShardTerms implements AutoCloseable{
private final String collection;
private final String shard;
private final String znodePath;
- private final DistribStateManager stateManager;
+ private final SolrZkClient zkClient;
private final Set<CoreTermWatcher> listeners = new HashSet<>();
- private Map<String, Long> terms = new HashMap<>();
- private int version = 0;
+ private Terms terms;
interface CoreTermWatcher {
// return true if the listener wanna to be triggered in the next time
- boolean onTermChanged(ZkShardTerms zkShardTerms);
+ boolean onTermChanged(Terms terms);
}
- public ZkShardTerms(String collection, String shard, SolrZkClient client) {
+ public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) {
this.znodePath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms/" + shard;
this.collection = collection;
this.shard = shard;
- this.stateManager = new ZkDistribStateManager(client);
+ this.zkClient = zkClient;
ensureTermNodeExist();
updateTerms();
ObjectReleaseTracker.track(this);
}
public boolean ensureTermsIsHigher(String leader, Set<String> replicasInLowerTerms) {
- while(!isLessThanLeaderTerm(leader, replicasInLowerTerms)) {
- synchronized (writingLock) {
- long leaderTerm = terms.get(leader);
- for (String replica : terms.keySet()) {
- if (Objects.equals(terms.get(replica), leaderTerm) && !replicasInLowerTerms.contains(replica)) {
- terms.put(replica, leaderTerm+1);
- }
- }
- if (forceSaveTerms()) return true;
- }
+ Terms newTerms;
+ while( (newTerms = terms.increaseTerms(leader, replicasInLowerTerms)) != null) {
+ if (forceSaveTerms(newTerms)) return true;
}
return false;
}
public boolean canBecomeLeader(String coreNodeName) {
- if (terms.isEmpty()) return true;
- long maxTerm = Collections.max(terms.values());
- return terms.getOrDefault(coreNodeName, 0L) == maxTerm;
+ return terms.canBecomeLeader(coreNodeName);
}
public void close() {
@@ -104,8 +88,8 @@ public class ZkShardTerms implements AutoCloseable{
}
// package private for testing, only used by tests
- HashMap<String, Long> getTerms() {
- return new HashMap<>(terms);
+ Map<String, Long> getTerms() {
+ return new HashMap<>(terms.terms);
}
void addListener(CoreTermWatcher listener) {
@@ -114,44 +98,32 @@ public class ZkShardTerms implements AutoCloseable{
}
}
- void removeTerm(String collection, CoreDescriptor cd) {
+ void removeTerm(CoreDescriptor cd) {
synchronized (listeners) {
// solrcore already closed
- listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(this));
+ listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms));
}
- while (true) {
- synchronized (writingLock) {
- terms.remove(cd.getCloudDescriptor().getCoreNodeName());
- try {
- if (saveTerms()) break;
- } catch (NoSuchElementException e) {
- return;
- }
+ Terms newTerms;
+ while ( (newTerms = terms.removeTerm(cd.getCloudDescriptor().getCoreNodeName())) != null) {
+ try {
+ if (saveTerms(newTerms)) break;
+ } catch (NoSuchElementException e) {
+ return;
}
}
}
void registerTerm(String replica) {
- while (!terms.containsKey(replica)) {
- synchronized (writingLock) {
- terms.put(replica, 0L);
- forceSaveTerms();
- }
+ Terms newTerms;
+ while ( (newTerms = terms.registerTerm(replica)) != null) {
+ if (forceSaveTerms(newTerms)) break;
}
}
void setEqualsToMax(String replica) {
- while (true){
- synchronized (writingLock) {
- long maxTerm;
- try {
- maxTerm = Collections.max(terms.values());
- } catch (NoSuchElementException e){
- maxTerm = 0;
- }
- terms.put(replica, maxTerm);
- if (forceSaveTerms()) break;
- }
+ Terms newTerms;
+ while ( (newTerms = terms.setEqualsToMax(replica)) != null) {
+ if (forceSaveTerms(newTerms)) break;
}
}
@@ -161,22 +133,23 @@ public class ZkShardTerms implements AutoCloseable{
}
}
- private boolean forceSaveTerms() {
+ private boolean forceSaveTerms(Terms newTerms) {
try {
- return saveTerms();
+ return saveTerms(newTerms);
} catch (NoSuchElementException e) {
ensureTermNodeExist();
return false;
}
}
- private boolean saveTerms() throws NoSuchElementException {
- byte[] znodeData = Utils.toJSON(terms);
+ private boolean saveTerms(Terms newTerms) throws NoSuchElementException {
+ byte[] znodeData = Utils.toJSON(newTerms.terms);
// must retry on conn loss otherwise future election attempts may assume wrong LIR state
try {
- stateManager.setData(znodePath, znodeData, version);
+ Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true);
+ updateTerms(new Terms(newTerms.terms, stat.getVersion()));
return true;
- } catch (BadVersionException e) {
+ } catch (KeeperException.BadVersionException e) {
log.info("Failed to save terms, version is not match, updating local terms");
updateTerms();
} catch (NoSuchElementException e) {
@@ -191,26 +164,26 @@ public class ZkShardTerms implements AutoCloseable{
private void ensureTermNodeExist() {
String path = "/collections/"+collection+ "/terms";
try {
- if (!stateManager.hasData(path)) {
+ if (!zkClient.exists(path, true)) {
try {
- stateManager.makePath(path);
- } catch (AlreadyExistsException e) {
+ zkClient.makePath(path, true);
+ } catch (KeeperException.NodeExistsException e) {
// it's okay if another beats us creating the node
}
}
path += "/"+shard;
- if (!stateManager.hasData(path)) {
+ if (!zkClient.exists(path, true)) {
try {
Map<String, Long> initialTerms = new HashMap<>();
- stateManager.createData(path, Utils.toJSON(initialTerms), CreateMode.PERSISTENT);
- } catch (AlreadyExistsException e) {
+ zkClient.create(path, Utils.toJSON(initialTerms), CreateMode.PERSISTENT, true);
+ } catch (KeeperException.NodeExistsException e) {
// it's okay if another beats us creating the node
}
}
} catch (InterruptedException e) {
Thread.interrupted();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection:" + collection, e);
- } catch (IOException | KeeperException e) {
+ } catch (KeeperException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection:" + collection, e);
}
}
@@ -225,32 +198,105 @@ public class ZkShardTerms implements AutoCloseable{
};
}
- VersionedData data = stateManager.getData(znodePath, watcher);
- version = data.getVersion();
- terms = (Map<String, Long>) Utils.fromJSON(data.getData());
- onTermUpdates();
+ Stat stat = new Stat();
+ byte[] data = zkClient.getData(znodePath, watcher, stat, true);
+ Terms newTerms = new Terms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
+ updateTerms(newTerms);
} catch (InterruptedException e) {
Thread.interrupted();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
- } catch (IOException | KeeperException e) {
+ } catch (KeeperException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
}
}
- private boolean isLessThanLeaderTerm(String leader, Set<String> replicasInLowerTerms) {
- for (String replica : replicasInLowerTerms) {
- if (!terms.containsKey(leader)) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
+ private void updateTerms(Terms newTerms) {
+ boolean isNewer = false;
+ synchronized (writingLock) {
+ if (terms == null || newTerms.version > terms.version) {
+ terms = newTerms;
+ isNewer = true;
}
- if (!terms.containsKey(replica)) return false;
- if (terms.get(leader) <= terms.get(replica)) return false;
}
- return true;
+
+ if (isNewer) onTermUpdates(newTerms);
}
- private void onTermUpdates() {
+ private void onTermUpdates(Terms newTerms) {
synchronized (listeners) {
- listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(this));
+ listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(newTerms));
+ }
+ }
+
+ static class Terms {
+ private final Map<String, Long> terms;
+ private final int version;
+
+ public Terms () {
+ this(new HashMap<>(), 0);
+ }
+
+ public Terms(Map<String, Long> terms, int version) {
+ this.terms = terms;
+ this.version = version;
+ }
+
+ boolean canBecomeLeader(String coreNodeName) {
+ if (terms.isEmpty()) return true;
+ long maxTerm = Collections.max(terms.values());
+ return terms.getOrDefault(coreNodeName, 0L) == maxTerm;
+ }
+
+ Terms increaseTerms(String leader, Set<String> replicasInLowerTerms) {
+ if (!terms.containsKey(leader)) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
+ }
+
+ boolean changed = false;
+
+ HashMap<String, Long> newValues = new HashMap<>(terms);
+ long leaderTerm = newValues.get(leader);
+ for (String replica : newValues.keySet()) {
+ if (Objects.equals(newValues.get(replica), leaderTerm)) {
+ if(replicasInLowerTerms.contains(replica)) {
+ changed = true;
+ } else {
+ newValues.put(replica, leaderTerm+1);
+ }
+ }
+ }
+ if (!changed) return null;
+ return new Terms(newValues, version);
+ }
+
+ Terms removeTerm(String coreNodeName) {
+ if (!terms.containsKey(coreNodeName)) return null;
+
+ HashMap<String, Long> newValues = new HashMap<>(terms);
+ newValues.remove(coreNodeName);
+ return new Terms(newValues, version);
+ }
+
+ Terms registerTerm(String coreNodeName) {
+ if (terms.containsKey(coreNodeName)) return null;
+
+ HashMap<String, Long> newValues = new HashMap<>(terms);
+ newValues.put(coreNodeName, 0L);
+ return new Terms(newValues, version);
+ }
+
+ Terms setEqualsToMax(String coreNodeName) {
+ long maxTerm;
+ try {
+ maxTerm = Collections.max(terms.values());
+ } catch (NoSuchElementException e){
+ maxTerm = 0;
+ }
+ if (terms.get(coreNodeName) == maxTerm) return null;
+
+ HashMap<String, Long> newValues = new HashMap<>(terms);
+ newValues.put(coreNodeName, maxTerm);
+ return new Terms(newValues, version);
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27be8a85/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
index dc2084a..c6af317 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
@@ -17,37 +17,24 @@
package org.apache.solr.cloud;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
-import org.apache.calcite.rel.core.Collect;
import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.core.SolrCore;
import org.apache.solr.util.TimeOut;
-import org.apache.zookeeper.KeeperException;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class ZkShardTermsTest extends SolrCloudTestCase {
@@ -80,7 +67,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
assertArrayEquals(new Long[]{0L, 0L}, terms.values().toArray(new Long[2]));
}
- public void testRegisterTerm() {
+ public void testRegisterTerm() throws InterruptedException {
String collection = "registerTerm";
ZkShardTerms rep1Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
ZkShardTerms rep2Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
@@ -99,6 +86,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
rep1Terms.registerTerm("rep1");
assertEquals(1L, rep1Terms.getTerms().get("rep1").longValue());
+ waitFor(1L, () -> rep2Terms.getTerms().get("rep1"));
rep2Terms.setEqualsToMax("rep2");
assertEquals(1L, rep2Terms.getTerms().get("rep2").longValue());
rep2Terms.registerTerm("rep2");