You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/01/20 21:51:33 UTC
[lucene-solr] 02/02: @1261 Some cleanup around leader election and
stale state and schemaless.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 07e623177055dc2188efb9e286f5f3116767db4d
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Jan 20 15:50:53 2021 -0600
@1261 Some cleanup around leader election and stale state and schemaless.
---
.../org/apache/solr/cloud/ElectionContext.java | 2 +-
.../java/org/apache/solr/cloud/LeaderElector.java | 50 +++-----
.../apache/solr/cloud/OverseerElectionContext.java | 5 +-
.../org/apache/solr/cloud/RecoveryStrategy.java | 35 +++---
.../solr/cloud/ShardLeaderElectionContext.java | 17 +--
.../solr/cloud/ShardLeaderElectionContextBase.java | 4 +-
.../java/org/apache/solr/cloud/StatePublisher.java | 20 +--
.../java/org/apache/solr/cloud/ZkController.java | 19 ++-
.../java/org/apache/solr/cloud/ZkShardTerms.java | 11 --
.../java/org/apache/solr/core/CoreContainer.java | 35 +++++-
.../src/java/org/apache/solr/core/SolrCore.java | 9 ++
.../java/org/apache/solr/handler/IndexFetcher.java | 4 +-
.../org/apache/solr/handler/SchemaHandler.java | 2 +-
.../apache/solr/handler/admin/PrepRecoveryOp.java | 2 +-
.../handler/component/RealTimeGetComponent.java | 2 +-
.../apache/solr/handler/loader/JavabinLoader.java | 9 +-
.../org/apache/solr/request/SolrQueryRequest.java | 2 -
.../apache/solr/request/SolrQueryRequestBase.java | 6 -
.../java/org/apache/solr/schema/IndexSchema.java | 102 +++++++++-------
.../org/apache/solr/schema/ManagedIndexSchema.java | 135 ++++++++++++---------
.../solr/schema/ManagedIndexSchemaFactory.java | 6 +-
.../apache/solr/schema/ZkIndexSchemaReader.java | 18 +--
.../apache/solr/update/DefaultSolrCoreState.java | 4 +-
.../java/org/apache/solr/update/UpdateCommand.java | 4 +-
.../AddSchemaFieldsUpdateProcessorFactory.java | 2 +-
.../org/apache/solr/cloud/LeaderElectionTest.java | 3 +-
.../solr/cloud/MissingSegmentRecoveryTest.java | 3 +-
.../org/apache/solr/schema/SchemaWatcherTest.java | 2 +-
.../org/apache/solr/schema/TestPointFields.java | 16 ++-
.../apache/solr/common/cloud/ZkStateReader.java | 2 +-
.../ConcurrentUpdateSolrClientBuilderTest.java | 3 +-
31 files changed, 289 insertions(+), 245 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index e66622d..e0d775e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -47,7 +47,7 @@ public abstract class ElectionContext {
protected void cancelElection() throws InterruptedException, KeeperException {
}
- abstract void runLeaderProcess(ElectionContext context, boolean weAreReplacement, int pauseBeforeStartMs) throws KeeperException, InterruptedException, IOException;
+ abstract boolean runLeaderProcess(ElectionContext context, boolean weAreReplacement, int pauseBeforeStartMs) throws KeeperException, InterruptedException, IOException;
public void checkIfIamLeaderFired() {}
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index 1fd6b69..090dc83 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -25,7 +25,6 @@ import org.apache.solr.common.cloud.SolrZooKeeper;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectReleaseTracker;
-import org.apache.solr.core.SolrCore;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -213,22 +212,15 @@ public class LeaderElector implements Closeable {
}
watcher = new ElectionWatcher(context.leaderSeqPath, watchedNode, context);
- zkClient.exists(watchedNode, watcher);
+ Stat exists = zkClient.exists(watchedNode, watcher);
+ if (exists == null) {
+ state = OUT_OF_ELECTION;
+ return true;
+ }
+
state = WAITING_IN_ELECTION;
if (log.isDebugEnabled()) log.debug("Watching path {} to know if I could be the leader, my node is {}", watchedNode, context.leaderSeqPath);
- if (context instanceof ShardLeaderElectionContext) {
- log.info("Start recovery for core {}", context.leaderProps.getName());
- try (SolrCore core = zkController.getCoreContainer().getCore(context.leaderProps.getName())) {
- if (core != null) {
- // if (!core.getSolrCoreState().isRecoverying()) {
- core.getSolrCoreState().doRecovery(core);
- // }
- } else {
- log.warn("No core found to start recovery with {}", context.leaderProps.getName());
- }
- }
- }
return false;
} catch (KeeperException.SessionExpiredException e) {
state = OUT_OF_ELECTION;
@@ -275,10 +267,13 @@ public class LeaderElector implements Closeable {
throw new IllegalStateException("Already in leader state");
}
- context.runLeaderProcess(context, weAreReplacement,0);
+ boolean success = context.runLeaderProcess(context, weAreReplacement, 0);
-
- state = LEADER;
+ if (success) {
+ state = LEADER;
+ } else {
+ state = OUT_OF_ELECTION;
+ }
}
/**
@@ -436,25 +431,8 @@ public class LeaderElector implements Closeable {
while (tryagain) {
tryagain = checkIfIamLeader(context, replacement);
-
- if (tryagain) {
- try {
- try (SolrCore core = zkController.getCoreContainer().getCore(context.leaderProps.getName())) {
- if (core != null) {
- if (!core.getSolrCoreState().isRecoverying()) {
- core.getSolrCoreState().doRecovery(core);
- }
- }
- }
- } catch (Exception e) {
- log.error("Exception trying to kick off or check for recovery", e);
- }
-
- }
-
}
-
// boolean tryagain = false;
// while (tryagain) {
// tryagain = checkIfIamLeader(context, replacement);
@@ -572,6 +550,7 @@ public class LeaderElector implements Closeable {
}
}
}
+ // we don't kick off recovery here, the leader sync will do that if necessary for its replicas
} catch (AlreadyClosedException | InterruptedException e) {
log.info("Already shutting down");
return;
@@ -620,4 +599,7 @@ public class LeaderElector implements Closeable {
joinElection(true, joinAtHead);
}
+ public boolean isLeader() {
+ return LEADER.equals(state);
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
index db607f0..43feae3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
@@ -50,13 +50,13 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
}
@Override
- void runLeaderProcess(ElectionContext context, boolean weAreReplacement, int pauseBeforeStartMs) throws KeeperException,
+ boolean runLeaderProcess(ElectionContext context, boolean weAreReplacement, int pauseBeforeStartMs) throws KeeperException,
InterruptedException, IOException {
log.info("Running the leader process for Overseer");
if (overseer.isDone()) {
log.info("Already closed, bailing ...");
- return;
+ return false;
}
// TODO: the idea here is that we could clear the Overseer queue
@@ -87,6 +87,7 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
log.info("Will not start Overseer because we are closed");
}
+ return true;
}
public Overseer getOverseer() {
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index c5406a8..2ecf60b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -56,6 +56,7 @@ import org.apache.solr.util.plugin.NamedListInitializedPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTIONS_ZKNODE;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@@ -180,27 +181,22 @@ public class RecoveryStrategy implements Runnable, Closeable {
final public void close() {
close = true;
- //
- //
- // try {
- // if (prevSendPreRecoveryHttpUriRequest != null) {
- // prevSendPreRecoveryHttpUriRequest.cancel();
- // }
- // prevSendPreRecoveryHttpUriRequest = null;
- // } catch (NullPointerException e) {
- // // expected
- // }
- //
- //
+ try {
+ if (prevSendPreRecoveryHttpUriRequest != null) {
+ prevSendPreRecoveryHttpUriRequest.cancel();
+ }
+ prevSendPreRecoveryHttpUriRequest = null;
+ } catch (NullPointerException e) {
+ // expected
+ }
+
ReplicationHandler finalReplicationHandler = replicationHandler;
if (finalReplicationHandler != null) {
finalReplicationHandler.abortFetch();
}
if (latch != null) {
-
latch.countDown();
-
}
log.warn("Stopping recovery for core=[{}]", coreName);
@@ -345,7 +341,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
try {
try {
Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 1500);
- if (leader != null && leader.getName().equals(coreName)) {
+ if (leader != null && leader.getName().equals(coreName) && zkController.getZkClient().exists(COLLECTIONS_ZKNODE + "/" + coreDescriptor.getCollectionName() + "/leaders/" + coreDescriptor.getCloudDescriptor().getShardId() + "/leader")) {
log.info("We are the leader, STOP recovery");
close = true;
return;
@@ -356,10 +352,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
return;
}
} catch (InterruptedException e) {
- log.info("InterruptedException, won't do recovery", e);
+ log.info("InterruptedException", e);
throw new SolrException(ErrorCode.BAD_REQUEST, e);
} catch (TimeoutException e) {
- log.info("Timeout waiting for leader, won't do recovery", e);
+ log.info("Timeout waiting for leader", e);
+ continue;
// throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
@@ -604,8 +601,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
// } catch (NullPointerException e) {
// // okay
// }
-
- // sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getName(), slice);
+ // TODO can we do this with commit on leader
+ sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getName(), zkStateReader.getClusterState().getCollection(coreDescriptor.getCollectionName()).getSlice(cloudDesc.getShardId()));
// we wait a bit so that any updates on the leader
// that started before they saw recovering state
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index f02e7d3..8256084 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -93,7 +93,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
* weAreReplacement: has someone else been the leader already?
*/
@Override
- void runLeaderProcess(ElectionContext context, boolean weAreReplacement, int pauseBeforeStart) throws KeeperException,
+ boolean runLeaderProcess(ElectionContext context, boolean weAreReplacement, int pauseBeforeStart) throws KeeperException,
InterruptedException, IOException {
String coreName = leaderProps.getName();
@@ -106,7 +106,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
if (core.isClosing() || core.getCoreContainer().isShutDown()) {
log.info("We are closed, will not become leader");
- return;
+ return false;
}
try {
ActionThrottle lt;
@@ -138,7 +138,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
if (zkShardTerms.registered(coreName) && !zkShardTerms.canBecomeLeader(coreName)) {
if (!waitForEligibleBecomeLeaderAfterTimeout(zkShardTerms, coreName, leaderVoteWait)) {
rejoinLeaderElection(core);
- return;
+ return false;
} else {
// only log an error if this replica win the election
setTermToMax = true;
@@ -150,7 +150,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
PeerSync.PeerSyncResult result = null;
boolean success = false;
if (core.getCoreContainer().isShutDown()) {
- return;
+ return false;
}
result = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement);
log.info("Sync strategy sync result {}", result);
@@ -177,7 +177,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
if (result.getOtherHasVersions().orElse(false)) {
log.info("We failed sync, but we have no versions - we can't sync in that case. But others have some versions, so we should not become leader");
rejoinLeaderElection(core);
- return;
+ return false;
} else {
log.info("We failed sync, but we have no versions - we can't sync in that case - we did not find versions on other replicas, so become leader anyway");
success = true;
@@ -204,7 +204,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
if (!success) {
log.info("Sync with potential leader failed, rejoining election ...");
rejoinLeaderElection(core);
- return;
+ return false;
}
if (replicaType == Replica.Type.TLOG) {
@@ -250,13 +250,16 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
// we could not publish ourselves as leader - try and rejoin election
rejoinLeaderElection(core);
+ return false;
}
+
} catch (AlreadyClosedException e) {
log.info("CoreContainer is shutting down, won't become leader");
} finally {
MDCLoggingContext.clear();
}
+ return true;
}
/**
@@ -316,8 +319,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
log.info("There may be a better leader candidate than us - will cancel election, rejoin election, and kick off recovery");
leaderElector.retryElection(false);
-
- core.getUpdateHandler().getSolrCoreState().doRecovery(core);
}
public String getShardId() {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index aa5cc80..b11c180 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -138,7 +138,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
}
@Override
- synchronized void runLeaderProcess(ElectionContext context, boolean weAreReplacement, int pauseBeforeStartMs)
+ synchronized boolean runLeaderProcess(ElectionContext context, boolean weAreReplacement, int pauseBeforeStartMs)
throws KeeperException, InterruptedException, IOException {
// register as leader - if an ephemeral is already there, wait to see if it goes away
@@ -188,7 +188,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
ParWork.propagateInterrupt(t);
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed: " + errors, t);
}
-
+ return true;
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 2b83a81..341ffa3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
-import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
@@ -41,6 +40,7 @@ public class StatePublisher implements Closeable {
.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String,String> stateCache = new ConcurrentHashMap<>(32, 0.75f, 4);
+ private final ZkStateReader zkStateReader;
public static class NoOpMessage extends ZkNodeProps {
}
@@ -131,8 +131,9 @@ public class StatePublisher implements Closeable {
}
}
- public StatePublisher(ZkDistributedQueue overseerJobQueue) {
+ public StatePublisher(ZkDistributedQueue overseerJobQueue, ZkStateReader zkStateReader) {
this.overseerJobQueue = overseerJobQueue;
+ this.zkStateReader = zkStateReader;
}
public void submitState(ZkNodeProps stateMessage) {
@@ -142,15 +143,16 @@ public class StatePublisher implements Closeable {
if (operation.equals("state")) {
String core = stateMessage.getStr(ZkStateReader.CORE_NAME_PROP);
String state = stateMessage.getStr(ZkStateReader.STATE_PROP);
+ String collection = stateMessage.getStr(ZkStateReader.COLLECTION_PROP);
-
+ Replica replica = zkStateReader.getClusterState().getCollection(collection).getReplica(core);
String lastState = stateCache.get(core);
// nocommit
-// if (state.equals(lastState) && !Replica.State.ACTIVE.toString().toLowerCase(Locale.ROOT).equals(state)) {
-// log.info("Skipping publish state as {} for {}, because it was the last state published", state, core);
-// // nocommit
-// return;
-// }
+ if (collection != null && replica != null && state.equals(lastState) && replica.getState().toString().equals(state)) {
+ log.info("Skipping publish state as {} for {}, because it was the last state published", state, core);
+ // nocommit
+ return;
+ }
if (core == null || state == null) {
log.error("Nulls in published state");
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Nulls in published state " + stateMessage);
@@ -158,7 +160,7 @@ public class StatePublisher implements Closeable {
stateCache.put(core, state);
} else if (operation.equalsIgnoreCase("downnode")) {
- // nocommit - set all statecache entries for replica to DOWN
+ // set all statecache entries for replica to DOWN
} else {
throw new IllegalArgumentException(stateMessage.toString());
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 6048255..f1c7416 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -430,8 +430,6 @@ public class ZkController implements Closeable, Runnable {
this.overseerJobQueue = overseer.getStateUpdateQueue();
this.overseerCollectionQueue = overseer.getCollectionQueue(zkClient);
this.overseerConfigSetQueue = overseer.getConfigSetQueue(zkClient);
- statePublisher = new StatePublisher(overseerJobQueue);
- statePublisher.start();
boolean isRegistered = SolrLifcycleListener.isRegistered(this);
if (!isRegistered) {
@@ -1156,6 +1154,8 @@ public class ZkController implements Closeable, Runnable {
zkStateReader.createClusterStateWatchersAndUpdate();
+ statePublisher = new StatePublisher(overseerJobQueue, zkStateReader);
+ statePublisher.start();
this.sysPropsCacher = new NodesSysPropsCacher(getSolrCloudManager().getNodeStateProvider(), getNodeName(), zkStateReader);
overseerElector = new LeaderElector(this, new ContextKey("overseer", "overseer"));
@@ -1411,13 +1411,21 @@ public class ZkController implements Closeable, Runnable {
if (log.isDebugEnabled()) log.debug("Wait to see leader for {}, {}", collection, shardId);
Replica leader = null;
for (int i = 0; i < 30; i++) {
+// if (leaderElector.isLeader()) {
+// leader = replica;
+// break;
+// }
+
try {
if (getCoreContainer().isShutDown() || isDcCalled() || isClosed()) {
throw new AlreadyClosedException();
}
leader = zkStateReader.getLeaderRetry(collection, shardId, 500);
- break;
+
+ if (zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + shardId + "/leader")) {
+ break;
+ }
} catch (TimeoutException timeoutException) {
}
@@ -1493,13 +1501,14 @@ public class ZkController implements Closeable, Runnable {
}
}
- desc.getCloudDescriptor().setHasRegistered(true);
-
// the watcher is added to a set so multiple calls of this method will left only one watcher
// nocommit
registerUnloadWatcher(cloudDesc.getCollectionName(), cloudDesc.getShardId(), desc.getName());
log.info("SolrCore Registered, core{} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
+
+ desc.getCloudDescriptor().setHasRegistered(true);
+
return shardId;
} finally {
if (isDcCalled() || isClosed()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index 5851483..d9eb9a3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -76,8 +76,6 @@ public class ZkShardTerms implements Closeable {
private final Set<CoreTermWatcher> listeners = ConcurrentHashMap.newKeySet();
private final AtomicBoolean isClosed = new AtomicBoolean(false);
- private final Object termUpdate = new Object();
-
private final AtomicReference<ShardTerms> terms = new AtomicReference<>();
/**
@@ -326,10 +324,7 @@ public class ZkShardTerms implements Closeable {
*/
private boolean saveTerms(ShardTerms newTerms) throws KeeperException, InterruptedException {
byte[] znodeData = Utils.toJSON(newTerms);
- ShardTerms terms = this.terms.get();
- int version = 0;
try {
- version = newTerms.getVersion();
Stat stat = zkClient.setData(znodePath, znodeData, newTerms.getVersion(), true);
ShardTerms newShardTerms = new ShardTerms(newTerms, stat.getVersion());
setNewTerms(newShardTerms);
@@ -341,9 +336,6 @@ public class ZkShardTerms implements Closeable {
if (isClosed.get()) {
throw new AlreadyClosedException();
}
- synchronized (termUpdate) {
- termUpdate.wait(250);
- }
refreshTerms(false);
}
@@ -430,8 +422,5 @@ public class ZkShardTerms implements Closeable {
private void onTermUpdates(ShardTerms newTerms) {
listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(newTerms));
- synchronized (termUpdate) {
- termUpdate.notifyAll();
- }
}
}
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index cba6827..f25ea91 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -32,7 +32,6 @@ import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.SolrHttpClientBuilder;
import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder;
-import org.apache.solr.client.solrj.impl.XMLResponseParser;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
import org.apache.solr.cloud.CloudDescriptor;
@@ -89,7 +88,6 @@ import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.pkg.PackageLoader;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.request.SolrRequestInfo;
-import org.apache.solr.rest.schema.FieldTypeXmlAdapter;
import org.apache.solr.search.SolrFieldCacheBean;
import org.apache.solr.security.AuditLoggerPlugin;
import org.apache.solr.security.AuthenticationPlugin;
@@ -149,6 +147,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
/**
@@ -918,7 +917,37 @@ public class CoreContainer implements Closeable {
}
}
if (zkSys != null && zkSys.getZkController() != null) {
- zkSys.getZkController().createEphemeralLiveNode();
+ ParWork.getRootSharedExecutor().submit(() -> {
+// Collection<SolrCore> cores = getCores();
+// for (SolrCore core : cores) {
+// CoreDescriptor desc = core.getCoreDescriptor();
+// String collection = desc.getCollectionName();
+// try {
+// zkSys.getZkController().zkStateReader.waitForState(collection, 10, TimeUnit.SECONDS, (n, c) -> {
+// if (c != null) {
+// List<Replica> replicas = c.getReplicas();
+// for (Replica replica : replicas) {
+// if (replica.getNodeName().equals(zkSys.getZkController().getNodeName())) {
+// if (!replica.getState().equals(Replica.State.DOWN)) {
+// //log.info("Found state {} {}", replica.getState(), replica.getNodeName());
+// return false;
+// }
+// }
+// }
+// }
+// return true;
+// });
+// } catch (InterruptedException e) {
+// ParWork.propagateInterrupt(e);
+// return;
+// } catch (TimeoutException e) {
+// log.error("Timeout", e);
+// }
+// }
+
+ zkSys.getZkController().createEphemeralLiveNode();
+ });
+
}
} finally {
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 8d06233..e363319 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -717,6 +717,12 @@ public final class SolrCore implements SolrInfoBean, Closeable {
if (coreContainer.isShutDown() || isClosed() || closing) {
throw new AlreadyClosedException();
}
+ ReentrantLock schemaLock = null;
+ if (schema instanceof ManagedIndexSchema) {
+ schemaLock = ((ManagedIndexSchema) schema).getSchemaLock();
+ schemaLock.lock();
+ }
+
// only one reload at a time
ReentrantLock lock = getUpdateHandler().getSolrCoreState().getReloadLock();
boolean locked = lock.tryLock();
@@ -802,6 +808,9 @@ public final class SolrCore implements SolrInfoBean, Closeable {
if (lock != null && lock.isHeldByCurrentThread()) {
lock.unlock();
}
+ if (schemaLock != null && schemaLock.isHeldByCurrentThread()) {
+ schemaLock.unlock();
+ }
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index be65b25..ab4821b 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -619,7 +619,7 @@ public class IndexFetcher {
if (successfulInstall) deleteTmpIdxDir = false;
} else {
terminateAndWaitFsyncService();
- moveIndexFiles(tmpIndexDir, indexDir);
+ successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}
if (successfulInstall) {
if (isFullCopyNeeded) {
@@ -649,7 +649,7 @@ public class IndexFetcher {
if (successfulInstall) deleteTmpIdxDir = false;
} else if (successfulInstall) {
terminateAndWaitFsyncService();
- moveIndexFiles(tmpIndexDir, indexDir);
+ successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
if (!successfulInstall) {
log.error("Move index files failed");
diff --git a/solr/core/src/java/org/apache/solr/handler/SchemaHandler.java b/solr/core/src/java/org/apache/solr/handler/SchemaHandler.java
index 3cde92d..df71dc9 100644
--- a/solr/core/src/java/org/apache/solr/handler/SchemaHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/SchemaHandler.java
@@ -170,7 +170,7 @@ public class SchemaHandler extends RequestHandlerBase implements SolrCoreAware,
log.info("REFRESHING SCHEMA (refreshIfBelowVersion={}, currentVersion={}) before returning version!"
, refreshIfBelowVersion, zkVersion);
ZkIndexSchemaReader zkIndexSchemaReader = managed.getManagedIndexSchemaFactory().getZkIndexSchemaReader();
- managed = (ManagedIndexSchema) zkIndexSchemaReader.updateSchema(null, -1, req.getCore());
+ managed = (ManagedIndexSchema) zkIndexSchemaReader.updateSchema();
zkVersion = managed.getSchemaZkVersion();
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index dadaa8b..b31795a 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -68,7 +68,7 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
AtomicReference<String> errorMessage = new AtomicReference<>();
try {
- coreContainer.getZkController().getZkStateReader().waitForState(collection, 15, TimeUnit.SECONDS, (n, c) -> {
+ coreContainer.getZkController().getZkStateReader().waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
if (c == null) {
log.info("collection not found {}", collection);
return false;
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index 160e022..a7d71d1 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -1144,7 +1144,7 @@ public class RealTimeGetComponent extends SearchComponent
// TODO: more complex response?
rb.rsp.add("sync", success);
- if (!success && rb.req.getCore().getCoreContainer().isZooKeeperAware()) {
+ if (!success && rb.req.getCore().getCoreContainer().isZooKeeperAware() && !rb.req.getCore().getSolrCoreState().isRecoverying()) {
rb.req.getCore().getSolrCoreState().doRecovery(rb.req.getCore().getCoreContainer(), rb.req.getCore().getCoreDescriptor());
}
} catch (IOException e) {
diff --git a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
index f785eb2..e8c9642 100644
--- a/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
+++ b/solr/core/src/java/org/apache/solr/handler/loader/JavabinLoader.java
@@ -42,7 +42,6 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.processor.UpdateRequestProcessor;
@@ -84,16 +83,15 @@ public class JavabinLoader extends ContentStreamLoader {
}
UpdateRequest update = null;
JavaBinUpdateRequestCodec.StreamingUpdateHandler handler = new JavaBinUpdateRequestCodec.StreamingUpdateHandler() {
- private AddUpdateCommand addCmd = null;
@Override
public void update(SolrInputDocument document, UpdateRequest updateRequest, Integer commitWithin, Boolean overwrite) {
if (document == null) {
return;
}
- if (addCmd == null) {
- addCmd = getAddCommand(req, updateRequest.getParams());
- }
+
+ AddUpdateCommand addCmd = getAddCommand(req, updateRequest.getParams());
+
addCmd.solrDoc = document;
if (commitWithin != null) {
addCmd.commitWithin = commitWithin;
@@ -109,7 +107,6 @@ public class JavabinLoader extends ContentStreamLoader {
try {
processor.processAdd(addCmd);
- addCmd.clear();
} catch (ZooKeeperException | IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ERROR adding document " + addCmd.getPrintableId(), e);
}
diff --git a/solr/core/src/java/org/apache/solr/request/SolrQueryRequest.java b/solr/core/src/java/org/apache/solr/request/SolrQueryRequest.java
index 72ee023..2c5090b 100644
--- a/solr/core/src/java/org/apache/solr/request/SolrQueryRequest.java
+++ b/solr/core/src/java/org/apache/solr/request/SolrQueryRequest.java
@@ -86,8 +86,6 @@ public interface SolrQueryRequest extends AutoCloseable {
public IndexSchema getSchema();
/** Replaces the current schema snapshot with the latest from the core. */
- public void updateSchemaToLatest(IndexSchema schema);
-
public void updateSchemaToLatest();
/**
diff --git a/solr/core/src/java/org/apache/solr/request/SolrQueryRequestBase.java b/solr/core/src/java/org/apache/solr/request/SolrQueryRequestBase.java
index 81d022a..c461af3 100644
--- a/solr/core/src/java/org/apache/solr/request/SolrQueryRequestBase.java
+++ b/solr/core/src/java/org/apache/solr/request/SolrQueryRequestBase.java
@@ -139,12 +139,6 @@ public abstract class SolrQueryRequestBase implements SolrQueryRequest, Closeabl
}
@Override
- public void updateSchemaToLatest(IndexSchema schema) {
- this.schema = schema;
- //this.core.setLatestSchema(schema);
- }
-
- @Override
public void updateSchemaToLatest() {
this.schema = core.getLatestSchema();
}
diff --git a/solr/core/src/java/org/apache/solr/schema/IndexSchema.java b/solr/core/src/java/org/apache/solr/schema/IndexSchema.java
index 93796b9..a54a6b6 100644
--- a/solr/core/src/java/org/apache/solr/schema/IndexSchema.java
+++ b/solr/core/src/java/org/apache/solr/schema/IndexSchema.java
@@ -758,8 +758,9 @@ public class IndexSchema {
// in DocumentBuilder.getDoc()
requiredFields.addAll(fieldsWithDefaultValue);
-
- Collections.sort(dynamicFields);
+ synchronized (dynamicFields) {
+ Collections.sort(dynamicFields);
+ }
return explicitRequiredProp;
}
@@ -866,7 +867,9 @@ public class IndexSchema {
addDynamicFieldNoDupCheck(dynFields, field);
}
}
- dynamicFields.addAll(dynFields);
+ synchronized (dynamicFields) {
+ dynamicFields.addAll(dynFields);
+ }
}
private void addDynamicFieldNoDupCheck(List<DynamicField> dFields, SchemaField f) {
@@ -926,28 +929,30 @@ public class IndexSchema {
if (null == destSchemaField || (null == sourceSchemaField && ! sourceIsExplicitFieldGlob)) {
// Go through dynamicFields array only once, collecting info for both source and dest fields, if needed
- for (DynamicField dynamicField : dynamicFields) {
- if (null == sourceSchemaField && ! sourceIsDynamicFieldReference && ! sourceIsExplicitFieldGlob) {
- if (dynamicField.matches(source)) {
- sourceIsDynamicFieldReference = true;
- if ( ! source.equals(dynamicField.getRegex())) {
- sourceDynamicBase = dynamicField;
+ synchronized (dynamicFields) {
+ for (DynamicField dynamicField : dynamicFields) {
+ if (null == sourceSchemaField && !sourceIsDynamicFieldReference && !sourceIsExplicitFieldGlob) {
+ if (dynamicField.matches(source)) {
+ sourceIsDynamicFieldReference = true;
+ if (!source.equals(dynamicField.getRegex())) {
+ sourceDynamicBase = dynamicField;
+ }
}
}
- }
- if (null == destSchemaField) {
- if (dest.equals(dynamicField.getRegex())) {
- destDynamicField = dynamicField;
- destSchemaField = dynamicField.prototype;
- } else if (dynamicField.matches(dest)) {
- destSchemaField = dynamicField.makeSchemaField(dest);
- destDynamicField = new DynamicField(destSchemaField);
- destDynamicBase = dynamicField;
+ if (null == destSchemaField) {
+ if (dest.equals(dynamicField.getRegex())) {
+ destDynamicField = dynamicField;
+ destSchemaField = dynamicField.prototype;
+ } else if (dynamicField.matches(dest)) {
+ destSchemaField = dynamicField.makeSchemaField(dest);
+ destDynamicField = new DynamicField(destSchemaField);
+ destDynamicBase = dynamicField;
+ }
+ }
+ if (null != destSchemaField
+ && (null != sourceSchemaField || sourceIsDynamicFieldReference || sourceIsExplicitFieldGlob)) {
+ break;
}
- }
- if (null != destSchemaField
- && (null != sourceSchemaField || sourceIsDynamicFieldReference || sourceIsExplicitFieldGlob)) {
- break;
}
}
}
@@ -1186,17 +1191,21 @@ public class IndexSchema {
public SchemaField[] getDynamicFieldPrototypes() {
SchemaField[] df = new SchemaField[dynamicFields.size()];
int[] cnt = new int[]{0};
- dynamicFields.forEach(dynamicField -> {
+ synchronized (dynamicFields) {
+ dynamicFields.forEach(dynamicField -> {
df[cnt[0]] = dynamicFields.get(cnt[0]++).prototype;
- });
+ });
+ }
return df;
}
public String getDynamicPattern(String fieldName) {
- for (DynamicField df : dynamicFields) {
- if (df.matches(fieldName)) return df.getRegex();
- }
- return null;
+ synchronized (dynamicFields) {
+ for (DynamicField df : dynamicFields) {
+ if (df.matches(fieldName)) return df.getRegex();
+ }
+ return null;
+ }
}
/**
@@ -1209,9 +1218,10 @@ public class IndexSchema {
if (fields.containsKey(fieldName)) {
return true;
}
-
- for (DynamicField df : dynamicFields) {
- if (fieldName.equals(df.getRegex())) return true;
+ synchronized (dynamicFields) {
+ for (DynamicField df : dynamicFields) {
+ if (fieldName.equals(df.getRegex())) return true;
+ }
}
return false;
@@ -1225,9 +1235,10 @@ public class IndexSchema {
if(fields.containsKey(fieldName)) {
return false;
}
-
- for (DynamicField df : dynamicFields) {
- if (df.matches(fieldName)) return true;
+ synchronized (dynamicFields) {
+ for (DynamicField df : dynamicFields) {
+ if (df.matches(fieldName)) return true;
+ }
}
return false;
@@ -1248,11 +1259,12 @@ public class IndexSchema {
if (f != null) return f;
f = dynamicFieldCache.get(fieldName);
if (f != null) return f;
-
- for (DynamicField df : dynamicFields) {
- if (df.matches(fieldName)) {
- dynamicFieldCache.put(fieldName, f = df.makeSchemaField(fieldName));
- break;
+ synchronized (dynamicFields) {
+ for (DynamicField df : dynamicFields) {
+ if (df.matches(fieldName)) {
+ dynamicFieldCache.put(fieldName, f = df.makeSchemaField(fieldName));
+ break;
+ }
}
}
@@ -1345,15 +1357,19 @@ public class IndexSchema {
* @see #getFieldTypeNoEx
*/
public FieldType getDynamicFieldType(String fieldName) {
- for (DynamicField df : dynamicFields) {
- if (df.matches(fieldName)) return df.prototype.getType();
+ synchronized (dynamicFields) {
+ for (DynamicField df : dynamicFields) {
+ if (df.matches(fieldName)) return df.prototype.getType();
+ }
}
throw new SolrException(ErrorCode.BAD_REQUEST,"undefined field "+fieldName);
}
private FieldType dynFieldType(String fieldName) {
- for (DynamicField df : dynamicFields) {
- if (df.matches(fieldName)) return df.prototype.getType();
+ synchronized (dynamicFields) {
+ for (DynamicField df : dynamicFields) {
+ if (df.matches(fieldName)) return df.prototype.getType();
+ }
}
return null;
}
diff --git a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
index bb4d3c1..bd437b8 100644
--- a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
+++ b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchema.java
@@ -77,6 +77,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
/** Solr-managed schema - non-user-editable, but can be mutable via internal and external REST API requests. */
public final class ManagedIndexSchema extends IndexSchema {
@@ -326,7 +327,11 @@ public final class ManagedIndexSchema extends IndexSchema {
return activeReplicaCoreUrls;
}
- private static class GetZkSchemaVersionCallable extends SolrRequest implements Callable<Integer> {
+ public ReentrantLock getSchemaLock() {
+ return managedIndexSchemaFactory.getSchemaUpdateLock();
+ }
+
+ private static class GetZkSchemaVersionCallable extends SolrRequest implements Callable<Integer> {
private final ConnectionManager.IsClosed isClosed;
private final Http2SolrClient solrClient;
@@ -628,7 +633,9 @@ public final class ManagedIndexSchema extends IndexSchema {
throw new FieldExistsException(ErrorCode.BAD_REQUEST, msg);
}
dFields.add(new DynamicField(newDynamicField));
- newSchema.dynamicFields.addAll(dFields);
+ synchronized (dynamicFields) {
+ newSchema.dynamicFields.addAll(dFields);
+ }
Collection<String> copyFields = copyFieldNames.get(newDynamicField.getName());
if (copyFields != null) {
@@ -671,51 +678,52 @@ public final class ManagedIndexSchema extends IndexSchema {
List<DynamicCopy> dynamicCopyFieldsToRebuild = new ArrayList<>();
List<DynamicCopy> newDynamicCopyFields = new ArrayList<>();
- for (String fieldNamePattern : fieldNamePatterns) {
- DynamicField dynamicField = null;
- int dfPos = 0;
- for ( ; dfPos < newSchema.dynamicFields.size() ; ++dfPos) {
- DynamicField df = newSchema.dynamicFields.get(dfPos);
- if (df.getRegex().equals(fieldNamePattern)) {
- dynamicField = df;
- break;
+ synchronized (dynamicFields) {
+ for (String fieldNamePattern : fieldNamePatterns) {
+ DynamicField dynamicField = null;
+ int dfPos = 0;
+ for (; dfPos < newSchema.dynamicFields.size(); ++dfPos) {
+ DynamicField df = newSchema.dynamicFields.get(dfPos);
+ if (df.getRegex().equals(fieldNamePattern)) {
+ dynamicField = df;
+ break;
+ }
}
- }
- if (null == dynamicField) {
- String msg = "The dynamic field '" + fieldNamePattern
- + "' is not present in this schema, and so cannot be deleted.";
- throw new SolrException(ErrorCode.BAD_REQUEST, msg);
- }
- for (int i = 0 ; i < newSchema.dynamicCopyFields.length ; ++i) {
- DynamicCopy dynamicCopy = newSchema.dynamicCopyFields[i];
- DynamicField destDynamicBase = dynamicCopy.getDestDynamicBase();
- DynamicField sourceDynamicBase = dynamicCopy.getSourceDynamicBase();
- if ((null != destDynamicBase && fieldNamePattern.equals(destDynamicBase.getRegex()))
- || (null != sourceDynamicBase && fieldNamePattern.equals(sourceDynamicBase.getRegex()))
- || dynamicField.matches(dynamicCopy.getRegex())
- || dynamicField.matches(dynamicCopy.getDestFieldName())) {
- dynamicCopyFieldsToRebuild.add(dynamicCopy);
- newSchema.decrementCopyFieldTargetCount(dynamicCopy.getDestination().getPrototype());
- // don't add this dynamic copy field to newDynamicCopyFields - effectively removing it
+ if (null == dynamicField) {
+ String msg = "The dynamic field '" + fieldNamePattern
+ + "' is not present in this schema, and so cannot be deleted.";
+ throw new SolrException(ErrorCode.BAD_REQUEST, msg);
+ }
+ for (int i = 0; i < newSchema.dynamicCopyFields.length; ++i) {
+ DynamicCopy dynamicCopy = newSchema.dynamicCopyFields[i];
+ DynamicField destDynamicBase = dynamicCopy.getDestDynamicBase();
+ DynamicField sourceDynamicBase = dynamicCopy.getSourceDynamicBase();
+ if ((null != destDynamicBase && fieldNamePattern.equals(destDynamicBase.getRegex()))
+ || (null != sourceDynamicBase && fieldNamePattern.equals(sourceDynamicBase.getRegex()))
+ || dynamicField.matches(dynamicCopy.getRegex())
+ || dynamicField.matches(dynamicCopy.getDestFieldName())) {
+ dynamicCopyFieldsToRebuild.add(dynamicCopy);
+ newSchema.decrementCopyFieldTargetCount(dynamicCopy.getDestination().getPrototype());
+ // don't add this dynamic copy field to newDynamicCopyFields - effectively removing it
+ } else {
+ newDynamicCopyFields.add(dynamicCopy);
+ }
+ }
+ if (newSchema.dynamicFields.size() > 1) {
+ newSchema.dynamicFields.remove(dfPos);
} else {
- newDynamicCopyFields.add(dynamicCopy);
+ newSchema.dynamicFields.clear();
}
}
- if (newSchema.dynamicFields.size() > 1) {
- newSchema.dynamicFields.remove(dfPos);
- } else {
- newSchema.dynamicFields.clear();
- }
- }
- // After removing all dynamic fields, rebuild affected dynamic copy fields.
- // This may trigger an exception, if one of the deleted dynamic fields was the only matching source or target.
- if (dynamicCopyFieldsToRebuild.size() > 0) {
- newSchema.dynamicCopyFields = newDynamicCopyFields.toArray(new DynamicCopy[newDynamicCopyFields.size()]);
- for (DynamicCopy dynamicCopy : dynamicCopyFieldsToRebuild) {
- newSchema.registerCopyField(dynamicCopy.getRegex(), dynamicCopy.getDestFieldName(), dynamicCopy.getMaxChars());
+ // After removing all dynamic fields, rebuild affected dynamic copy fields.
+ // This may trigger an exception, if one of the deleted dynamic fields was the only matching source or target.
+ if (dynamicCopyFieldsToRebuild.size() > 0) {
+ newSchema.dynamicCopyFields = newDynamicCopyFields.toArray(new DynamicCopy[newDynamicCopyFields.size()]);
+ for (DynamicCopy dynamicCopy : dynamicCopyFieldsToRebuild) {
+ newSchema.registerCopyField(dynamicCopy.getRegex(), dynamicCopy.getDestFieldName(), dynamicCopy.getMaxChars());
+ }
}
}
-
newSchema.postReadInform();
newSchema.refreshAnalyzers();
} else {
@@ -723,6 +731,7 @@ public final class ManagedIndexSchema extends IndexSchema {
log.error(msg);
throw new SolrException(ErrorCode.SERVER_ERROR, msg);
}
+
return newSchema;
}
@@ -733,11 +742,13 @@ public final class ManagedIndexSchema extends IndexSchema {
if (isMutable) {
DynamicField oldDynamicField = null;
int dfPos = 0;
- for ( ; dfPos < dynamicFields.size() ; ++dfPos) {
- DynamicField dynamicField = dynamicFields.get(dfPos);
- if (dynamicField.getRegex().equals(fieldNamePattern)) {
- oldDynamicField = dynamicField;
- break;
+ synchronized (dynamicFields) {
+ for (; dfPos < dynamicFields.size(); ++dfPos) {
+ DynamicField dynamicField = dynamicFields.get(dfPos);
+ if (dynamicField.getRegex().equals(fieldNamePattern)) {
+ oldDynamicField = dynamicField;
+ break;
+ }
}
}
if (null == oldDynamicField) {
@@ -748,10 +759,12 @@ public final class ManagedIndexSchema extends IndexSchema {
newSchema = shallowCopy(true);
- Iterator<DynamicField> it = newSchema.dynamicFields.iterator();
- while (it.hasNext()) {
- if (it.next().getRegex().equals(fieldNamePattern)) {
- it.remove();
+ synchronized (dynamicFields) {
+ Iterator<DynamicField> it = newSchema.dynamicFields.iterator();
+ while (it.hasNext()) {
+ if (it.next().getRegex().equals(fieldNamePattern)) {
+ it.remove();
+ }
}
}
@@ -1053,10 +1066,12 @@ public final class ManagedIndexSchema extends IndexSchema {
+ "' because it's the field type of field '" + field.getName() + "'.");
}
}
- for (DynamicField dynamicField : dynamicFields) {
- if (dynamicField.getPrototype().getType().getTypeName().equals(name)) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Can't delete '" + name
- + "' because it's the field type of dynamic field '" + dynamicField.getRegex() + "'.");
+ synchronized (dynamicFields) {
+ for (DynamicField dynamicField : dynamicFields) {
+ if (dynamicField.getPrototype().getType().getTypeName().equals(name)) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Can't delete '" + name
+ + "' because it's the field type of dynamic field '" + dynamicField.getRegex() + "'.");
+ }
}
}
}
@@ -1158,10 +1173,12 @@ public final class ManagedIndexSchema extends IndexSchema {
}
}
// Rebuild dynamic fields of the type being replaced
- for (int i = 0; i < newSchema.dynamicFields.size(); ++i) {
- SchemaField prototype = newSchema.dynamicFields.get(i).getPrototype();
- if (typeName.equals(prototype.getType().getTypeName())) {
- newSchema.dynamicFields.set(i, new DynamicField(SchemaField.create(prototype.getName(), replacementFieldType, prototype.getArgs())));
+ synchronized (dynamicFields) {
+ for (int i = 0; i < newSchema.dynamicFields.size(); ++i) {
+ SchemaField prototype = newSchema.dynamicFields.get(i).getPrototype();
+ if (typeName.equals(prototype.getType().getTypeName())) {
+ newSchema.dynamicFields.set(i, new DynamicField(SchemaField.create(prototype.getName(), replacementFieldType, prototype.getArgs())));
+ }
}
}
// Find dynamic copy fields where the destination field's type is being replaced
@@ -1387,7 +1404,7 @@ public final class ManagedIndexSchema extends IndexSchema {
this.managedSchemaResourceName = managedSchemaResourceName;
this.schemaZkVersion = schemaZkVersion;
this.collection = collection;
- log.info("Copy to new ManagedIndexSchemaFactory with version {}", schemaZkVersion);
+ if (log.isDebugEnabled()) log.debug("Copy to new ManagedIndexSchemaFactory with version {}", schemaZkVersion);
}
/**
diff --git a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
index fed467a..9946cc5 100644
--- a/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
+++ b/solr/core/src/java/org/apache/solr/schema/ManagedIndexSchemaFactory.java
@@ -450,8 +450,10 @@ public class ManagedIndexSchemaFactory extends IndexSchemaFactory implements Sol
}
public void setSchema(ManagedIndexSchema schema) {
- core.setLatestSchema(schema);
- this.schema = schema;
+ if (schema.getSchemaZkVersion() > this.schema.getSchemaZkVersion()) {
+ this.schema = schema;
+ core.setLatestSchema(schema);
+ }
}
public boolean isMutable() {
diff --git a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
index a82f88f..c942a4a 100644
--- a/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
+++ b/solr/core/src/java/org/apache/solr/schema/ZkIndexSchemaReader.java
@@ -80,7 +80,7 @@ public class ZkIndexSchemaReader implements OnReconnect {
}
});
- updateSchema(schemaWatcher, -1, solrCore);
+ updateSchema();
solrCore.getCoreContainer().getZkController().addOnReconnectListener(this);
}
@@ -122,7 +122,7 @@ public class ZkIndexSchemaReader implements OnReconnect {
}
log.info("A schema change: {}, has occurred - updating schema from ZooKeeper ...", event);
try {
- schemaReader.updateSchema(this, -1, null);
+ schemaReader.updateSchema();
} catch (Exception e) {
log.error("", e);
}
@@ -144,7 +144,7 @@ public class ZkIndexSchemaReader implements OnReconnect {
// }
// package visibility for test purposes
- public IndexSchema updateSchema(Watcher watcher, int version, SolrCore core) throws KeeperException, InterruptedException {
+ public IndexSchema updateSchema() throws KeeperException, InterruptedException {
ManagedIndexSchema newSchema;
ReentrantLock lock = getSchemaUpdateLock();
lock.lock();
@@ -159,11 +159,8 @@ public class ZkIndexSchemaReader implements OnReconnect {
int existsVersion = exists.getVersion();
int v;
- if (core != null) {
- v = ((ManagedIndexSchema) core.getLatestSchema()).getSchemaZkVersion();
- } else {
- v = managedIndexSchemaFactory.getSchema().getSchemaZkVersion();
- }
+
+ v = managedIndexSchemaFactory.getSchema().getSchemaZkVersion();
log.info("Retrieved schema version {} from Zookeeper, existing={} schema={}", existsVersion, v, managedIndexSchemaFactory.getSchema());
@@ -181,9 +178,6 @@ public class ZkIndexSchemaReader implements OnReconnect {
newSchema = new ManagedIndexSchema(managedIndexSchemaFactory, collection, managedIndexSchemaFactory.getConfig(), resourceName, inputSource, managedIndexSchemaFactory.isMutable(), resourceName,
stat.getVersion());
managedIndexSchemaFactory.setSchema(newSchema);
- if (core != null) {
- core.setLatestSchema(newSchema);
- }
long stop = System.nanoTime();
log.info("Finished refreshing schema in {} ms", TimeUnit.MILLISECONDS.convert(stop - start, TimeUnit.NANOSECONDS));
@@ -204,7 +198,7 @@ public class ZkIndexSchemaReader implements OnReconnect {
public void command() {
try {
// force update now as the schema may have changed while our zk session was expired
- updateSchema(schemaWatcher, -1, null);
+ updateSchema();
} catch (Exception exc) {
log.error("Failed to update managed-schema watcher after session expiration due to: {}", exc);
}
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index a877eed..475abe4 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -363,7 +363,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
if (!locked) {
recoveryWaiting.incrementAndGet();
if (log.isDebugEnabled()) log.debug("Wait for recovery lock");
-
+ cancelRecovery();
while (!recoveryLock.tryLock(250, TimeUnit.MILLISECONDS)) {
if (closed || prepForClose) {
log.warn("Skipping recovery because we are closed");
@@ -486,7 +486,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
// though
// iwLock.writeLock().lock();
if (recoverying) {
- cancelRecovery();
+ cancelRecovery(false, true);
}
try {
closeIndexWriter(closer);
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateCommand.java b/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
index b124271..a91cf25 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
@@ -18,14 +18,13 @@ package org.apache.solr.update;
import org.apache.solr.request.SolrQueryRequest;
-
/** An index update command encapsulated in an object (Command pattern)
*
*
*/
public abstract class UpdateCommand implements Cloneable {
protected SolrQueryRequest req;
- protected long version;
+ protected volatile long version;
protected String route;
protected int flags;
@@ -62,6 +61,7 @@ public abstract class UpdateCommand implements Cloneable {
public long getVersion() {
return version;
}
+
public void setVersion(long version) {
this.version = version;
}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java
index 3053feb..3a26571 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/AddSchemaFieldsUpdateProcessorFactory.java
@@ -509,7 +509,7 @@ public class AddSchemaFieldsUpdateProcessorFactory extends UpdateRequestProcesso
cmd.getReq().updateSchemaToLatest();
} catch (ManagedIndexSchema.SchemaChangedInZkException e) {
try {
- ((ManagedIndexSchema) cmd.getReq().getSchema()).getManagedIndexSchemaFactory().getZkIndexSchemaReader().updateSchema(null, -1, cmd.getReq().getCore());
+ ((ManagedIndexSchema) cmd.getReq().getSchema()).getManagedIndexSchemaFactory().getZkIndexSchemaReader().updateSchema();
cmd.getReq().updateSchemaToLatest();
log.info("Schema changed while processing request ... current latest version {} try={}", ((ManagedIndexSchema) cmd.getReq().getSchema()).getSchemaZkVersion(), cnt);
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
index 1cf2d3b..abd6edd 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
@@ -102,13 +102,14 @@ public class LeaderElectionTest extends SolrTestCaseJ4 {
}
@Override
- void runLeaderProcess(ElectionContext context, boolean weAreReplacement, int pauseBeforeStartMs)
+ boolean runLeaderProcess(ElectionContext context, boolean weAreReplacement, int pauseBeforeStartMs)
throws KeeperException, InterruptedException, IOException {
super.runLeaderProcess(context, weAreReplacement, pauseBeforeStartMs);
if (runLeaderDelay > 0) {
log.info("Sleeping for {}ms to simulate leadership takeover delay", runLeaderDelay);
Thread.sleep(runLeaderDelay);
}
+ return true;
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/MissingSegmentRecoveryTest.java b/solr/core/src/test/org/apache/solr/cloud/MissingSegmentRecoveryTest.java
index 4e19bf1..c0907f2 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MissingSegmentRecoveryTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MissingSegmentRecoveryTest.java
@@ -112,7 +112,8 @@ public class MissingSegmentRecoveryTest extends SolrCloudTestCase {
JettySolrRunner jetty = cluster.getReplicaJetty(replica);
jetty.stop();
jetty.start();
-
+
+ cluster.waitForActiveCollection(collection, 1, 2);
QueryResponse resp = cluster.getSolrClient().query(collection, new SolrQuery("*:*"));
assertEquals(10, resp.getResults().getNumFound());
}
diff --git a/solr/core/src/test/org/apache/solr/schema/SchemaWatcherTest.java b/solr/core/src/test/org/apache/solr/schema/SchemaWatcherTest.java
index 5f34c45..321f41b 100644
--- a/solr/core/src/test/org/apache/solr/schema/SchemaWatcherTest.java
+++ b/solr/core/src/test/org/apache/solr/schema/SchemaWatcherTest.java
@@ -44,7 +44,7 @@ public class SchemaWatcherTest {
@Test
public void testProcess() throws Exception {
schemaWatcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, "/test"));
- verify(mockSchemaReader).updateSchema(schemaWatcher, -1, null);
+ verify(mockSchemaReader).updateSchema();
}
}
diff --git a/solr/core/src/test/org/apache/solr/schema/TestPointFields.java b/solr/core/src/test/org/apache/solr/schema/TestPointFields.java
index a230461..730f799 100644
--- a/solr/core/src/test/org/apache/solr/schema/TestPointFields.java
+++ b/solr/core/src/test/org/apache/solr/schema/TestPointFields.java
@@ -657,9 +657,11 @@ public class TestPointFields extends SolrTestCaseJ4 {
*/
private static SortedSet<String> dynFieldRegexesForType(final Class<? extends FieldType> clazz) {
SortedSet<String> typesToTest = new TreeSet<>();
- for (DynamicField dynField : h.getCore().getLatestSchema().getDynamicFields()) {
- if (clazz.isInstance(dynField.getPrototype().getType())) {
- typesToTest.add(dynField.getRegex());
+ synchronized (h.getCore().getLatestSchema().getDynamicFields()) {
+ for (DynamicField dynField : h.getCore().getLatestSchema().getDynamicFields()) {
+ if (clazz.isInstance(dynField.getPrototype().getType())) {
+ typesToTest.add(dynField.getRegex());
+ }
}
}
return typesToTest;
@@ -2326,9 +2328,11 @@ public class TestPointFields extends SolrTestCaseJ4 {
}
}
Set<String> typesToTest = new HashSet<>();
- for (DynamicField dynField:h.getCore().getLatestSchema().getDynamicFields()) {
- if (dynField.getPrototype().getType() instanceof PointField) {
- typesToTest.add(dynField.getRegex());
+ synchronized (h.getCore().getLatestSchema().getDynamicFields()) {
+ for (DynamicField dynField : h.getCore().getLatestSchema().getDynamicFields()) {
+ if (dynField.getPrototype().getType() instanceof PointField) {
+ typesToTest.add(dynField.getRegex());
+ }
}
}
assertEquals("Missing types in the test", typesTested, typesToTest);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 78b860a..ff87426 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -2658,7 +2658,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
if (expectedReplicas == 0 && !exact) {
- log.info("0 replicaes expected and found, return");
+ log.info("0 replicas expected and found, return");
return true;
}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientBuilderTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientBuilderTest.java
index 282b88d..59569a0 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientBuilderTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ConcurrentUpdateSolrClientBuilderTest.java
@@ -50,8 +50,9 @@ public class ConcurrentUpdateSolrClientBuilderTest extends SolrTestCase {
* Test that connection timeout information is passed to the HttpSolrClient that handles non add operations.
*/
@Test(timeout = 10000)
+ @Nightly
public void testSocketTimeoutOnCommit() throws IOException, SolrServerException {
- InetAddress localHost = InetAddress.getLocalHost();
+ InetAddress localHost = InetAddress.getLocalHost(); // this can fail java.net.BindException: Can't assign requested address (Bind failed) (seen on OSX)
try (ServerSocket server = new ServerSocket(0, 1, localHost);
ConcurrentUpdateSolrClient client = new ConcurrentUpdateSolrClient.Builder(
"http://" + localHost.getHostAddress() + ":" + server.getLocalPort() + "/noOneThere")