You are viewing a plain-text version of this content. The link to the canonical version (originally attached to the word "here") was not preserved in this copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/12 05:05:22 UTC
[lucene-solr] branch reference_impl updated: @1463 Stress concurrency fix and tweaks.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl by this push:
new c42cba5 @1463 Stress concurrency fix and tweaks.
c42cba5 is described below
commit c42cba5981b8b0989146224f06cf46f25da61a9d
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Mar 11 23:04:47 2021 -0600
@1463 Stress concurrency fix and tweaks.
Took 1 hour 4 minutes
---
.../java/org/apache/solr/cloud/ZkController.java | 2 +-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 46 +++++++++---------
.../java/org/apache/solr/common/cloud/Replica.java | 4 ++
.../apache/solr/common/cloud/ZkStateReader.java | 54 ++++++++++++++--------
.../apache/solr/cloud/MiniSolrCloudCluster.java | 4 +-
5 files changed, 64 insertions(+), 46 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 8d46016..559af67 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1325,7 +1325,7 @@ public class ZkController implements Closeable, Runnable {
break;
}
try {
- Replica leader = zkStateReader.getLeaderRetry(collection, shardId, 1500, true);
+ Replica leader = zkStateReader.getLeaderRetry(collection, shardId, 10000, true);
leaderName = leader.getName();
} catch (TimeoutException timeoutException) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 7d5476c..701d367 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -215,14 +215,15 @@ public class ZkStateWriter {
continue;
} else {
if (log.isDebugEnabled()) log.debug("state cmd entry {} asOverseerCmd={}", entry, OverseerAction.get(entry.getKey()));
- String id = entry.getKey();
+ String fullId = entry.getKey();
+ String id = fullId.substring(fullId.indexOf("-") + 1);
String stateString = (String) entry.getValue();
if (log.isDebugEnabled()) {
log.debug("stateString={}", stateString);
}
- long collectionId = Long.parseLong(id.split("-")[0]);
+ long collectionId = Long.parseLong(fullId.split("-")[0]);
String collection = idToCollection.get(collectionId);
if (collection == null) {
log.info("collection for id={} is null", collectionId);
@@ -265,7 +266,7 @@ public class ZkStateWriter {
dirtyState.add(docColl.getName());
blockedNodes.add((String) entry.getValue());
StateUpdate update = new StateUpdate();
- update.id = replica.getId();
+ update.id = replica.getInternalId();
update.state = Replica.State.getShortState(Replica.State.DOWN);
updates.add(update);
@@ -287,7 +288,7 @@ public class ZkStateWriter {
dirtyState.add(docColl.getName());
// blockedNodes.add((String) entry.getValue());
StateUpdate update = new StateUpdate();
- update.id = replica.getId();
+ update.id = replica.getInternalId();
update.state = Replica.State.getShortState(Replica.State.RECOVERING);
updates.add(update);
}
@@ -393,7 +394,7 @@ public class ZkStateWriter {
r.getProperties().remove("leader");
}
}
- updates.getProperties().put(replica.getId(), "l");
+ updates.getProperties().put(replica.getInternalId(), "l");
dirtyState.add(collection);
} else {
Replica.State s = Replica.State.getState(setState);
@@ -401,7 +402,7 @@ public class ZkStateWriter {
if (existingLeader != null && existingLeader.getName().equals(replica.getName())) {
docColl.getSlice(replica).setLeader(null);
}
- updates.getProperties().put(replica.getId(), Replica.State.getShortState(s));
+ updates.getProperties().put(replica.getInternalId(), Replica.State.getShortState(s));
log.debug("set state {} {}", state, replica);
replica.setState(s);
dirtyState.add(collection);
@@ -422,29 +423,26 @@ public class ZkStateWriter {
}
}
+ String coll = entry.getKey();
+ dirtyState.add(coll);
+ Integer ver = trackVersions.get(coll);
+ if (ver == null) {
+ ver = 0;
+ }
+ ZkNodeProps updateMap = stateUpdates.get(coll);
+ if (updateMap == null) {
+ updateMap = new ZkNodeProps();
+ stateUpdates.put(coll, updateMap);
+ }
+ updateMap.getProperties().put("_cs_ver_", ver.toString());
+ for (StateUpdate theUpdate : entry.getValue()) {
+ updateMap.getProperties().put(theUpdate.id, theUpdate.state);
+ }
} finally {
collState.collLock.unlock();
}
}
- for (Map.Entry<String,List<StateUpdate>> update : collStateUpdates.entrySet()) {
-
- String coll = update.getKey();
- dirtyState.add(coll);
- Integer ver = trackVersions.get(coll);
- if (ver == null) {
- ver = 0;
- }
- ZkNodeProps updateMap = stateUpdates.get(coll);
- if (updateMap == null) {
- updateMap = new ZkNodeProps();
- stateUpdates.put(coll, updateMap);
- }
- updateMap.getProperties().put("_cs_ver_", ver.toString());
- for (StateUpdate theUpdate : update.getValue()) {
- updateMap.getProperties().put(theUpdate.id, theUpdate.state);
- }
- }
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 957c361..ae9f960 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -214,6 +214,10 @@ public class Replica extends ZkNodeProps {
return collectionId + "-" + (id == null ? null : id.toString());
}
+ public String getInternalId() { // getId() without its "collectionId-" prefix: getId() equals getCollectionId() + "-" + getInternalId()
+ return String.valueOf(id); // "null" when id is unset, matching what getId() yields instead of throwing NullPointerException
+ }
+
public Long getCollectionId() {
return collectionId;
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index b10bda3..d52bb80 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -391,13 +391,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
log.error("problem fetching update collection state", e);
return;
}
- String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(name);
- try {
- newState = getAndProcessStateUpdates(name, stateUpdatesPath, false, newState, null);
- } catch (Exception e) {
- log.error("", e);
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
+
if (updateWatchedCollection(name, newState, false)) {
constructState(newState);
}
@@ -428,14 +422,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
return;
}
- String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(name);
- try {
- newState = getAndProcessStateUpdates(name, stateUpdatesPath, false, newState, null);
- } catch (Exception e) {
- log.error("", e);
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
-
if (updateWatchedCollection(name, newState, false)) {
updatedCollections.add(newState);
}
@@ -823,8 +809,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
try {
DocCollection cdc = getCollectionLive(ZkStateReader.this, collName);
if (cdc != null) {
- String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(collName);
- cdc = getAndProcessStateUpdates(collName, stateUpdatesPath, true, cdc, null);
cdc.setCreatedLazy();
lastUpdateTime = System.nanoTime();
cachedDocCollection = cdc;
@@ -1828,8 +1812,28 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
DocCollection result = null;
try {
log.debug("get and process state updates for {}", coll);
+
+ Stat stat;
+ try {
+ stat = getZkClient().exists(stateUpdatesPath, null,true);
+ if (stat == null) {
+ return docCollection;
+ }
+ } catch (NoNodeException e) {
+ log.info("No node found for {}", stateUpdatesPath);
+ return docCollection;
+ }
+
+ if (docCollection != null && docCollection.hasStateUpdates()) {
+ int oldVersion = (int) docCollection.getStateUpdates().get("_ver_");
+ if (stat.getVersion() < oldVersion) {
+ if (log.isDebugEnabled()) log.debug("Will not apply state updates, they are for an older set of updates {}, ours is now {}", stat.getVersion(), oldVersion);
+ return docCollection;
+ }
+ }
+
byte[] data = null;
- Stat stat = new Stat();
+ stat = new Stat();
try {
data = getZkClient().getData(stateUpdatesPath, null, stat, true, false);
} catch (NoNodeException e) {
@@ -1879,7 +1883,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
state = Replica.State.shortStateToState((String) entry.getValue());
}
- Replica replica = docCollection.getReplicaById(id);
+ Replica replica = docCollection.getReplicaById(docCollection.getId() + "-" + id);
if (log.isDebugEnabled()) log.debug("Got additional state update {} replica={} id={} ids={} {}", state == null ? "leader" : state, replica, id, docCollection.getReplicaByIds());
if (replica != null) {
@@ -2013,6 +2017,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
}
+
} else {
return null;
}
@@ -2028,6 +2033,17 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
ClusterState state = ClusterState.createFromJson(this, stat.getVersion(), data);
ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
DocCollection docCollection = collectionRef == null ? null : collectionRef.get();
+
+ if (docCollection != null) {
+ String stateUpdatesPath = ZkStateReader.getCollectionStateUpdatesPath(coll);
+ try {
+ docCollection = getAndProcessStateUpdates(coll, stateUpdatesPath, true, docCollection, null);
+ } catch (Exception e) {
+ log.error("", e);
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
return docCollection;
}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index 6011adb..d47ffc6 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -858,12 +858,12 @@ public class MiniSolrCloudCluster {
public void waitForActiveCollection(String collection, int shards, int totalReplicas) {
if (collection == null) throw new IllegalArgumentException("null collection");
- waitForActiveCollection(collection, 10, TimeUnit.SECONDS, shards, totalReplicas);
+ waitForActiveCollection(collection, 120, TimeUnit.SECONDS, shards, totalReplicas);
}
public void waitForActiveCollection(String collection, int shards, int totalReplicas, boolean exact) {
- waitForActiveCollection(collection, 10, TimeUnit.SECONDS, shards, totalReplicas, exact);
+ waitForActiveCollection(collection, 120, TimeUnit.SECONDS, shards, totalReplicas, exact);
}
public void waitForJettyToStop(JettySolrRunner runner) throws TimeoutException {