You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/02/25 19:41:19 UTC
[lucene-solr] branch reference_impl_dev updated: @1400 Version locking and recovery fixes.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new a709fc4 @1400 Version locking and recovery fixes.
a709fc4 is described below
commit a709fc4e7ec2938b3fe856f85583b9f90d48bcd8
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Feb 25 13:26:19 2021 -0600
@1400 Version locking and recovery fixes.
Took 1 hour 23 minutes
---
.../org/apache/solr/cloud/RecoveryStrategy.java | 11 ++--
.../java/org/apache/solr/cloud/StatePublisher.java | 5 +-
.../java/org/apache/solr/cloud/ZkController.java | 7 +--
.../org/apache/solr/update/SolrCmdDistributor.java | 4 +-
.../java/org/apache/solr/update/VersionBucket.java | 30 ++++++++--
.../solr/cloud/ChaosMonkeySafeLeaderTest.java | 70 ++++++++++++++--------
.../apache/solr/cloud/DocValuesNotIndexedTest.java | 2 +-
.../apache/solr/cloud/SolrCloudBridgeTestCase.java | 1 +
.../org/apache/solr/common/cloud/SolrZkClient.java | 25 ++++++--
.../apache/solr/common/cloud/ZkStateReader.java | 9 +--
.../org/apache/solr/cloud/SolrCloudTestCase.java | 29 +++++----
11 files changed, 127 insertions(+), 66 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 6f2eaf4..86ec39a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -612,6 +612,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
return false;
}
+ log.info("Begin buffering updates. core=[{}]", coreName);
+ // recalling buffer updates will drop the old buffer tlog
+ ulog.bufferUpdates();
+
// we wait a bit so that any updates on the leader
// that started before they saw recovering state
// are sure to have finished (see SOLR-7141 for
@@ -649,6 +653,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
// solrcloud_debug
// cloudDebugLog(core, "synced");
+ log.info("Replaying updates buffered during PeerSync.");
+ replay(core);
+
// sync success
successfulRecovery = true;
} else {
@@ -669,10 +676,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
didReplication = true;
try {
- log.info("Begin buffering updates. core=[{}]", coreName);
- // recalling buffer updates will drop the old buffer tlog
- ulog.bufferUpdates();
-
// try {
// if (prevSendPreRecoveryHttpUriRequest != null) {
// prevSendPreRecoveryHttpUriRequest.cancel();
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index b9b84b7..cc36bd2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -151,8 +151,9 @@ public class StatePublisher implements Closeable {
log.error("Bad state found for publish! {} {}", zkNodeProps, bulkMessage);
return;
}
-
- bulkMessage.getProperties().put(core, collection + "," + Replica.State.getShortState(Replica.State.valueOf(state.toUpperCase(Locale.ROOT))));
+ String line = collection + "," + Replica.State.getShortState(Replica.State.valueOf(state.toUpperCase(Locale.ROOT)));
+ if (log.isDebugEnabled()) log.debug("Bulk publish core={} line={}", core, line);
+ bulkMessage.getProperties().put(core, line);
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 46bf118..24fccf0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1321,7 +1321,7 @@ public class ZkController implements Closeable, Runnable {
try {
log.info("Waiting to see our entry in state.json {}", desc.getName());
- zkStateReader.waitForState(collection, Integer.getInteger("solr.zkregister.leaderwait", 5000), TimeUnit.MILLISECONDS, (l, c) -> { // MRM TODO: timeout
+ zkStateReader.waitForState(collection, Integer.getInteger("solr.zkregister.leaderwait", 30000), TimeUnit.MILLISECONDS, (l, c) -> { // MRM TODO: timeout
if (c == null) {
return false;
}
@@ -1340,11 +1340,8 @@ public class ZkController implements Closeable, Runnable {
Replica replica = replicaRef.get();
if (replica == null) {
- replica = zkStateReader.getClusterState().getCollection(collection).getReplica(coreName);
- if (replica == null) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, replica=" + coreName + " is removed from clusterstate \n"
- + zkStateReader.getClusterState().getCollectionOrNull(collection));
- }
+ + coll.get());
}
log.info("Register replica - core:{} address:{} collection:{} shard:{} type={}", coreName, baseUrl, collection, shardId, replica.getType());
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 806f140..023231c 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -123,7 +123,7 @@ public class SolrCmdDistributor implements Closeable {
err.t);
// this can happen in certain situations such as close
- if (isRetry) {
+ if (isRetry && rspCode != -1) {
// if it's a io exception exception, lets try again
if (err.t instanceof SolrServerException) {
if (((SolrServerException) err.t).getRootCause() instanceof IOException && !(((SolrServerException) err.t).getRootCause() instanceof ClosedChannelException)) {
@@ -135,7 +135,7 @@ public class SolrCmdDistributor implements Closeable {
doRetry = true;
}
- if (err.req.retries < maxRetries && doRetry) {
+ if (err.req.retries < maxRetries && doRetry && !isClosed.isClosed()) {
err.req.retries++;
SolrException.log(SolrCmdDistributor.log, "sending update to "
diff --git a/solr/core/src/java/org/apache/solr/update/VersionBucket.java b/solr/core/src/java/org/apache/solr/update/VersionBucket.java
index ff17f4b..4b2d6e3 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionBucket.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionBucket.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.ParWork;
+import org.apache.solr.common.SolrException;
// TODO: make inner?
// TODO: store the highest possible in the index on a commit (but how to not block adds?)
@@ -63,19 +64,38 @@ public class VersionBucket {
LongAdder adder = new LongAdder();
adder.increment();
blockedIds.put(idBytes, adder);
+ lock.unlock();
} else {
LongAdder adder = blockedIds.get(idBytes);
- adder.increment();
+
+ while (adder.longValue() > 0) {
+ try {
+ lockCondition.awaitNanos(250);
+ } catch (InterruptedException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+ adder = blockedIds.get(idBytes);
+ if (adder == null) {
+ adder = new LongAdder();
+ adder.increment();
+ blockedIds.put(idBytes, adder);
+ }
+ lock.unlock();
}
return function.apply();
} finally {
try {
+ if (!lock.isHeldByCurrentThread()) {
+ lock.lock();
+ }
LongAdder adder = blockedIds.get(idBytes);
- adder.decrement();
- if (adder.longValue() == 0L) {
- blockedIds.remove(idBytes);
+ if (adder != null) {
+ adder.decrement();
+ if (adder.longValue() == 0L) {
+ blockedIds.remove(idBytes);
+ }
}
-
} finally {
if (lock.isHeldByCurrentThread()) lock.unlock();
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
index 040a430..94e975e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java
@@ -21,6 +21,8 @@ import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -44,7 +47,30 @@ public class ChaosMonkeySafeLeaderTest extends SolrCloudBridgeTestCase {
@BeforeClass
public static void beforeSuperClass() throws Exception {
useFactory(null);
- //setErrorHook();
+ // schemaString = "schema15.xml"; // we need a string id
+ System.setProperty("solr.autoCommit.maxTime", "15000");
+ System.setProperty("solr.httpclient.retries", "1");
+ System.setProperty("solr.retries.on.forward", "1");
+ System.setProperty("solr.retries.to.followers", "1");
+ useFactory(null);
+ System.setProperty("solr.suppressDefaultConfigBootstrap", "false");
+
+ createControl = false;
+
+ sliceCount = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.slicecount", "-1"));
+ if (sliceCount == -1) {
+ sliceCount = random().nextInt(TEST_NIGHTLY ? 5 : 3) + 1;
+ }
+
+ replicationFactor = 3;
+
+ // int numShards = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.shardcount", "-1"));
+ // if (numShards == -1) {
+ // // we make sure that there's at least one shard with more than one replica
+ // // so that the ChaosMonkey has something to kill
+ // numShards = sliceCount + random().nextInt(TEST_NIGHTLY ? 12 : 2) + 1;
+ // }
+ numJettys = sliceCount * replicationFactor;
}
@AfterClass
@@ -75,30 +101,6 @@ public class ChaosMonkeySafeLeaderTest extends SolrCloudBridgeTestCase {
public ChaosMonkeySafeLeaderTest() throws Exception {
super();
- // schemaString = "schema15.xml"; // we need a string id
- System.setProperty("solr.autoCommit.maxTime", "15000");
- System.setProperty("solr.httpclient.retries", "1");
- System.setProperty("solr.retries.on.forward", "1");
- System.setProperty("solr.retries.to.followers", "1");
- useFactory(null);
- System.setProperty("solr.suppressDefaultConfigBootstrap", "false");
-
- createControl = true;
-
- sliceCount = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.slicecount", "-1"));
- if (sliceCount == -1) {
- sliceCount = random().nextInt(TEST_NIGHTLY ? 5 : 3) + 1;
- }
-
- replicationFactor = 3;
-
-// int numShards = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.shardcount", "-1"));
-// if (numShards == -1) {
-// // we make sure that there's at least one shard with more than one replica
-// // so that the ChaosMonkey has something to kill
-// numShards = sliceCount + random().nextInt(TEST_NIGHTLY ? 12 : 2) + 1;
-// }
- this.numJettys = sliceCount * replicationFactor;
}
@Test
@@ -109,6 +111,7 @@ public class ChaosMonkeySafeLeaderTest extends SolrCloudBridgeTestCase {
// randomly turn on 1 seconds 'soft' commit
//randomlyEnableAutoSoftCommit();
+ cluster.waitForActiveCollection(COLLECTION, sliceCount, sliceCount * replicationFactor);
tryDelete();
@@ -165,9 +168,24 @@ public class ChaosMonkeySafeLeaderTest extends SolrCloudBridgeTestCase {
}
for (StoppableIndexingThread indexThread : threads) {
- assertTrue( indexThread.getFailCount() < 10);
+ assertTrue(String.valueOf(indexThread.getFailCount()), indexThread.getFailCount() < 10);
}
+ cluster.getSolrClient().getZkStateReader().waitForState(COLLECTION, 10, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
+ if (collectionState == null) return false;
+ Collection<Slice> slices = collectionState.getSlices();
+ for (Slice slice : slices) {
+ for (Replica replica : slice.getReplicas()) {
+ if (cluster.getSolrClient().getZkStateReader().isNodeLive(replica.getNodeName())) {
+ if (replica.getState() != Replica.State.ACTIVE) {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ });
+
commit();
// MRM TODO: make test fail on compare fail
diff --git a/solr/core/src/test/org/apache/solr/cloud/DocValuesNotIndexedTest.java b/solr/core/src/test/org/apache/solr/cloud/DocValuesNotIndexedTest.java
index 80e875916..7e83704 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DocValuesNotIndexedTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DocValuesNotIndexedTest.java
@@ -327,7 +327,7 @@ public class DocValuesNotIndexedTest extends SolrCloudTestCase {
List<Group> fieldCommandGroups = fieldCommand.getValues();
- if (prop.getName().startsWith("intGSF") && fieldCommandGroups.size() == 3) { // TODO: why?
+ if (prop.getName().endsWith("GSF") && fieldCommandGroups.size() == 3) { // TODO: why?
// this can rarely end up with 3 instead of 4
expected = 3;
assertEquals("Did not find the expected number of groups for field " + prop.getName(), expected, fieldCommandGroups.size());
diff --git a/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java b/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java
index c681684..fe2f545 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SolrCloudBridgeTestCase.java
@@ -250,6 +250,7 @@ public abstract class SolrCloudBridgeTestCase extends SolrCloudTestCase {
}
clients.clear();
controlClient = null;
+ cluster = null;
super.tearDown();
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index a4502c6..a288fe7 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -297,6 +297,11 @@ public class SolrZkClient implements Closeable {
return exists(path, watcher, true);
}
+ public Stat exists(final String path, final Watcher watcher, boolean retryOnConnLoss)
+ throws KeeperException, InterruptedException {
+ return exists(path, watcher, retryOnConnLoss, true);
+ }
+
/**
* Return the stat of the node of the given path. Return null if no such a
* node exists.
@@ -314,11 +319,11 @@ public class SolrZkClient implements Closeable {
* @throws InterruptedException If the server transaction is interrupted.
* @throws IllegalArgumentException if an invalid path is specified
*/
- public Stat exists(final String path, final Watcher watcher, boolean retryOnConnLoss)
+ public Stat exists(final String path, final Watcher watcher, boolean retryOnConnLoss, boolean retryOnSessionExpiration)
throws KeeperException, InterruptedException {
ZooKeeper keeper = connManager.getKeeper();
if (retryOnConnLoss) {
- return ZkCmdExecutor.retryOperation(zkCmdExecutor, () -> keeper.exists(path, watcher == null ? null : wrapWatcher(watcher)));
+ return ZkCmdExecutor.retryOperation(zkCmdExecutor, () -> keeper.exists(path, watcher == null ? null : wrapWatcher(watcher)), retryOnSessionExpiration);
} else {
return keeper.exists(path, watcher == null ? null : wrapWatcher(watcher));
}
@@ -360,9 +365,14 @@ public class SolrZkClient implements Closeable {
public List<String> getChildren(final String path, final Watcher watcher, Stat stat, boolean retryOnConnLoss)
throws KeeperException, InterruptedException {
+ return getChildren(path, watcher, stat, retryOnConnLoss, true);
+ }
+
+ public List<String> getChildren(final String path, final Watcher watcher, Stat stat, boolean retryOnConnLoss, boolean retrySessionExpiration)
+ throws KeeperException, InterruptedException {
ZooKeeper keeper = connManager.getKeeper();
if (retryOnConnLoss) {
- return ZkCmdExecutor.retryOperation(zkCmdExecutor, () -> keeper.getChildren(path, watcher == null ? null : wrapWatcher(watcher), stat));
+ return ZkCmdExecutor.retryOperation(zkCmdExecutor, () -> keeper.getChildren(path, watcher == null ? null : wrapWatcher(watcher), stat), retrySessionExpiration);
} else {
return keeper.getChildren(path, watcher == null ? null : wrapWatcher(watcher));
}
@@ -372,17 +382,22 @@ public class SolrZkClient implements Closeable {
return getData(path, watcher, stat, true);
}
+ public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss)
+ throws KeeperException, InterruptedException {
+ return getData(path, watcher, stat, retryOnConnLoss, true);
+ }
+
/**
* Returns node's data
*/
- public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss)
+ public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss, boolean retryOnSessionExpiration)
throws KeeperException, InterruptedException {
ZooKeeper keeper = connManager.getKeeper();
if (retryOnConnLoss && zkCmdExecutor != null) {
if (keeper == null) {
throw new IllegalStateException();
}
- return ZkCmdExecutor.retryOperation(zkCmdExecutor, () -> keeper.getData(path, watcher == null ? null : wrapWatcher(watcher), stat));
+ return ZkCmdExecutor.retryOperation(zkCmdExecutor, () -> keeper.getData(path, watcher == null ? null : wrapWatcher(watcher), stat), retryOnSessionExpiration);
} else {
return keeper.getData(path, watcher == null ? null : wrapWatcher(watcher), stat);
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 3c78e21..7da7290 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -664,7 +664,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
private void refreshCollectionList() throws KeeperException, InterruptedException {
List<String> children = null;
try {
- children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true);
+ children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, null,true, false);
} catch (KeeperException.NoNodeException e) {
log.warn("Error fetching collection names: [{}]", e.getMessage());
// fall through
@@ -808,7 +808,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
try {
Stat stat = new Stat();
- List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, null, stat, true);
+ List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, null, stat, true, false);
this.liveNodesVersion = stat.getCversion();
newLiveNodes = new TreeSet<>(nodeList);
} catch (KeeperException.NoNodeException e) {
@@ -1473,7 +1473,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
byte[] data = null;
try {
- data = getZkClient().getData(stateUpdatesPath, null, null, true);
+ data = getZkClient().getData(stateUpdatesPath, null, null, true, false);
} catch (NoNodeException e) {
log.info("No node found for {}", stateUpdatesPath);
return;
@@ -1788,6 +1788,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
/**
* Watches /collections children .
*/
+ // MRM TODO: persistent watch
class CollectionsChildWatcher implements Watcher, Closeable {
@Override
@@ -1936,7 +1937,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
int version = 0;
if (exists != null) {
- Stat stateStat = zkClient.exists(collectionPath, null, true);
+ Stat stateStat = zkClient.exists(collectionPath, null, true, false);
if (stateStat != null) {
version = stateStat.getVersion();
if (log.isDebugEnabled()) log.debug("version for cs is {}", version);
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
index c33996c..f4b45ae 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
@@ -107,7 +107,6 @@ public class SolrCloudTestCase extends SolrTestCase {
}
}
-
@BeforeClass
public static void beforeSolrCloudTestCase() throws Exception {
qtp = getQtp();
@@ -116,12 +115,29 @@ public class SolrCloudTestCase extends SolrTestCase {
@AfterClass
public static void afterSolrCloudTestCase() throws Exception {
+ if (cluster != null) {
+ try {
+ cluster.shutdown();
+ } finally {
+ cluster = null;
+ }
+ }
if (qtp != null) {
IOUtils.closeQuietly(qtp);
qtp = null;
}
}
+ public static void shutdownCluster() throws Exception {
+ if (cluster != null) {
+ try {
+ cluster.shutdown();
+ } finally {
+ cluster = null;
+ }
+ }
+ }
+
/**
* Builder class for a MiniSolrCloudCluster
*/
@@ -304,17 +320,6 @@ public class SolrCloudTestCase extends SolrTestCase {
return new Builder(nodeCount, SolrTestUtil.createTempDir());
}
- @AfterClass
- public static void shutdownCluster() throws Exception {
- if (cluster != null) {
- try {
- cluster.shutdown();
- } finally {
- cluster = null;
- }
- }
- }
-
/* Cluster helper methods ************************************/
/**