You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2017/03/14 07:37:59 UTC
[2/2] lucene-solr:master: SOLR-9835: Create another replication mode
for SolrCloud
SOLR-9835: Create another replication mode for SolrCloud
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/7830462d
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/7830462d
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/7830462d
Branch: refs/heads/master
Commit: 7830462d4b7da3acefff6353419e71cde62d5fee
Parents: faeb1fe
Author: Cao Manh Dat <da...@apache.org>
Authored: Tue Mar 14 14:37:47 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Tue Mar 14 14:37:47 2017 +0700
----------------------------------------------------------------------
.../solr/hadoop/TreeMergeOutputFormat.java | 3 +-
.../org/apache/solr/cloud/ElectionContext.java | 20 +-
.../cloud/OverseerCollectionMessageHandler.java | 1 +
.../org/apache/solr/cloud/RecoveryStrategy.java | 29 +-
.../apache/solr/cloud/ReplicateFromLeader.java | 124 ++++++
.../org/apache/solr/cloud/ZkController.java | 31 +-
.../org/apache/solr/core/CoreContainer.java | 7 +
.../org/apache/solr/handler/IndexFetcher.java | 34 +-
.../apache/solr/handler/ReplicationHandler.java | 28 +-
.../solr/handler/admin/CollectionsHandler.java | 4 +-
.../org/apache/solr/update/CommitTracker.java | 5 +
.../solr/update/DirectUpdateHandler2.java | 31 +-
.../apache/solr/update/HdfsTransactionLog.java | 50 +++
.../apache/solr/update/SolrIndexSplitter.java | 3 +-
.../org/apache/solr/update/SolrIndexWriter.java | 6 +-
.../org/apache/solr/update/TransactionLog.java | 50 +++
.../org/apache/solr/update/UpdateCommand.java | 1 +
.../java/org/apache/solr/update/UpdateLog.java | 171 +++++++-
.../processor/DistributedUpdateProcessor.java | 38 +-
.../org/apache/solr/util/TestInjection.java | 54 +++
.../conf/schema.xml | 31 ++
.../conf/solrconfig.xml | 48 ++
.../solr/cloud/BasicDistributedZk2Test.java | 6 +
.../solr/cloud/BasicDistributedZkTest.java | 9 +-
.../cloud/ChaosMonkeyNothingIsSafeTest.java | 7 +
.../org/apache/solr/cloud/ForceLeaderTest.java | 6 +
.../apache/solr/cloud/HttpPartitionTest.java | 7 +
.../LeaderInitiatedRecoveryOnCommitTest.java | 7 +
.../solr/cloud/OnlyLeaderIndexesTest.java | 435 +++++++++++++++++++
.../solr/cloud/RecoveryAfterSoftCommitTest.java | 7 +-
.../org/apache/solr/cloud/ShardSplitTest.java | 6 +
.../apache/solr/cloud/TestCloudRecovery.java | 16 +-
.../apache/solr/cloud/TestCollectionAPI.java | 6 +-
.../cloud/hdfs/HdfsBasicDistributedZkTest.java | 7 +-
.../solr/update/TestInPlaceUpdatesDistrib.java | 23 +-
.../solrj/request/CollectionAdminRequest.java | 6 +
.../apache/solr/common/cloud/DocCollection.java | 12 +
.../apache/solr/common/cloud/ZkStateReader.java | 1 +
.../cloud/AbstractFullDistribZkTestBase.java | 14 +-
39 files changed, 1309 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
index e3487ad..cac57c3 100644
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
+++ b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
@@ -163,7 +163,8 @@ public class TreeMergeOutputFormat extends FileOutputFormat<Text, NullWritable>
// Set Solr's commit data so the created index is usable by SolrCloud. E.g. Currently SolrCloud relies on
// commitTimeMSec in the commit data to do replication.
- SolrIndexWriter.setCommitData(writer);
+ //TODO: no CommitUpdateCommand is available here, so -1 is passed as the commit version
+ SolrIndexWriter.setCommitData(writer, -1);
timer = new RTimer();
LOG.info("Optimizing Solr: Closing index writer");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index d3ad322..223a539 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.Path;
@@ -420,7 +421,24 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
try {
// we must check LIR before registering as leader
checkLIR(coreName, allReplicasInLine);
-
+
+ boolean onlyLeaderIndexes = zkController.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
+ if (onlyLeaderIndexes) {
+ // stop replicate from old leader
+ zkController.stopReplicationFromLeader(coreName);
+ if (weAreReplacement) {
+ try (SolrCore core = cc.getCore(coreName)) {
+ Future<UpdateLog.RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().recoverFromCurrentLog();
+ if (future != null) {
+ log.info("Replaying tlog before become new leader");
+ future.get();
+ } else {
+ log.info("New leader does not have old tlog to replay");
+ }
+ }
+ }
+ }
+
super.runLeaderProcess(weAreReplacement, 0);
try (SolrCore core = cc.getCore(coreName)) {
if (core != null) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index 00eb12d..4d64a00 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -131,6 +131,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
ZkStateReader.REPLICATION_FACTOR, "1",
ZkStateReader.MAX_SHARDS_PER_NODE, "1",
ZkStateReader.AUTO_ADD_REPLICAS, "false",
+ ZkStateReader.REALTIME_REPLICAS, "-1",
DocCollection.RULE, null,
SNITCH, null));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 8865c08..cb6c69c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -118,7 +118,8 @@ public class RecoveryStrategy extends Thread implements Closeable {
private boolean recoveringAfterStartup;
private CoreContainer cc;
private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
-
+ private boolean onlyLeaderIndexes;
+
protected RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
this.cc = cc;
this.coreName = cd.getName();
@@ -128,6 +129,8 @@ public class RecoveryStrategy extends Thread implements Closeable {
zkStateReader = zkController.getZkStateReader();
baseUrl = zkController.getBaseUrl();
coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
+ String collection = cd.getCloudDescriptor().getCollectionName();
+ onlyLeaderIndexes = zkStateReader.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
}
final public int getWaitForUpdatesWithStaleStatePauseMilliSeconds() {
@@ -260,7 +263,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
UpdateRequest ureq = new UpdateRequest();
ureq.setParams(new ModifiableSolrParams());
ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
- ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
+ ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);
ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
client);
}
@@ -309,7 +312,8 @@ public class RecoveryStrategy extends Thread implements Closeable {
return;
}
- boolean firstTime = true;
+ // we temporarily ignore peersync in realtimeReplicas mode
+ boolean firstTime = !onlyLeaderIndexes;
List<Long> recentVersions;
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
@@ -361,6 +365,10 @@ public class RecoveryStrategy extends Thread implements Closeable {
}
}
+ if (onlyLeaderIndexes) {
+ zkController.stopReplicationFromLeader(coreName);
+ }
+
Future<RecoveryInfo> replayFuture = null;
while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
try {
@@ -514,6 +522,9 @@ public class RecoveryStrategy extends Thread implements Closeable {
if (successfulRecovery) {
LOG.info("Registering as Active after recovery.");
try {
+ if (onlyLeaderIndexes) {
+ zkController.startReplicationFromLeader(coreName);
+ }
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
} catch (Exception e) {
LOG.error("Could not publish as ACTIVE after succesful recovery", e);
@@ -587,8 +598,20 @@ public class RecoveryStrategy extends Thread implements Closeable {
LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
}
+ public static Runnable testing_beforeReplayBufferingUpdates;
+
final private Future<RecoveryInfo> replay(SolrCore core)
throws InterruptedException, ExecutionException {
+ if (testing_beforeReplayBufferingUpdates != null) {
+ testing_beforeReplayBufferingUpdates.run();
+ }
+ if (onlyLeaderIndexes) {
+ // roll over all updates during buffering to new tlog, make RTG available
+ SolrQueryRequest req = new LocalSolrQueryRequest(core,
+ new ModifiableSolrParams());
+ core.getUpdateHandler().getUpdateLog().copyOverBufferingUpdates(new CommitUpdateCommand(req, false));
+ return null;
+ }
Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
if (future == null) {
// no replay needed\
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
new file mode 100644
index 0000000..d7fded9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.lucene.index.IndexCommit;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrConfig;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.SolrIndexWriter;
+import org.apache.solr.update.UpdateLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Keeps one core in sync with its shard leader by polling through a private {@link ReplicationHandler} in slave mode. */
+public class ReplicateFromLeader {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private CoreContainer cc;
+  private String coreName;
+
+  private ReplicationHandler replicationProcess; // null until startReplication() has created it
+  private long lastVersion = 0; // commit version most recently seen by this instance
+
+  public ReplicateFromLeader(CoreContainer cc, String coreName) {
+    this.cc = cc;
+    this.coreName = coreName;
+  }
+
+  /** Configures and starts polling. Returns without starting if the core is unavailable and the container is shut down; otherwise a missing core raises SolrException. */
+  public void startReplication() throws InterruptedException {
+    try (SolrCore core = cc.getCore(coreName)) {
+      if (core == null) {
+        if (cc.isShutDown()) return;
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "SolrCore not found:" + coreName + " in " + cc.getCoreNames());
+      }
+      SolrConfig.UpdateHandlerInfo uinfo = core.getSolrConfig().getUpdateHandlerInfo();
+      String pollIntervalStr = "00:00:03"; // fallback when both auto-commit max times are -1
+      if (uinfo.autoCommmitMaxTime != -1) {
+        pollIntervalStr = toPollIntervalStr(uinfo.autoCommmitMaxTime/2);
+      } else if (uinfo.autoSoftCommmitMaxTime != -1) {
+        pollIntervalStr = toPollIntervalStr(uinfo.autoSoftCommmitMaxTime/2);
+      }
+
+      NamedList<Object> slaveConfig = new NamedList<>();
+      slaveConfig.add("fetchFromLeader", true);
+      slaveConfig.add("pollInterval", pollIntervalStr);
+      NamedList<Object> replicationConfig = new NamedList<>();
+      replicationConfig.add("slave", slaveConfig);
+
+      String lastCommitVersion = getCommitVersion(core);
+      if (lastCommitVersion != null) {
+        lastVersion = Long.parseLong(lastCommitVersion);
+      }
+
+      replicationProcess = new ReplicationHandler();
+      replicationProcess.setPollListener((solrCore, pollSuccess) -> {
+        if (!pollSuccess) return;
+        String commitVersion = getCommitVersion(solrCore); // listener's core; the try-with-resources one is closed once this method returns
+        if (commitVersion == null) return;
+        long version = Long.parseLong(commitVersion); // parsed once, reused below
+        if (version == lastVersion) return;
+        SolrQueryRequest req = new LocalSolrQueryRequest(solrCore, new ModifiableSolrParams());
+        CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
+        cuc.setVersion(version);
+        solrCore.getUpdateHandler().getUpdateLog().copyOverOldUpdates(cuc);
+        lastVersion = version;
+      });
+      replicationProcess.init(replicationConfig);
+      replicationProcess.inform(core);
+    }
+  }
+
+  /** Returns the commit-command version stored in the latest commit's user data, or null if it is missing or unreadable. */
+  public static String getCommitVersion(SolrCore solrCore) {
+    IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit();
+    try {
+      return commit.getUserData().get(SolrIndexWriter.COMMIT_COMMAND_VERSION);
+    } catch (Exception e) {
+      LOG.warn("Cannot get commit command version from index commit point ",e);
+      return null;
+    }
+  }
+
+  /** Formats a millisecond duration as hour:min:sec without zero padding; any fraction of a second is dropped. */
+  private static String toPollIntervalStr(int ms) {
+    int sec = ms/1000;
+    int hour = sec / 3600;
+    sec = sec % 3600;
+    int min = sec / 60;
+    sec = sec % 60;
+    return hour + ":" + min + ":" + sec;
+  }
+
+  /** Stops polling; does nothing if startReplication() never created the handler. */
+  public void stopReplication() {
+    if (replicationProcess != null) {
+      replicationProcess.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 333acd4..a19b351 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -189,6 +189,7 @@ public class ZkController {
private LeaderElector overseerElector;
+ private Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>();
// for now, this can be null in tests, in which case recovery will be inactive, and other features
// may accept defaults or use mocks rather than pulling things from a CoreContainer
@@ -877,7 +878,7 @@ public class ZkController {
coreName, baseUrl, cloudDesc.getCollectionName(), shardId);
ZkNodeProps leaderProps = new ZkNodeProps(props);
-
+
try {
// If we're a preferred leader, insert ourselves at the head of the queue
boolean joinAtHead = false;
@@ -913,9 +914,16 @@ public class ZkController {
// leader election perhaps?
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-
+ boolean onlyLeaderIndexes = zkStateReader.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
+ boolean isReplicaInOnlyLeaderIndexes = onlyLeaderIndexes && !isLeader;
+ if (isReplicaInOnlyLeaderIndexes) {
+ String commitVersion = ReplicateFromLeader.getCommitVersion(core);
+ if (commitVersion != null) {
+ ulog.copyOverOldUpdates(Long.parseLong(commitVersion));
+ }
+ }
// we will call register again after zk expiration and on reload
- if (!afterExpiration && !core.isReloaded() && ulog != null) {
+ if (!afterExpiration && !core.isReloaded() && ulog != null && !isReplicaInOnlyLeaderIndexes) {
// disable recovery in case shard is in construction state (for shard splits)
Slice slice = getClusterState().getSlice(collection, shardId);
if (slice.getState() != Slice.State.CONSTRUCTION || !isLeader) {
@@ -934,6 +942,9 @@ public class ZkController {
boolean didRecovery
= checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, core, cc, afterExpiration);
if (!didRecovery) {
+ if (isReplicaInOnlyLeaderIndexes) {
+ startReplicationFromLeader(coreName);
+ }
publish(desc, Replica.State.ACTIVE);
}
@@ -948,6 +959,20 @@ public class ZkController {
}
}
+  /**
+   * Registers a {@link ReplicateFromLeader} for the given core and starts it.
+   * Does nothing when an instance is already registered under that core name.
+   *
+   * @param coreName name of the core that should replicate from its leader
+   * @throws InterruptedException propagated from {@link ReplicateFromLeader#startReplication()}
+   */
+  public void startReplicationFromLeader(String coreName) throws InterruptedException {
+    ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName);
+    if (replicateFromLeaders.putIfAbsent(coreName, replicateFromLeader) != null) {
+      return;
+    }
+    try {
+      replicateFromLeader.startReplication();
+    } catch (InterruptedException | RuntimeException e) {
+      // Unregister the instance that failed to start; left in the map it would make
+      // every later start request for this core a no-op.
+      replicateFromLeaders.remove(coreName, replicateFromLeader);
+      throw e;
+    }
+  }
+
+  /** Unregisters the replicator recorded for the given core name, if there is one, and stops it. */
+  public void stopReplicationFromLeader(String coreName) {
+    ReplicateFromLeader removed = replicateFromLeaders.remove(coreName);
+    if (removed == null) {
+      return;
+    }
+    removed.stopReplication();
+  }
+
// timeoutms is the timeout for the first call to get the leader - there is then
// a longer wait to make sure that leader matches our local state
private String getLeader(final CloudDescriptor cloudDesc, int timeoutms) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index b9597ae..0de671e 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1137,6 +1137,13 @@ public class CoreContainer {
log.info("Reloading SolrCore '{}' using configuration from {}", cd.getName(), coreConfig.getName());
SolrCore newCore = core.reload(coreConfig);
registerCore(cd.getName(), newCore, false, false);
+ if (getZkController() != null) {
+ boolean onlyLeaderIndexes = getZkController().getClusterState().getCollection(cd.getCollectionName()).getRealtimeReplicas() == 1;
+ if (onlyLeaderIndexes && !cd.getCloudDescriptor().isLeader()) {
+ getZkController().stopReplicationFromLeader(core.getName());
+ getZkController().startReplicationFromLeader(newCore.getName());
+ }
+ }
} catch (SolrCoreState.CoreIsClosedException e) {
throw e;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 8634aee..a07496f 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -68,8 +68,11 @@ import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
@@ -115,7 +118,7 @@ public class IndexFetcher {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final String masterUrl;
+ private String masterUrl;
final ReplicationHandler replicationHandler;
@@ -150,6 +153,8 @@ public class IndexFetcher {
private boolean useExternalCompression = false;
+ private boolean fetchFromLeader = false;
+
private final HttpClient myHttpClient;
private Integer connTimeout;
@@ -167,11 +172,15 @@ public class IndexFetcher {
public IndexFetcher(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) {
solrCore = sc;
+ Object fetchFromLeader = initArgs.get(FETCH_FROM_LEADER);
+ if (fetchFromLeader != null && fetchFromLeader instanceof Boolean) {
+ this.fetchFromLeader = (boolean) fetchFromLeader;
+ }
String masterUrl = (String) initArgs.get(MASTER_URL);
- if (masterUrl == null)
+ if (masterUrl == null && !this.fetchFromLeader)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"'masterUrl' is required for a slave");
- if (masterUrl.endsWith(ReplicationHandler.PATH)) {
+ if (masterUrl != null && masterUrl.endsWith(ReplicationHandler.PATH)) {
masterUrl = masterUrl.substring(0, masterUrl.length()-12);
LOG.warn("'masterUrl' must be specified without the "+ReplicationHandler.PATH+" suffix");
}
@@ -298,6 +307,15 @@ public class IndexFetcher {
}
try {
+ if (fetchFromLeader) {
+ Replica replica = getLeaderReplica();
+ CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
+ if (cd.getCoreNodeName().equals(replica.getName())) {
+ return false;
+ }
+ masterUrl = replica.getCoreUrl();
+ LOG.info("Updated masterUrl to " + masterUrl);
+ }
//get the current 'replicateable' index version in the master
NamedList response;
try {
@@ -404,7 +422,7 @@ public class IndexFetcher {
isFullCopyNeeded = true;
}
- if (!isFullCopyNeeded) {
+ if (!isFullCopyNeeded && !fetchFromLeader) {
// a searcher might be using some flushed but not committed segments
// because of soft commits (which open a searcher on IW's data)
// so we need to close the existing searcher on the last commit
@@ -565,6 +583,14 @@ public class IndexFetcher {
}
}
+  /**
+   * Asks the ZkStateReader (via getLeaderRetry) for the leader replica of the
+   * collection and shard this core belongs to.
+   */
+  private Replica getLeaderReplica() throws InterruptedException {
+    ZkController controller = solrCore.getCoreDescriptor().getCoreContainer().getZkController();
+    CloudDescriptor cloudDesc = solrCore.getCoreDescriptor().getCloudDescriptor();
+    return controller.getZkStateReader().getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId());
+  }
+
private void cleanup(final SolrCore core, Directory tmpIndexDir,
Directory indexDir, boolean deleteTmpIdxDir, File tmpTlogDir, boolean successfulInstall) throws IOException {
try {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index cdbadc4..e40b2c3 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -209,6 +209,11 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
private Long pollIntervalNs;
private String pollIntervalStr;
+ private PollListener pollListener;
+ /**
+  * Callback run by the scheduled polling task after each fetch attempt.
+  */
+ public interface PollListener {
+ /**
+  * @param solrCore the core the polling task passes to the listener
+  * @param pollSuccess the boolean returned by doFetch for this poll
+  * @throws IOException the polling task catches any Exception raised here and logs it
+  */
+ void onComplete(SolrCore solrCore, boolean pollSuccess) throws IOException;
+ }
+
/**
* Disable the timer task for polling
*/
@@ -218,6 +223,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
return pollIntervalStr;
}
+ /**
+  * Sets the listener notified after each scheduled poll.
+  * While it is null the polling task skips the notification.
+  */
+ public void setPollListener(PollListener pollListener) {
+ this.pollListener = pollListener;
+ }
+
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
rsp.setHttpCaching(false);
@@ -1142,7 +1151,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
try {
LOG.debug("Polling for index modifications");
markScheduledExecutionStart();
- doFetch(null, false);
+ boolean pollSuccess = doFetch(null, false);
+ if (pollListener != null) pollListener.onComplete(core, pollSuccess);
} catch (Exception e) {
LOG.error("Exception in fetching index", e);
}
@@ -1328,6 +1338,20 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
});
}
+  /**
+   * Shuts down executorService, destroys the polling and current index fetchers,
+   * shuts down restoreExecutor and cancels restoreFuture without interrupting it.
+   */
+  public void close() {
+    if (executorService != null) {
+      executorService.shutdown();
+    }
+    if (pollingIndexFetcher != null) {
+      pollingIndexFetcher.destroy();
+    }
+    // the current fetcher is destroyed separately only when it is not the polling fetcher
+    boolean distinctCurrentFetcher = currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher;
+    if (distinctCurrentFetcher) {
+      currentIndexFetcher.destroy();
+    }
+    ExecutorUtil.shutdownAndAwaitTermination(restoreExecutor);
+    if (restoreFuture != null) {
+      restoreFuture.cancel(false);
+    }
+  }
+
/**
* Register a listener for postcommit/optimize
*
@@ -1680,6 +1704,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
public static final String MASTER_URL = "masterUrl";
+ public static final String FETCH_FROM_LEADER = "fetchFromLeader";
+
public static final String STATUS = "status";
public static final String COMMAND = "command";
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index d7759ca..2e17af6 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -115,6 +115,7 @@ import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.REALTIME_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
@@ -404,7 +405,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
STATE_FORMAT,
AUTO_ADD_REPLICAS,
RULE,
- SNITCH);
+ SNITCH,
+ REALTIME_REPLICAS);
if (props.get(STATE_FORMAT) == null) {
props.put(STATE_FORMAT, "2");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/CommitTracker.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/CommitTracker.java b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
index 61f0c35..9c09ebe 100644
--- a/solr/core/src/java/org/apache/solr/update/CommitTracker.java
+++ b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
@@ -207,6 +207,11 @@ public final class CommitTracker implements Runnable {
command.openSearcher = openSearcher;
command.waitSearcher = waitSearcher;
command.softCommit = softCommit;
+ if (core.getCoreDescriptor().getCloudDescriptor() != null
+ && core.getCoreDescriptor().getCloudDescriptor().isLeader()
+ && !softCommit) {
+ command.version = core.getUpdateHandler().getUpdateLog().getVersionInfo().getNewClock();
+ }
// no need for command.maxOptimizeSegments = 1; since it is not optimizing
// we increment this *before* calling commit because it was causing a race
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index 4592bcf..abb5512 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -45,7 +45,9 @@ import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.BytesRefHash;
+import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
@@ -123,6 +125,14 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
commitWithinSoftCommit = updateHandlerInfo.commitWithinSoftCommit;
indexWriterCloseWaitsForMerges = updateHandlerInfo.indexWriterCloseWaitsForMerges;
+ ZkController zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
+ if (zkController != null) {
+ DocCollection dc = zkController.getClusterState().getCollection(core.getCoreDescriptor().getCollectionName());
+ if (dc.getRealtimeReplicas() == 1) {
+ commitWithinSoftCommit = false;
+ commitTracker.setOpenSearcher(true);
+ }
+ }
}
@@ -233,6 +243,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
cmd.overwrite = false;
}
try {
+ if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
+ if (ulog != null) ulog.add(cmd);
+ return 1;
+ }
+
if (cmd.overwrite) {
// Check for delete by query commands newer (i.e. reordered). This
// should always be null on a leader
@@ -404,6 +419,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
deleteByIdCommands.increment();
deleteByIdCommandsCumulative.mark();
+ if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0 ) {
+ if (ulog != null) ulog.delete(cmd);
+ return;
+ }
+
Term deleteTerm = new Term(idField.getName(), cmd.getIndexedId());
// SolrCore.verbose("deleteDocuments",deleteTerm,writer);
RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
@@ -463,6 +483,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
deleteByQueryCommandsCumulative.mark();
boolean madeIt=false;
try {
+ if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
+ if (ulog != null) ulog.deleteByQuery(cmd);
+ madeIt = true;
+ return;
+ }
Query q = getQuery(cmd);
boolean delAll = MatchAllDocsQuery.class == q.getClass();
@@ -563,7 +588,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
log.info("start "+cmd);
RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
try {
- SolrIndexWriter.setCommitData(iw.get());
+ SolrIndexWriter.setCommitData(iw.get(), cmd.getVersion());
iw.get().prepareCommit();
} finally {
iw.decref();
@@ -647,7 +672,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
// SolrCore.verbose("writer.commit() start writer=",writer);
if (writer.hasUncommittedChanges()) {
- SolrIndexWriter.setCommitData(writer);
+ SolrIndexWriter.setCommitData(writer, cmd.getVersion());
writer.commit();
} else {
log.info("No uncommitted changes. Skipping IW.commit.");
@@ -838,7 +863,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
}
// todo: refactor this shared code (or figure out why a real CommitUpdateCommand can't be used)
- SolrIndexWriter.setCommitData(writer);
+ SolrIndexWriter.setCommitData(writer, cmd.getVersion());
writer.commit();
synchronized (solrCoreState.getUpdateLock()) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
index 90f6856..c478935 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
@@ -20,8 +20,10 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -368,6 +370,10 @@ public class HdfsTransactionLog extends TransactionLog {
return new HDFSLogReader(startingPos);
}
+ /**
+  * Returns an {@link HDFSSortedLogReader} over this log, which hands back entries ordered by the
+  * absolute value of their version.
+  *
+  * @param startingPos position in the log at which the reader starts
+  */
+ @Override
+ public LogReader getSortedReader(long startingPos) {
+   return new HDFSSortedLogReader(startingPos);
+ }
+
/** Returns a single threaded reverse reader */
@Override
public ReverseReader getReverseReader() throws IOException {
@@ -477,6 +483,50 @@ public class HdfsTransactionLog extends TransactionLog {
}
+ /**
+  * Log reader that returns entries ordered by the absolute value of their version.
+  * <p>
+  * The first call to {@link #next()} reads the log from {@code startingPos} to its current end and
+  * records the position of every entry. If the entries were already in ascending order the log is
+  * then simply re-read sequentially; otherwise each entry is fetched by seeking to its recorded
+  * position.
+  */
+ public class HDFSSortedLogReader extends HDFSLogReader {
+   private final long startingPos;
+   private boolean inOrder = true;
+   // absolute version -> position of the entry; stays null until the first call to next().
+   // NOTE: entries sharing an absolute version share one key, so only the last position is kept.
+   private TreeMap<Long, Long> versionToPos;
+   Iterator<Long> iterator;
+
+   public HDFSSortedLogReader(long startingPos) {
+     super(startingPos);
+     this.startingPos = startingPos;
+   }
+
+   /**
+    * Returns the next entry in version order, or {@code null} when there are no more entries.
+    */
+   @Override
+   public Object next() throws IOException, InterruptedException {
+     if (versionToPos == null) {
+       indexEntries();
+     }
+
+     if (inOrder) {
+       // the log is already sorted, a plain sequential read is enough
+       return super.next();
+     }
+
+     if (iterator == null) iterator = versionToPos.values().iterator();
+     if (!iterator.hasNext()) return null;
+     long pos = iterator.next();
+     // only seek when the wanted entry is not the one directly under the cursor
+     if (pos != currentPos()) fis.seek(pos);
+     return super.next();
+   }
+
+   /**
+    * Reads every entry once, remembers where each one starts and whether the versions were
+    * ascending, then rewinds the stream to {@code startingPos}.
+    */
+   private void indexEntries() throws IOException, InterruptedException {
+     // assigned before the scan so that a failed scan is not silently restarted by the next call
+     versionToPos = new TreeMap<>();
+     long pos = startingPos;
+     long lastVersion = Long.MIN_VALUE;
+     Object o;
+     while ((o = super.next()) != null) {
+       List<?> entry = (List<?>) o;
+       long version = Math.abs((Long) entry.get(UpdateLog.VERSION_IDX));
+       versionToPos.put(version, pos);
+       // the position after reading an entry is where the following entry starts
+       pos = currentPos();
+
+       if (version < lastVersion) inOrder = false;
+       lastVersion = version;
+     }
+     fis.seek(startingPos);
+   }
+ }
+
public class HDFSReverseReader extends ReverseReader {
FSDataFastInputStream fis;
private LogCodec codec = new LogCodec(resolver) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
index a147b0f..e9950f2 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
@@ -137,7 +137,8 @@ public class SolrIndexSplitter {
// we commit explicitly instead of sending a CommitUpdateCommand through the processor chain
// because the sub-shard cores will just ignore such a commit because the update log is not
// in active state at this time.
- SolrIndexWriter.setCommitData(iw);
+ //TODO no commitUpdateCommand
+ SolrIndexWriter.setCommitData(iw, -1);
iw.commit();
success = true;
} finally {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
index 3c0c1a5..6a264f8 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
@@ -61,6 +61,7 @@ public class SolrIndexWriter extends IndexWriter {
/** Stored into each Lucene commit to record the
* System.currentTimeMillis() when commit was called. */
public static final String COMMIT_TIME_MSEC_KEY = "commitTimeMSec";
+ public static final String COMMIT_COMMAND_VERSION = "commitCommandVer";
private final Object CLOSE_LOCK = new Object();
@@ -183,10 +184,11 @@ public class SolrIndexWriter extends IndexWriter {
@SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
" but currently suspiciously used for replication as well")
- public static void setCommitData(IndexWriter iw) {
- log.info("Calling setCommitData with IW:" + iw.toString());
+ public static void setCommitData(IndexWriter iw, long commitCommandVersion) {
+ log.info("Calling setCommitData with IW:" + iw.toString() + " commitCommandVersion:"+commitCommandVersion);
final Map<String,String> commitData = new HashMap<>();
commitData.put(COMMIT_TIME_MSEC_KEY, String.valueOf(System.currentTimeMillis()));
+ commitData.put(COMMIT_COMMAND_VERSION, String.valueOf(commitCommandVersion));
iw.setLiveCommitData(commitData.entrySet());
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/TransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
index 5037b45..73328cf 100644
--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
@@ -29,9 +29,11 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.BytesRef;
@@ -632,6 +634,10 @@ public class TransactionLog implements Closeable {
return new LogReader(startingPos);
}
+ /**
+ * Returns a {@link SortedLogReader} over this log, which hands back entries ordered by the
+ * absolute value of their version.
+ * @param startingPos position in the log at which the reader starts
+ */
+ public LogReader getSortedReader(long startingPos) {
+ return new SortedLogReader(startingPos);
+ }
+
/** Returns a single threaded reverse reader */
public ReverseReader getReverseReader() throws IOException {
return new FSReverseReader();
@@ -715,6 +721,50 @@ public class TransactionLog implements Closeable {
}
+ /**
+  * Log reader that returns entries ordered by the absolute value of their version.
+  * <p>
+  * The first call to {@link #next()} reads the log from {@code startingPos} to its current end and
+  * records the position of every entry. If the entries were already in ascending order the log is
+  * then simply re-read sequentially; otherwise each entry is fetched by seeking to its recorded
+  * position.
+  */
+ public class SortedLogReader extends LogReader {
+   private final long startingPos;
+   private boolean inOrder = true;
+   // absolute version -> position of the entry; stays null until the first call to next().
+   // NOTE: entries sharing an absolute version share one key, so only the last position is kept.
+   private TreeMap<Long, Long> versionToPos;
+   Iterator<Long> iterator;
+
+   public SortedLogReader(long startingPos) {
+     super(startingPos);
+     this.startingPos = startingPos;
+   }
+
+   /**
+    * Returns the next entry in version order, or {@code null} when there are no more entries.
+    */
+   @Override
+   public Object next() throws IOException, InterruptedException {
+     if (versionToPos == null) {
+       indexEntries();
+     }
+
+     if (inOrder) {
+       // the log is already sorted, a plain sequential read is enough
+       return super.next();
+     }
+
+     if (iterator == null) iterator = versionToPos.values().iterator();
+     if (!iterator.hasNext()) return null;
+     long pos = iterator.next();
+     // only seek when the wanted entry is not the one directly under the cursor
+     if (pos != currentPos()) fis.seek(pos);
+     return super.next();
+   }
+
+   /**
+    * Reads every entry once, remembers where each one starts and whether the versions were
+    * ascending, then rewinds the stream to {@code startingPos}.
+    */
+   private void indexEntries() throws IOException, InterruptedException {
+     // assigned before the scan so that a failed scan is not silently restarted by the next call
+     versionToPos = new TreeMap<>();
+     long pos = startingPos;
+     long lastVersion = Long.MIN_VALUE;
+     Object o;
+     while ((o = super.next()) != null) {
+       List<?> entry = (List<?>) o;
+       long version = Math.abs((Long) entry.get(UpdateLog.VERSION_IDX));
+       versionToPos.put(version, pos);
+       // the position after reading an entry is where the following entry starts
+       pos = currentPos();
+
+       if (version < lastVersion) inOrder = false;
+       lastVersion = version;
+     }
+     fis.seek(startingPos);
+   }
+ }
+
public abstract class ReverseReader {
/** Returns the next object from the log, or null if none available.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateCommand.java b/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
index 9f01571..b124271 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
@@ -34,6 +34,7 @@ public abstract class UpdateCommand implements Cloneable {
public static int PEER_SYNC = 0x00000004; // update command is a missing update being provided by a peer.
public static int IGNORE_AUTOCOMMIT = 0x00000008; // this update should not count toward triggering of autocommits.
public static int CLEAR_CACHES = 0x00000010; // clear caches associated with the update log. used when applying reordered DBQ updates when doing an add.
+ public static int IGNORE_INDEXWRITER = 0x00000020;
public UpdateCommand(SolrQueryRequest req) {
this.req = req;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 16eff9c..6a5f407 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -27,6 +27,7 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -618,7 +619,7 @@ public static final int VERSION_IDX = 1;
}
// only change our caches if we are not buffering
- if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+ if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0 && (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) {
// given that we just did a delete-by-query, we don't know what documents were
// affected and hence we must purge our caches.
openRealtimeSearcher();
@@ -1095,6 +1096,162 @@ public static final int VERSION_IDX = 1;
return cs.submit(replayer, recoveryInfo);
}
+ /**
+ * Replays the current tlog so that all of its updates are written to the index.
+ * This is a must-do task before an append replica can become the new leader.
+ * The tlog is replayed through a sorted reader (the replayer is created with inSortedOrder = true).
+ * @return future of this task, or {@code null} if there is no current tlog
+ */
+ public Future<RecoveryInfo> recoverFromCurrentLog() {
+ if (tlog == null) {
+ return null;
+ }
+ map.clear();
+ recoveryInfo = new RecoveryInfo();
+ // NOTE(review): presumably the replayer releases this reference once it is done - confirm
+ tlog.incref();
+
+ ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor);
+ // arguments: activeLog = false, inSortedOrder = true
+ LogReplayer replayer = new LogReplayer(Collections.singletonList(tlog), false, true);
+
+ // flip to REPLAYING while updates are blocked
+ versionInfo.blockUpdates();
+ try {
+ state = State.REPLAYING;
+ } finally {
+ versionInfo.unblockUpdates();
+ }
+
+ return cs.submit(replayer, recoveryInfo);
+ }
+
+ /**
+ * Blocks updates, appends a commit to the current tlog,
+ * then copies the buffered updates over to a new tlog and brings the ulog back to the active state.
+ * This way any updates which haven't made it to the index are preserved in the current tlog;
+ * this also makes RTG work.
+ * @param cuc any updates that have a version larger than the version of cuc will be copied over
+ */
+ public void copyOverBufferingUpdates(CommitUpdateCommand cuc) {
+ versionInfo.blockUpdates();
+ try {
+ // clear the gap flag and go back to ACTIVE before switching tlogs
+ operationFlags &= ~FLAG_GAP;
+ state = State.ACTIVE;
+ copyAndSwitchToNewTlog(cuc);
+ } finally {
+ versionInfo.unblockUpdates();
+ }
+ }
+
+ /**
+ * Blocks updates, appends a commit to the current tlog, then copies updates over to a new tlog.
+ * This way any updates which haven't made it to the index are preserved in the current tlog.
+ * Unlike {@code copyOverBufferingUpdates}, this does not touch the ulog state or operation flags.
+ * @param cuc any updates that have a version larger than the version of cuc will be copied over
+ */
+ public void copyOverOldUpdates(CommitUpdateCommand cuc) {
+ versionInfo.blockUpdates();
+ try {
+ copyAndSwitchToNewTlog(cuc);
+ } finally {
+ versionInfo.unblockUpdates();
+ }
+ }
+
+ /**
+ * Runs {@code preCommit(cuc)}, copies over the updates that are newer than the version of cuc and
+ * finally runs {@code postCommit(cuc)}, all while holding this object's monitor.
+ * Does nothing if there is no current tlog.
+ * @param cuc commit command whose version is the cut-off for the updates to copy
+ */
+ protected void copyAndSwitchToNewTlog(CommitUpdateCommand cuc) {
+ synchronized (this) {
+ if (tlog == null) return;
+ // NOTE(review): presumably preCommit moves the current tlog aside so that the copy below goes to a fresh one - confirm
+ preCommit(cuc);
+ try {
+ copyOverOldUpdates(cuc.getVersion());
+ } finally {
+ // postCommit has to run even if the copy fails
+ postCommit(cuc);
+ }
+ }
+ }
+
+ /**
+  * Copies updates from prevTlog, or from the first entry of {@code logs} when there is no prevTlog,
+  * over to a new tlog by re-adding them to this update log.
+  * <p>
+  * Nothing is copied when no such tlog exists, when its refcount is 0, when it already ends with a
+  * commit, or when checking for that commit fails. Entries that cannot be decoded are logged and
+  * skipped; I/O errors and interruption end the copy early and are logged.
+  *
+  * @param commitVersion any updates that have version larger than the commitVersion will be copied over
+  * @throws SolrException if an entry carries an unknown operation
+  */
+ public void copyOverOldUpdates(long commitVersion) {
+   TransactionLog oldTlog = prevTlog;
+   if (oldTlog == null && !logs.isEmpty()) {
+     oldTlog = logs.getFirst();
+   }
+   if (oldTlog == null || oldTlog.refcount.get() == 0) {
+     return;
+   }
+
+   try {
+     if (oldTlog.endsWithCommit()) {
+       return;
+     }
+   } catch (IOException e) {
+     log.warn("Exception reading log", e);
+     return;
+   }
+
+   SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core,
+       new ModifiableSolrParams());
+   TransactionLog.LogReader logReader = oldTlog.getReader(0);
+   Object o = null;
+   try {
+     while ((o = logReader.next()) != null) {
+       try {
+         List<?> entry = (List<?>) o;
+         int operationAndFlags = (Integer) entry.get(0);
+         int oper = operationAndFlags & OPERATION_MASK;
+         long version = (Long) entry.get(UpdateLog.VERSION_IDX);
+         // deletes are compared by absolute value too, hence the abs
+         if (Math.abs(version) > commitVersion) {
+           switch (oper) {
+             // NOTE(review): in-place updates are copied exactly like full adds, using only the last
+             // element of the entry - confirm no other per-entry data has to be carried over
+             case UpdateLog.UPDATE_INPLACE:
+             case UpdateLog.ADD: {
+               SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
+               AddUpdateCommand cmd = new AddUpdateCommand(req);
+               cmd.solrDoc = sdoc;
+               cmd.setVersion(version);
+               cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT);
+               add(cmd);
+               break;
+             }
+             case UpdateLog.DELETE: {
+               byte[] idBytes = (byte[]) entry.get(2);
+               DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+               cmd.setIndexedId(new BytesRef(idBytes));
+               cmd.setVersion(version);
+               cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT);
+               delete(cmd);
+               break;
+             }
+
+             case UpdateLog.DELETE_BY_QUERY: {
+               String query = (String) entry.get(2);
+               DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+               cmd.query = query;
+               cmd.setVersion(version);
+               cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT);
+               deleteByQuery(cmd);
+               break;
+             }
+
+             default:
+               throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+           }
+         }
+       } catch (ClassCastException e) {
+         // a single malformed entry must not stop the remaining entries from being copied
+         log.warn("Unexpected log entry or corrupt log. Entry=" + o, e);
+       }
+     }
+     // Prev tlog will be closed, so nullify prevMap
+     if (prevTlog == oldTlog) {
+       prevMap = null;
+     }
+   } catch (IOException e) {
+     log.error("Exception reading versions from log",e);
+   } catch (InterruptedException e) {
+     // restore the interrupt status so that callers can still observe the interruption
+     Thread.currentThread().interrupt();
+     log.warn("Exception reading log", e);
+   } finally {
+     logReader.close();
+   }
+ }
+
protected void ensureLog() {
if (tlog == null) {
@@ -1482,6 +1639,7 @@ public static final int VERSION_IDX = 1;
boolean activeLog;
boolean finishing = false; // state where we lock out other updates and finish those updates that snuck in before we locked
boolean debug = loglog.isDebugEnabled();
+ boolean inSortedOrder;
public LogReplayer(List<TransactionLog> translogs, boolean activeLog) {
this.translogs = new LinkedList<>();
@@ -1489,6 +1647,11 @@ public static final int VERSION_IDX = 1;
this.activeLog = activeLog;
}
+ /**
+ * Same as the two-argument constructor, additionally choosing how each tlog is read.
+ * @param inSortedOrder if true, tlogs are read with {@code getSortedReader} instead of {@code getReader}
+ */
+ public LogReplayer(List<TransactionLog> translogs, boolean activeLog, boolean inSortedOrder) {
+ this(translogs, activeLog);
+ this.inSortedOrder = inSortedOrder;
+ }
+
private SolrQueryRequest req;
@@ -1554,7 +1717,11 @@ public static final int VERSION_IDX = 1;
try {
loglog.warn("Starting log replay " + translog + " active=" + activeLog + " starting pos=" + recoveryInfo.positionOfStart);
long lastStatusTime = System.nanoTime();
- tlogReader = translog.getReader(recoveryInfo.positionOfStart);
+ if (inSortedOrder) {
+ tlogReader = translog.getSortedReader(recoveryInfo.positionOfStart);
+ } else {
+ tlogReader = translog.getReader(recoveryInfo.positionOfStart);
+ }
// NOTE: we don't currently handle a core reload during recovery. This would cause the core
// to change underneath us.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index ec093cf..08ede72 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -279,6 +279,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// this is set to true in the constructor if the next processors in the chain
// are custom and may modify the SolrInputDocument racing with its serialization for replication
private final boolean cloneRequiredOnLeader;
+ private final boolean onlyLeaderIndexes;
public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
@@ -324,8 +325,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (cloudDesc != null) {
collection = cloudDesc.getCollectionName();
+ ClusterState cstate = zkController.getClusterState();
+ DocCollection coll = cstate.getCollection(collection);
+ onlyLeaderIndexes = coll.getRealtimeReplicas() == 1;
} else {
collection = null;
+ onlyLeaderIndexes = false;
}
boolean shouldClone = false;
@@ -1186,6 +1191,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
checkDeleteByQueries = true;
}
}
+ if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
+ }
}
}
@@ -1692,6 +1700,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return;
}
+ if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
+ }
+
doLocalDelete(cmd);
}
}
@@ -1845,6 +1857,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return true;
}
}
+
+ if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+ cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
+ }
}
}
@@ -1876,7 +1892,27 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
if (!zkEnabled || req.getParams().getBool(COMMIT_END_POINT, false) || singleLeader) {
- doLocalCommit(cmd);
+ if (onlyLeaderIndexes) {
+ try {
+ Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
+ collection, cloudDesc.getShardId());
+ isLeader = leaderReplica.getName().equals(
+ req.getCore().getCoreDescriptor().getCloudDescriptor()
+ .getCoreNodeName());
+ if (isLeader) {
+ long commitVersion = vinfo.getNewClock();
+ cmd.setVersion(commitVersion);
+ doLocalCommit(cmd);
+ } else {
+ assert TestInjection.waitForInSyncWithLeader(req.getCore(),
+ zkController, collection, cloudDesc.getShardId());
+ }
+ } catch (InterruptedException e) {
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
+ }
+ } else {
+ doLocalCommit(cmd);
+ }
} else {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
if (!req.getParams().getBool(COMMIT_END_POINT, false)) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/util/TestInjection.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index 5e4dc75..97291a1 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -28,14 +28,28 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.NonExistentCoreException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.update.SolrIndexWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS;
+import static org.apache.solr.handler.ReplicationHandler.COMMAND;
+
/**
* Allows random faults to be injected in running code during test runs.
@@ -118,6 +132,8 @@ public class TestInjection {
public static int randomDelayMaxInCoreCreationInSec = 10;
public static String splitFailureBeforeReplicaCreation = null;
+
+ public static String waitForReplicasInSync = "true:60";
private static Set<Timer> timers = Collections.synchronizedSet(new HashSet<Timer>());
@@ -343,6 +359,44 @@ public class TestInjection {
return true;
}
+
+ /**
+  * Test hook that polls the shard leader until the latest local commit matches the leader's index
+  * version.
+  * <p>
+  * The leader's replication handler is asked for its details and the reported {@code indexVersion}
+  * is compared with the {@link SolrIndexWriter#COMMIT_TIME_MSEC_KEY} of the latest local commit.
+  * The number of polls comes from {@link #waitForReplicasInSync}; polls are 500 ms apart.
+  *
+  * @return true if the check is disabled, the core is closed, or the core is in sync with the
+  *         leader; false if it never got in sync or the check failed with an exception
+  * @throws InterruptedException if the calling thread is interrupted while waiting
+  */
+ @SuppressForbidden(reason = "Need currentTimeMillis, because COMMIT_TIME_MSEC_KEY use currentTimeMillis as value")
+ public static boolean waitForInSyncWithLeader(SolrCore core, ZkController zkController, String collection, String shardId) throws InterruptedException {
+   if (waitForReplicasInSync == null) return true;
+
+   Pair<Boolean,Integer> pair = parseValue(waitForReplicasInSync);
+   boolean enabled = pair.first();
+   if (!enabled) return true;
+   // leader versions at or after this instant count as "fresh"
+   long t = System.currentTimeMillis() - 100;
+   try {
+     for (int i = 0; i < pair.second(); i++) {
+       if (core.isClosed()) return true;
+       // the leader is looked up on every poll
+       Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
+           collection, shardId);
+       try (HttpSolrClient leaderClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl()).build()) {
+         ModifiableSolrParams params = new ModifiableSolrParams();
+         params.set(CommonParams.QT, ReplicationHandler.PATH);
+         params.set(COMMAND, CMD_DETAILS);
+
+         NamedList<Object> response = leaderClient.request(new QueryRequest(params));
+         long leaderVersion = (long) ((NamedList)response.get("details")).get("indexVersion");
+
+         String localVersion = core.getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
+         // nothing committed on either side and nothing pending locally: trivially in sync
+         if (localVersion == null && leaderVersion == 0 && !core.getUpdateHandler().getUpdateLog().hasUncommittedChanges()) return true;
+         // a match on an old leader version is only accepted after six earlier polls
+         if (localVersion != null && Long.parseLong(localVersion) == leaderVersion && (leaderVersion >= t || i >= 6)) {
+           return true;
+         } else {
+           Thread.sleep(500);
+         }
+       }
+     }
+
+   } catch (InterruptedException e) {
+     // declared by this method; let the caller deal with the interruption instead of hiding it
+     throw e;
+   } catch (Exception e) {
+     log.error("Exception when wait for replicas in sync with master", e);
+   }
+
+   return false;
+ }
private static Pair<Boolean,Integer> parseValue(String raw) {
Matcher m = ENABLED_PERCENT.matcher(raw);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml
new file mode 100644
index 0000000..31802f9
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+
+ <!-- docValues-only field: neither indexed nor stored.
+ NOTE(review): presumably the field the in-place update tests modify - confirm against the tests using this configset -->
+ <field name="inplace_updatable_int" type="int" indexed="false" stored="false" docValues="true" />
+ <!-- every other field name falls back to an indexed, stored string -->
+ <dynamicField name="*" type="string" indexed="true" stored="true"/>
+
+ <!-- for versioning -->
+ <field name="_version_" type="long" indexed="false" stored="false" docValues="true" />
+ <field name="id" type="string" indexed="true" stored="true" docValues="true"/>
+ <uniqueKey>id</uniqueKey>
+
+ <fieldType name="string" class="solr.StrField"/>
+ <fieldType name="int" class="solr.TrieIntField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+ <fieldType name="long" class="solr.TrieLongField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+</schema>
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml
new file mode 100644
index 0000000..8da7d28
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+ <dataDir>${solr.data.dir:}</dataDir>
+
+ <directoryFactory name="DirectoryFactory"
+ class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+ <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+ <!-- commitWithin uses soft commits unless the solr.commitwithin.softcommit system property says otherwise -->
+ <commitWithin>
+ <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+ </commitWithin>
+ <!-- the update log implementation can be swapped through the solr.ulog system property -->
+ <updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
+ </updateHandler>
+
+ <requestHandler name="/select" class="solr.SearchHandler">
+ <lst name="defaults">
+ <str name="echoParams">explicit</str>
+ <str name="indent">true</str>
+ <str name="df">text</str>
+ </lst>
+
+ </requestHandler>
+</config>
+
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
index 582c8b4..5eb4b3b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
@@ -54,11 +54,17 @@ public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
private static final String SHARD2 = "shard2";
private static final String SHARD1 = "shard1";
private static final String ONE_NODE_COLLECTION = "onenodecollection";
+ private final boolean onlyLeaderIndexes = random().nextBoolean();
public BasicDistributedZk2Test() {
super();
sliceCount = 2;
}
+
+ /** Returns 1 when {@code onlyLeaderIndexes} is set for this test instance, otherwise -1. */
+ @Override
+ protected int getRealtimeReplicas() {
+   if (onlyLeaderIndexes) {
+     return 1;
+   }
+   return -1;
+ }
@Test
@ShardsFixed(num = 4)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
index 25c483b..d1dbe9c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
@@ -87,6 +87,8 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String DEFAULT_COLLECTION = "collection1";
+
+ private final boolean onlyLeaderIndexes = random().nextBoolean();
String t1="a_t";
String i1="a_i1";
String tlong = "other_tl1";
@@ -114,7 +116,12 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
pending = new HashSet<>();
}
-
+
+ /** Returns 1 when {@code onlyLeaderIndexes} is set for this test instance, otherwise -1. */
+ @Override
+ protected int getRealtimeReplicas() {
+   if (onlyLeaderIndexes) {
+     return 1;
+   }
+   return -1;
+ }
+
@Override
protected void setDistributedParams(ModifiableSolrParams params) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
index 4e6122e..628884c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
@@ -55,6 +55,8 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
private static final Integer RUN_LENGTH = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.runlength", "-1"));
+ private final boolean onlyLeaderIndexes = random().nextBoolean();
+
@BeforeClass
public static void beforeSuperClass() {
schemaString = "schema15.xml"; // we need a string id
@@ -109,6 +111,11 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
clientSoTimeout = 5000;
}
+ /** Returns 1 when {@code onlyLeaderIndexes} is set for this test instance, otherwise -1. */
+ @Override
+ protected int getRealtimeReplicas() {
+   if (onlyLeaderIndexes) {
+     return 1;
+   }
+   return -1;
+ }
+
@Test
public void test() throws Exception {
cloudClient.setSoTimeout(clientSoTimeout);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
index e9e8907..8904ea8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
@@ -55,6 +55,12 @@ import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
public class ForceLeaderTest extends HttpPartitionTest {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final boolean onlyLeaderIndexes = random().nextBoolean();
+
+ @Override
+ protected int getRealtimeReplicas() {
+ return onlyLeaderIndexes? 1 : -1;
+ }
@Test
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
index 5ae4c17..01002cf 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
@@ -76,12 +76,19 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
// give plenty of time for replicas to recover when running in slow Jenkins test envs
protected static final int maxWaitSecsToSeeAllActive = 90;
+ private final boolean onlyLeaderIndexes = random().nextBoolean();
+
public HttpPartitionTest() {
super();
sliceCount = 2;
fixShardCount(3);
}
+ @Override
+ protected int getRealtimeReplicas() {
+ return onlyLeaderIndexes? 1 : -1;
+ }
+
/**
* We need to turn off directUpdatesToLeadersOnly due to SOLR-9512
*/
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
index fd122ad..457b9d9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
@@ -37,6 +37,8 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
private static final long sleepMsBeforeHealPartition = 2000L;
+ private final boolean onlyLeaderIndexes = random().nextBoolean();
+
public LeaderInitiatedRecoveryOnCommitTest() {
super();
sliceCount = 1;
@@ -44,6 +46,11 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
}
@Override
+ protected int getRealtimeReplicas() {
+ return onlyLeaderIndexes? 1 : -1;
+ }
+
+ @Override
@Test
public void test() throws Exception {
oneShardTest();