Posted to commits@lucene.apache.org by da...@apache.org on 2017/03/14 07:37:59 UTC

[2/2] lucene-solr:master: SOLR-9835: Create another replication mode for SolrCloud

SOLR-9835: Create another replication mode for SolrCloud


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/7830462d
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/7830462d
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/7830462d

Branch: refs/heads/master
Commit: 7830462d4b7da3acefff6353419e71cde62d5fee
Parents: faeb1fe
Author: Cao Manh Dat <da...@apache.org>
Authored: Tue Mar 14 14:37:47 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Tue Mar 14 14:37:47 2017 +0700

----------------------------------------------------------------------
 .../solr/hadoop/TreeMergeOutputFormat.java      |   3 +-
 .../org/apache/solr/cloud/ElectionContext.java  |  20 +-
 .../cloud/OverseerCollectionMessageHandler.java |   1 +
 .../org/apache/solr/cloud/RecoveryStrategy.java |  29 +-
 .../apache/solr/cloud/ReplicateFromLeader.java  | 124 ++++++
 .../org/apache/solr/cloud/ZkController.java     |  31 +-
 .../org/apache/solr/core/CoreContainer.java     |   7 +
 .../org/apache/solr/handler/IndexFetcher.java   |  34 +-
 .../apache/solr/handler/ReplicationHandler.java |  28 +-
 .../solr/handler/admin/CollectionsHandler.java  |   4 +-
 .../org/apache/solr/update/CommitTracker.java   |   5 +
 .../solr/update/DirectUpdateHandler2.java       |  31 +-
 .../apache/solr/update/HdfsTransactionLog.java  |  50 +++
 .../apache/solr/update/SolrIndexSplitter.java   |   3 +-
 .../org/apache/solr/update/SolrIndexWriter.java |   6 +-
 .../org/apache/solr/update/TransactionLog.java  |  50 +++
 .../org/apache/solr/update/UpdateCommand.java   |   1 +
 .../java/org/apache/solr/update/UpdateLog.java  | 171 +++++++-
 .../processor/DistributedUpdateProcessor.java   |  38 +-
 .../org/apache/solr/util/TestInjection.java     |  54 +++
 .../conf/schema.xml                             |  31 ++
 .../conf/solrconfig.xml                         |  48 ++
 .../solr/cloud/BasicDistributedZk2Test.java     |   6 +
 .../solr/cloud/BasicDistributedZkTest.java      |   9 +-
 .../cloud/ChaosMonkeyNothingIsSafeTest.java     |   7 +
 .../org/apache/solr/cloud/ForceLeaderTest.java  |   6 +
 .../apache/solr/cloud/HttpPartitionTest.java    |   7 +
 .../LeaderInitiatedRecoveryOnCommitTest.java    |   7 +
 .../solr/cloud/OnlyLeaderIndexesTest.java       | 435 +++++++++++++++++++
 .../solr/cloud/RecoveryAfterSoftCommitTest.java |   7 +-
 .../org/apache/solr/cloud/ShardSplitTest.java   |   6 +
 .../apache/solr/cloud/TestCloudRecovery.java    |  16 +-
 .../apache/solr/cloud/TestCollectionAPI.java    |   6 +-
 .../cloud/hdfs/HdfsBasicDistributedZkTest.java  |   7 +-
 .../solr/update/TestInPlaceUpdatesDistrib.java  |  23 +-
 .../solrj/request/CollectionAdminRequest.java   |   6 +
 .../apache/solr/common/cloud/DocCollection.java |  12 +
 .../apache/solr/common/cloud/ZkStateReader.java |   1 +
 .../cloud/AbstractFullDistribZkTestBase.java    |  14 +-
 39 files changed, 1309 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
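
Usage sketch (not part of this commit): creating a collection in the new replication mode from SolrJ. The setRealtimeReplicas(1) setter and the "conf1" configset name below are assumptions based on the CollectionAdminRequest and REALTIME_REPLICAS changes listed above; verify them against the actual SolrJ API.

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;

public class CreateAppendReplicasCollection {
  public static void main(String[] args) throws Exception {
    // Only the shard leader maintains a live Lucene index; the other replicas
    // pull index segments from the leader instead of indexing every update.
    try (CloudSolrClient client = new CloudSolrClient.Builder()
        .withZkHost("localhost:2181").build()) {
      CollectionAdminRequest
          .createCollection("collection1", "conf1", 1, 3) // 1 shard, replicationFactor 3
          .setRealtimeReplicas(1)                         // assumed setter added by this issue
          .process(client);
    }
  }
}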


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
index e3487ad..cac57c3 100644
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
+++ b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
@@ -163,7 +163,8 @@ public class TreeMergeOutputFormat extends FileOutputFormat<Text, NullWritable>
 
         // Set Solr's commit data so the created index is usable by SolrCloud. E.g. Currently SolrCloud relies on
         // commitTimeMSec in the commit data to do replication.
-        SolrIndexWriter.setCommitData(writer);
+        //TODO no commitUpdateCommand
+        SolrIndexWriter.setCommitData(writer, -1);
 
         timer = new RTimer();
         LOG.info("Optimizing Solr: Closing index writer");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index d3ad322..223a539 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.Path;
@@ -420,7 +421,24 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         try {
           // we must check LIR before registering as leader
           checkLIR(coreName, allReplicasInLine);
-          
+
+          boolean onlyLeaderIndexes = zkController.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
+          if (onlyLeaderIndexes) {
+            // stop replicating from the old leader
+            zkController.stopReplicationFromLeader(coreName);
+            if (weAreReplacement) {
+              try (SolrCore core = cc.getCore(coreName)) {
+                Future<UpdateLog.RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().recoverFromCurrentLog();
+                if (future != null) {
+                  log.info("Replaying tlog before becoming the new leader");
+                  future.get();
+                } else {
+                  log.info("New leader does not have old tlog to replay");
+                }
+              }
+            }
+          }
+
           super.runLeaderProcess(weAreReplacement, 0);
           try (SolrCore core = cc.getCore(coreName)) {
             if (core != null) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index 00eb12d..4d64a00 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -131,6 +131,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
       ZkStateReader.REPLICATION_FACTOR, "1",
       ZkStateReader.MAX_SHARDS_PER_NODE, "1",
       ZkStateReader.AUTO_ADD_REPLICAS, "false",
+      ZkStateReader.REALTIME_REPLICAS, "-1",
       DocCollection.RULE, null,
       SNITCH, null));
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 8865c08..cb6c69c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -118,7 +118,8 @@ public class RecoveryStrategy extends Thread implements Closeable {
   private boolean recoveringAfterStartup;
   private CoreContainer cc;
   private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
-  
+  private boolean onlyLeaderIndexes;
+
   protected RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
     this.cc = cc;
     this.coreName = cd.getName();
@@ -128,6 +129,8 @@ public class RecoveryStrategy extends Thread implements Closeable {
     zkStateReader = zkController.getZkStateReader();
     baseUrl = zkController.getBaseUrl();
     coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
+    String collection = cd.getCloudDescriptor().getCollectionName();
+    onlyLeaderIndexes = zkStateReader.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
   }
 
   final public int getWaitForUpdatesWithStaleStatePauseMilliSeconds() {
@@ -260,7 +263,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
       UpdateRequest ureq = new UpdateRequest();
       ureq.setParams(new ModifiableSolrParams());
       ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
-      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
+      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);
       ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
           client);
     }
@@ -309,7 +312,8 @@ public class RecoveryStrategy extends Thread implements Closeable {
       return;
     }
 
-    boolean firstTime = true;
+    // we temporarily skip peersync in realtimeReplicas mode
+    boolean firstTime = !onlyLeaderIndexes;
 
     List<Long> recentVersions;
     try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
@@ -361,6 +365,10 @@ public class RecoveryStrategy extends Thread implements Closeable {
       }
     }
 
+    if (onlyLeaderIndexes) {
+      zkController.stopReplicationFromLeader(coreName);
+    }
+
     Future<RecoveryInfo> replayFuture = null;
     while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
       try {
@@ -514,6 +522,9 @@ public class RecoveryStrategy extends Thread implements Closeable {
         if (successfulRecovery) {
           LOG.info("Registering as Active after recovery.");
           try {
+            if (onlyLeaderIndexes) {
+              zkController.startReplicationFromLeader(coreName);
+            }
             zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
           } catch (Exception e) {
             LOG.error("Could not publish as ACTIVE after succesful recovery", e);
@@ -587,8 +598,20 @@ public class RecoveryStrategy extends Thread implements Closeable {
     LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
   }
 
+  public static Runnable testing_beforeReplayBufferingUpdates;
+
   final private Future<RecoveryInfo> replay(SolrCore core)
       throws InterruptedException, ExecutionException {
+    if (testing_beforeReplayBufferingUpdates != null) {
+      testing_beforeReplayBufferingUpdates.run();
+    }
+    if (onlyLeaderIndexes) {
+      // roll over all updates during buffering to a new tlog, making RTG available
+      SolrQueryRequest req = new LocalSolrQueryRequest(core,
+          new ModifiableSolrParams());
+      core.getUpdateHandler().getUpdateLog().copyOverBufferingUpdates(new CommitUpdateCommand(req, false));
+      return null;
+    }
     Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
     if (future == null) {
       // no replay needed\

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
new file mode 100644
index 0000000..d7fded9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.lucene.index.IndexCommit;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrConfig;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.SolrIndexWriter;
+import org.apache.solr.update.UpdateLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicateFromLeader {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private CoreContainer cc;
+  private String coreName;
+
+  private ReplicationHandler replicationProcess;
+  private long lastVersion = 0;
+
+  public ReplicateFromLeader(CoreContainer cc, String coreName) {
+    this.cc = cc;
+    this.coreName = coreName;
+  }
+
+  public void startReplication() throws InterruptedException {
+    try (SolrCore core = cc.getCore(coreName)) {
+      if (core == null) {
+        if (cc.isShutDown()) {
+          return;
+        } else {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "SolrCore not found:" + coreName + " in " + cc.getCoreNames());
+        }
+      }
+      SolrConfig.UpdateHandlerInfo uinfo = core.getSolrConfig().getUpdateHandlerInfo();
+      String pollIntervalStr = "00:00:03";
+      if (uinfo.autoCommmitMaxTime != -1) {
+        pollIntervalStr = toPollIntervalStr(uinfo.autoCommmitMaxTime/2);
+      } else if (uinfo.autoSoftCommmitMaxTime != -1) {
+        pollIntervalStr = toPollIntervalStr(uinfo.autoSoftCommmitMaxTime/2);
+      }
+
+      NamedList slaveConfig = new NamedList();
+      slaveConfig.add("fetchFromLeader", true);
+      slaveConfig.add("pollInterval", pollIntervalStr);
+      NamedList replicationConfig = new NamedList();
+      replicationConfig.add("slave", slaveConfig);
+
+      String lastCommitVersion = getCommitVersion(core);
+      if (lastCommitVersion != null) {
+        lastVersion = Long.parseLong(lastCommitVersion);
+      }
+
+      replicationProcess = new ReplicationHandler();
+      replicationProcess.setPollListener((solrCore, pollSuccess) -> {
+        if (pollSuccess) {
+          String commitVersion = getCommitVersion(core);
+          if (commitVersion == null) return;
+          if (Long.parseLong(commitVersion) == lastVersion) return;
+          UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
+          SolrQueryRequest req = new LocalSolrQueryRequest(core,
+              new ModifiableSolrParams());
+          CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
+          cuc.setVersion(Long.parseLong(commitVersion));
+          updateLog.copyOverOldUpdates(cuc);
+          lastVersion = Long.parseLong(commitVersion);
+        }
+      });
+      replicationProcess.init(replicationConfig);
+      replicationProcess.inform(core);
+    }
+  }
+
+  public static String getCommitVersion(SolrCore solrCore) {
+    IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit();
+    try {
+      String commitVersion = commit.getUserData().get(SolrIndexWriter.COMMIT_COMMAND_VERSION);
+      if (commitVersion == null) return null;
+      else return commitVersion;
+    } catch (Exception e) {
+      LOG.warn("Cannot get commit command version from index commit point ",e);
+      return null;
+    }
+  }
+
+  private static String toPollIntervalStr(int ms) {
+    int sec = ms/1000;
+    int hour = sec / 3600;
+    sec = sec % 3600;
+    int min = sec / 60;
+    sec = sec % 60;
+    return hour + ":" + min + ":" + sec;
+  }
+
+  public void stopReplication() {
+    replicationProcess.close();
+  }
+}
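
A small self-contained sketch of the poll-interval derivation above, with a hypothetical autoCommit maxTime of 15000 ms: the replica polls the leader at half that interval, formatted as h:m:s without zero padding.

public class PollIntervalExample {
  // Mirrors the private toPollIntervalStr helper above.
  static String toPollIntervalStr(int ms) {
    int sec = ms / 1000;
    int hour = sec / 3600;
    sec = sec % 3600;
    int min = sec / 60;
    sec = sec % 60;
    return hour + ":" + min + ":" + sec;
  }

  public static void main(String[] args) {
    int autoCommitMaxTime = 15000;                                // ms, hypothetical solrconfig value
    System.out.println(toPollIntervalStr(autoCommitMaxTime / 2)); // prints 0:0:7
  }
}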

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 333acd4..a19b351 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -189,6 +189,7 @@ public class ZkController {
 
   private LeaderElector overseerElector;
 
+  private Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>();
 
   // for now, this can be null in tests, in which case recovery will be inactive, and other features
   // may accept defaults or use mocks rather than pulling things from a CoreContainer
@@ -877,7 +878,7 @@ public class ZkController {
           coreName, baseUrl, cloudDesc.getCollectionName(), shardId);
       
       ZkNodeProps leaderProps = new ZkNodeProps(props);
-      
+
       try {
         // If we're a preferred leader, insert ourselves at the head of the queue
         boolean joinAtHead = false;
@@ -913,9 +914,16 @@ public class ZkController {
         // leader election perhaps?
         
         UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-        
+        boolean onlyLeaderIndexes = zkStateReader.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
+        boolean isReplicaInOnlyLeaderIndexes = onlyLeaderIndexes && !isLeader;
+        if (isReplicaInOnlyLeaderIndexes) {
+          String commitVersion = ReplicateFromLeader.getCommitVersion(core);
+          if (commitVersion != null) {
+            ulog.copyOverOldUpdates(Long.parseLong(commitVersion));
+          }
+        }
         // we will call register again after zk expiration and on reload
-        if (!afterExpiration && !core.isReloaded() && ulog != null) {
+        if (!afterExpiration && !core.isReloaded() && ulog != null && !isReplicaInOnlyLeaderIndexes) {
           // disable recovery in case shard is in construction state (for shard splits)
           Slice slice = getClusterState().getSlice(collection, shardId);
           if (slice.getState() != Slice.State.CONSTRUCTION || !isLeader) {
@@ -934,6 +942,9 @@ public class ZkController {
         boolean didRecovery
             = checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, core, cc, afterExpiration);
         if (!didRecovery) {
+          if (isReplicaInOnlyLeaderIndexes) {
+            startReplicationFromLeader(coreName);
+          }
           publish(desc, Replica.State.ACTIVE);
         }
         
@@ -948,6 +959,20 @@ public class ZkController {
     }
   }
 
+  public void startReplicationFromLeader(String coreName) throws InterruptedException {
+    ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName);
+    if (replicateFromLeaders.putIfAbsent(coreName, replicateFromLeader) == null) {
+      replicateFromLeader.startReplication();
+    }
+  }
+
+  public void stopReplicationFromLeader(String coreName) {
+    ReplicateFromLeader replicateFromLeader = replicateFromLeaders.remove(coreName);
+    if (replicateFromLeader != null) {
+      replicateFromLeader.stopReplication();
+    }
+  }
+
   // timeoutms is the timeout for the first call to get the leader - there is then
   // a longer wait to make sure that leader matches our local state
   private String getLeader(final CloudDescriptor cloudDesc, int timeoutms) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index b9597ae..0de671e 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1137,6 +1137,13 @@ public class CoreContainer {
         log.info("Reloading SolrCore '{}' using configuration from {}", cd.getName(), coreConfig.getName());
         SolrCore newCore = core.reload(coreConfig);
         registerCore(cd.getName(), newCore, false, false);
+        if (getZkController() != null) {
+          boolean onlyLeaderIndexes = getZkController().getClusterState().getCollection(cd.getCollectionName()).getRealtimeReplicas() == 1;
+          if (onlyLeaderIndexes && !cd.getCloudDescriptor().isLeader()) {
+            getZkController().stopReplicationFromLeader(core.getName());
+            getZkController().startReplicationFromLeader(newCore.getName());
+          }
+        }
       } catch (SolrCoreState.CoreIsClosedException e) {
         throw e;
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 8634aee..a07496f 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -68,8 +68,11 @@ import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
 import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
@@ -115,7 +118,7 @@ public class IndexFetcher {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final String masterUrl;
+  private String masterUrl;
 
   final ReplicationHandler replicationHandler;
 
@@ -150,6 +153,8 @@ public class IndexFetcher {
 
   private boolean useExternalCompression = false;
 
+  private boolean fetchFromLeader = false;
+
   private final HttpClient myHttpClient;
 
   private Integer connTimeout;
@@ -167,11 +172,15 @@ public class IndexFetcher {
 
   public IndexFetcher(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) {
     solrCore = sc;
+    Object fetchFromLeader = initArgs.get(FETCH_FROM_LEADER);
+    if (fetchFromLeader != null && fetchFromLeader instanceof Boolean) {
+      this.fetchFromLeader = (boolean) fetchFromLeader;
+    }
     String masterUrl = (String) initArgs.get(MASTER_URL);
-    if (masterUrl == null)
+    if (masterUrl == null && !this.fetchFromLeader)
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
               "'masterUrl' is required for a slave");
-    if (masterUrl.endsWith(ReplicationHandler.PATH)) {
+    if (masterUrl != null && masterUrl.endsWith(ReplicationHandler.PATH)) {
       masterUrl = masterUrl.substring(0, masterUrl.length()-12);
       LOG.warn("'masterUrl' must be specified without the "+ReplicationHandler.PATH+" suffix");
     }
@@ -298,6 +307,15 @@ public class IndexFetcher {
     }
 
     try {
+      if (fetchFromLeader) {
+        Replica replica = getLeaderReplica();
+        CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
+        if (cd.getCoreNodeName().equals(replica.getName())) {
+          return false;
+        }
+        masterUrl = replica.getCoreUrl();
+        LOG.info("Updated masterUrl to " + masterUrl);
+      }
       //get the current 'replicateable' index version in the master
       NamedList response;
       try {
@@ -404,7 +422,7 @@ public class IndexFetcher {
           isFullCopyNeeded = true;
         }
 
-        if (!isFullCopyNeeded) {
+        if (!isFullCopyNeeded && !fetchFromLeader) {
           // a searcher might be using some flushed but not committed segments
           // because of soft commits (which open a searcher on IW's data)
           // so we need to close the existing searcher on the last commit
@@ -565,6 +583,14 @@ public class IndexFetcher {
     }
   }
 
+  private Replica getLeaderReplica() throws InterruptedException {
+    ZkController zkController = solrCore.getCoreDescriptor().getCoreContainer().getZkController();
+    CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
+    Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
+        cd.getCollectionName(), cd.getShardId());
+    return leaderReplica;
+  }
+
   private void cleanup(final SolrCore core, Directory tmpIndexDir,
       Directory indexDir, boolean deleteTmpIdxDir, File tmpTlogDir, boolean successfulInstall) throws IOException {
     try {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index cdbadc4..e40b2c3 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -209,6 +209,11 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
   private Long pollIntervalNs;
   private String pollIntervalStr;
 
+  private PollListener pollListener;
+  public interface PollListener {
+    void onComplete(SolrCore solrCore, boolean pollSuccess) throws IOException;
+  }
+
   /**
    * Disable the timer task for polling
    */
@@ -218,6 +223,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
     return pollIntervalStr;
   }
 
+  public void setPollListener(PollListener pollListener) {
+    this.pollListener = pollListener;
+  }
+
   @Override
   public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
     rsp.setHttpCaching(false);
@@ -1142,7 +1151,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
       try {
         LOG.debug("Polling for index modifications");
         markScheduledExecutionStart();
-        doFetch(null, false);
+        boolean pollSuccess = doFetch(null, false);
+        if (pollListener != null) pollListener.onComplete(core, pollSuccess);
       } catch (Exception e) {
         LOG.error("Exception in fetching index", e);
       }
@@ -1328,6 +1338,20 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
     });
   }
 
+  public void close() {
+    if (executorService != null) executorService.shutdown();
+    if (pollingIndexFetcher != null) {
+      pollingIndexFetcher.destroy();
+    }
+    if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
+      currentIndexFetcher.destroy();
+    }
+    ExecutorUtil.shutdownAndAwaitTermination(restoreExecutor);
+    if (restoreFuture != null) {
+      restoreFuture.cancel(false);
+    }
+  }
+
   /**
    * Register a listener for postcommit/optimize
    *
@@ -1680,6 +1704,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
 
   public static final String MASTER_URL = "masterUrl";
 
+  public static final String FETCH_FROM_LEADER = "fetchFromLeader";
+
   public static final String STATUS = "status";
 
   public static final String COMMAND = "command";

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index d7759ca..2e17af6 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -115,6 +115,7 @@ import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
 import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.REALTIME_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
@@ -404,7 +405,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           STATE_FORMAT,
           AUTO_ADD_REPLICAS,
           RULE,
-          SNITCH);
+          SNITCH,
+          REALTIME_REPLICAS);
 
       if (props.get(STATE_FORMAT) == null) {
         props.put(STATE_FORMAT, "2");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/CommitTracker.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/CommitTracker.java b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
index 61f0c35..9c09ebe 100644
--- a/solr/core/src/java/org/apache/solr/update/CommitTracker.java
+++ b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
@@ -207,6 +207,11 @@ public final class CommitTracker implements Runnable {
       command.openSearcher = openSearcher;
       command.waitSearcher = waitSearcher;
       command.softCommit = softCommit;
+      if (core.getCoreDescriptor().getCloudDescriptor() != null
+          && core.getCoreDescriptor().getCloudDescriptor().isLeader()
+          && !softCommit) {
+        command.version = core.getUpdateHandler().getUpdateLog().getVersionInfo().getNewClock();
+      }
       // no need for command.maxOptimizeSegments = 1; since it is not optimizing
 
       // we increment this *before* calling commit because it was causing a race

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index 4592bcf..abb5512 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -45,7 +45,9 @@ import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.util.BytesRefHash;
+import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
@@ -123,6 +125,14 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     commitWithinSoftCommit = updateHandlerInfo.commitWithinSoftCommit;
     indexWriterCloseWaitsForMerges = updateHandlerInfo.indexWriterCloseWaitsForMerges;
 
+    ZkController zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
+    if (zkController != null) {
+      DocCollection dc = zkController.getClusterState().getCollection(core.getCoreDescriptor().getCollectionName());
+      if (dc.getRealtimeReplicas() == 1) {
+        commitWithinSoftCommit = false;
+        commitTracker.setOpenSearcher(true);
+      }
+    }
 
   }
   
@@ -233,6 +243,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
       cmd.overwrite = false;
     }
     try {
+      if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
+        if (ulog != null) ulog.add(cmd);
+        return 1;
+      }
+
       if (cmd.overwrite) {
         // Check for delete by query commands newer (i.e. reordered). This
         // should always be null on a leader
@@ -404,6 +419,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     deleteByIdCommands.increment();
     deleteByIdCommandsCumulative.mark();
 
+    if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0 ) {
+      if (ulog != null) ulog.delete(cmd);
+      return;
+    }
+
     Term deleteTerm = new Term(idField.getName(), cmd.getIndexedId());
     // SolrCore.verbose("deleteDocuments",deleteTerm,writer);
     RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
@@ -463,6 +483,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     deleteByQueryCommandsCumulative.mark();
     boolean madeIt=false;
     try {
+      if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
+        if (ulog != null) ulog.deleteByQuery(cmd);
+        madeIt = true;
+        return;
+      }
       Query q = getQuery(cmd);
       
       boolean delAll = MatchAllDocsQuery.class == q.getClass();
@@ -563,7 +588,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
       log.info("start "+cmd);
       RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
       try {
-        SolrIndexWriter.setCommitData(iw.get());
+        SolrIndexWriter.setCommitData(iw.get(), cmd.getVersion());
         iw.get().prepareCommit();
       } finally {
         iw.decref();
@@ -647,7 +672,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
           // SolrCore.verbose("writer.commit() start writer=",writer);
 
           if (writer.hasUncommittedChanges()) {
-            SolrIndexWriter.setCommitData(writer);
+            SolrIndexWriter.setCommitData(writer, cmd.getVersion());
             writer.commit();
           } else {
             log.info("No uncommitted changes. Skipping IW.commit.");
@@ -838,7 +863,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
           }
 
           // todo: refactor this shared code (or figure out why a real CommitUpdateCommand can't be used)
-          SolrIndexWriter.setCommitData(writer);
+          SolrIndexWriter.setCommitData(writer, cmd.getVersion());
           writer.commit();
 
           synchronized (solrCoreState.getUpdateLock()) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
index 90f6856..c478935 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
@@ -20,8 +20,10 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -368,6 +370,10 @@ public class HdfsTransactionLog extends TransactionLog {
     return new HDFSLogReader(startingPos);
   }
 
+  public LogReader getSortedReader(long startingPos) {
+    return new HDFSSortedLogReader(startingPos);
+  }
+
   /** Returns a single threaded reverse reader */
   @Override
   public ReverseReader getReverseReader() throws IOException {
@@ -477,6 +483,50 @@ public class HdfsTransactionLog extends TransactionLog {
 
   }
 
+  public class HDFSSortedLogReader extends HDFSLogReader{
+    private long startingPos;
+    private boolean inOrder = true;
+    private TreeMap<Long, Long> versionToPos;
+    Iterator<Long> iterator;
+
+    public HDFSSortedLogReader(long startingPos) {
+      super(startingPos);
+      this.startingPos = startingPos;
+    }
+
+    @Override
+    public Object next() throws IOException, InterruptedException {
+      if (versionToPos == null) {
+        versionToPos = new TreeMap<>();
+        Object o;
+        long pos = startingPos;
+
+        long lastVersion = Long.MIN_VALUE;
+        while ( (o = super.next()) != null) {
+          List entry = (List) o;
+          long version = (Long) entry.get(UpdateLog.VERSION_IDX);
+          version = Math.abs(version);
+          versionToPos.put(version, pos);
+          pos = currentPos();
+
+          if (version < lastVersion) inOrder = false;
+          lastVersion = version;
+        }
+        fis.seek(startingPos);
+      }
+
+      if (inOrder) {
+        return super.next();
+      } else {
+        if (iterator == null) iterator = versionToPos.values().iterator();
+        if (!iterator.hasNext()) return null;
+        long pos = iterator.next();
+        if (pos != currentPos()) fis.seek(pos);
+        return super.next();
+      }
+    }
+  }
+
   public class HDFSReverseReader extends ReverseReader {
     FSDataFastInputStream fis;
     private LogCodec codec = new LogCodec(resolver) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
index a147b0f..e9950f2 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
@@ -137,7 +137,8 @@ public class SolrIndexSplitter {
         // we commit explicitly instead of sending a CommitUpdateCommand through the processor chain
         // because the sub-shard cores will just ignore such a commit because the update log is not
         // in active state at this time.
-        SolrIndexWriter.setCommitData(iw);
+        //TODO no commitUpdateCommand
+        SolrIndexWriter.setCommitData(iw, -1);
         iw.commit();
         success = true;
       } finally {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
index 3c0c1a5..6a264f8 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
@@ -61,6 +61,7 @@ public class SolrIndexWriter extends IndexWriter {
   /** Stored into each Lucene commit to record the
    *  System.currentTimeMillis() when commit was called. */
   public static final String COMMIT_TIME_MSEC_KEY = "commitTimeMSec";
+  public static final String COMMIT_COMMAND_VERSION = "commitCommandVer";
 
   private final Object CLOSE_LOCK = new Object();
   
@@ -183,10 +184,11 @@ public class SolrIndexWriter extends IndexWriter {
 
   @SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
       " but currently suspiciously used for replication as well")
-  public static void setCommitData(IndexWriter iw) {
-    log.info("Calling setCommitData with IW:" + iw.toString());
+  public static void setCommitData(IndexWriter iw, long commitCommandVersion) {
+    log.info("Calling setCommitData with IW:" + iw.toString() + " commitCommandVersion:"+commitCommandVersion);
     final Map<String,String> commitData = new HashMap<>();
     commitData.put(COMMIT_TIME_MSEC_KEY, String.valueOf(System.currentTimeMillis()));
+    commitData.put(COMMIT_COMMAND_VERSION, String.valueOf(commitCommandVersion));
     iw.setLiveCommitData(commitData.entrySet());
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/TransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
index 5037b45..73328cf 100644
--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
@@ -29,9 +29,11 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.util.BytesRef;
@@ -632,6 +634,10 @@ public class TransactionLog implements Closeable {
     return new LogReader(startingPos);
   }
 
+  public LogReader getSortedReader(long startingPos) {
+    return new SortedLogReader(startingPos);
+  }
+
   /** Returns a single threaded reverse reader */
   public ReverseReader getReverseReader() throws IOException {
     return new FSReverseReader();
@@ -715,6 +721,50 @@ public class TransactionLog implements Closeable {
 
   }
 
+  public class SortedLogReader extends LogReader {
+    private long startingPos;
+    private boolean inOrder = true;
+    private TreeMap<Long, Long> versionToPos;
+    Iterator<Long> iterator;
+
+    public SortedLogReader(long startingPos) {
+      super(startingPos);
+      this.startingPos = startingPos;
+    }
+
+    @Override
+    public Object next() throws IOException, InterruptedException {
+      if (versionToPos == null) {
+        versionToPos = new TreeMap<>();
+        Object o;
+        long pos = startingPos;
+
+        long lastVersion = Long.MIN_VALUE;
+        while ( (o = super.next()) != null) {
+          List entry = (List) o;
+          long version = (Long) entry.get(UpdateLog.VERSION_IDX);
+          version = Math.abs(version);
+          versionToPos.put(version, pos);
+          pos = currentPos();
+
+          if (version < lastVersion) inOrder = false;
+          lastVersion = version;
+        }
+        fis.seek(startingPos);
+      }
+
+      if (inOrder) {
+        return super.next();
+      } else {
+        if (iterator == null) iterator = versionToPos.values().iterator();
+        if (!iterator.hasNext()) return null;
+        long pos = iterator.next();
+        if (pos != currentPos()) fis.seek(pos);
+        return super.next();
+      }
+    }
+  }
+
   public abstract class ReverseReader {
 
     /** Returns the next object from the log, or null if none available.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateCommand.java b/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
index 9f01571..b124271 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
@@ -34,6 +34,7 @@ public abstract class UpdateCommand implements Cloneable {
   public static int PEER_SYNC    = 0x00000004; // update command is a missing update being provided by a peer.
   public static int IGNORE_AUTOCOMMIT = 0x00000008; // this update should not count toward triggering of autocommits.
   public static int CLEAR_CACHES = 0x00000010; // clear caches associated with the update log.  used when applying reordered DBQ updates when doing an add.
+  public static int IGNORE_INDEXWRITER = 0x00000020;
 
   public UpdateCommand(SolrQueryRequest req) {
     this.req = req;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 16eff9c..6a5f407 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -27,6 +27,7 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -618,7 +619,7 @@ public static final int VERSION_IDX = 1;
       }
 
       // only change our caches if we are not buffering
-      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0 && (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) {
         // given that we just did a delete-by-query, we don't know what documents were
         // affected and hence we must purge our caches.
         openRealtimeSearcher();
@@ -1095,6 +1096,162 @@ public static final int VERSION_IDX = 1;
     return cs.submit(replayer, recoveryInfo);
   }
 
+  /**
+   * Replay the current tlog, so all updates will be written to the index.
+   * This is a must-do task for an append replica that is becoming the new leader.
+   * @return future of this task
+   */
+  public Future<RecoveryInfo> recoverFromCurrentLog() {
+    if (tlog == null) {
+      return null;
+    }
+    map.clear();
+    recoveryInfo = new RecoveryInfo();
+    tlog.incref();
+
+    ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor);
+    LogReplayer replayer = new LogReplayer(Collections.singletonList(tlog), false, true);
+
+    versionInfo.blockUpdates();
+    try {
+      state = State.REPLAYING;
+    } finally {
+      versionInfo.unblockUpdates();
+    }
+
+    return cs.submit(replayer, recoveryInfo);
+  }
+
+  /**
+   * Block updates, append a commit to the current tlog,
+   * then copy over buffered updates to a new tlog and bring the ulog back to the active state.
+   * Any update which hasn't made it to the index is thus preserved in the current tlog;
+   * this also keeps RTG working.
+   * @param cuc any update with a version larger than the version of cuc will be copied over
+   */
+  public void copyOverBufferingUpdates(CommitUpdateCommand cuc) {
+    versionInfo.blockUpdates();
+    try {
+      operationFlags &= ~FLAG_GAP;
+      state = State.ACTIVE;
+      copyAndSwitchToNewTlog(cuc);
+    } finally {
+      versionInfo.unblockUpdates();
+    }
+  }
+
+  /**
+   * Block updates, append a commit to the current tlog, then copy over updates to a new tlog.
+   * Any update which hasn't made it to the index is thus preserved in the current tlog.
+   * @param cuc any update with a version larger than the version of cuc will be copied over
+   */
+  public void copyOverOldUpdates(CommitUpdateCommand cuc) {
+    versionInfo.blockUpdates();
+    try {
+      copyAndSwitchToNewTlog(cuc);
+    } finally {
+      versionInfo.unblockUpdates();
+    }
+  }
+
+  protected void copyAndSwitchToNewTlog(CommitUpdateCommand cuc) {
+    synchronized (this) {
+      if (tlog == null) return;
+      preCommit(cuc);
+      try {
+        copyOverOldUpdates(cuc.getVersion());
+      } finally {
+        postCommit(cuc);
+      }
+    }
+  }
+
+  /**
+   * Copy over updates from prevTlog or the last tlog (in the tlog folder) to a new tlog.
+   * @param commitVersion any update with a version larger than commitVersion will be copied over
+   */
+  public void copyOverOldUpdates(long commitVersion) {
+    TransactionLog oldTlog = prevTlog;
+    if (oldTlog == null && !logs.isEmpty()) {
+      oldTlog = logs.getFirst();
+    }
+    if (oldTlog == null || oldTlog.refcount.get() == 0) {
+      return;
+    }
+
+    try {
+      if (oldTlog.endsWithCommit()) {
+        return;
+      }
+    } catch (IOException e) {
+      log.warn("Exception reading log", e);
+      return;
+    }
+
+    SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core,
+        new ModifiableSolrParams());
+    TransactionLog.LogReader logReader = oldTlog.getReader(0);
+    Object o = null;
+    try {
+      while ( (o = logReader.next()) != null ) {
+        try {
+          List entry = (List)o;
+          int operationAndFlags = (Integer) entry.get(0);
+          int oper = operationAndFlags & OPERATION_MASK;
+          long version = (Long) entry.get(1);
+          if (Math.abs(version) > commitVersion) {
+            switch (oper) {
+              case UpdateLog.UPDATE_INPLACE:
+              case UpdateLog.ADD: {
+                SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
+                AddUpdateCommand cmd = new AddUpdateCommand(req);
+                cmd.solrDoc = sdoc;
+                cmd.setVersion(version);
+                cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT);
+                add(cmd);
+                break;
+              }
+              case UpdateLog.DELETE: {
+                byte[] idBytes = (byte[]) entry.get(2);
+                DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+                cmd.setIndexedId(new BytesRef(idBytes));
+                cmd.setVersion(version);
+                cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT);
+                delete(cmd);
+                break;
+              }
+
+              case UpdateLog.DELETE_BY_QUERY: {
+                String query = (String) entry.get(2);
+                DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+                cmd.query = query;
+                cmd.setVersion(version);
+                cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT);
+                deleteByQuery(cmd);
+                break;
+              }
+
+              default:
+                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+            }
+          }
+        } catch (ClassCastException e) {
+          log.warn("Unexpected log entry or corrupt log.  Entry=" + o, e);
+        }
+      }
+      // Prev tlog will be closed, so nullify prevMap
+      if (prevTlog == oldTlog) {
+        prevMap = null;
+      }
+    } catch (IOException e) {
+      log.error("Exception reading versions from log",e);
+    } catch (InterruptedException e) {
+      log.warn("Exception reading log", e);
+    } finally {
+      if (logReader != null) logReader.close();
+    }
+  }
+
 
   protected void ensureLog() {
     if (tlog == null) {
@@ -1482,6 +1639,7 @@ public static final int VERSION_IDX = 1;
     boolean activeLog;
     boolean finishing = false;  // state where we lock out other updates and finish those updates that snuck in before we locked
     boolean debug = loglog.isDebugEnabled();
+    boolean inSortedOrder;
 
     public LogReplayer(List<TransactionLog> translogs, boolean activeLog) {
       this.translogs = new LinkedList<>();
@@ -1489,6 +1647,11 @@ public static final int VERSION_IDX = 1;
       this.activeLog = activeLog;
     }
 
+    public LogReplayer(List<TransactionLog> translogs, boolean activeLog, boolean inSortedOrder) {
+      this(translogs, activeLog);
+      this.inSortedOrder = inSortedOrder;
+    }
+
 
 
     private SolrQueryRequest req;
@@ -1554,7 +1717,11 @@ public static final int VERSION_IDX = 1;
       try {
         loglog.warn("Starting log replay " + translog + " active=" + activeLog + " starting pos=" + recoveryInfo.positionOfStart);
         long lastStatusTime = System.nanoTime();
-        tlogReader = translog.getReader(recoveryInfo.positionOfStart);
+        if (inSortedOrder) {
+          tlogReader = translog.getSortedReader(recoveryInfo.positionOfStart);
+        } else {
+          tlogReader = translog.getReader(recoveryInfo.positionOfStart);
+        }
 
         // NOTE: we don't currently handle a core reload during recovery.  This would cause the core
         // to change underneath us.
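
A worked illustration (hypothetical versions, a sketch not part of the patch) of the filter applied in copyOverOldUpdates(long commitVersion): only tlog entries whose absolute version is greater than the commit version are re-appended to the new tlog; entries at or below it are assumed to already be reflected in the replicated index.

public class CopyOverFilterExample {
  public static void main(String[] args) {
    long commitVersion = 102;                 // version recorded in the last index commit
    long[] tlogVersions = {101, -102, 103};   // e.g. ADD v=101, DELETE v=-102, ADD v=103
    for (long v : tlogVersions) {
      boolean copiedOver = Math.abs(v) > commitVersion;
      System.out.println("version " + v + " copied over: " + copiedOver); // only 103 is copied
    }
  }
}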

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index ec093cf..08ede72 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -279,6 +279,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   // this is set to true in the constructor if the next processors in the chain
   // are custom and may modify the SolrInputDocument racing with its serialization for replication
   private final boolean cloneRequiredOnLeader;
+  private final boolean onlyLeaderIndexes;
 
   public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
     this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
@@ -324,8 +325,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     
     if (cloudDesc != null) {
       collection = cloudDesc.getCollectionName();
+      ClusterState cstate = zkController.getClusterState();
+      DocCollection coll = cstate.getCollection(collection);
+      onlyLeaderIndexes = coll.getRealtimeReplicas() == 1;
     } else {
       collection = null;
+      onlyLeaderIndexes = false;
     }
 
     boolean shouldClone = false;
@@ -1186,6 +1191,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
                 checkDeleteByQueries = true;
               }
             }
+            if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+              cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
+            }
           }
         }
         
@@ -1692,6 +1700,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             return;
           }
 
+          if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+            cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
+          }
+
           doLocalDelete(cmd);
         }
       }
@@ -1845,6 +1857,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
                 return true;
               }
             }
+
+            if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+              cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
+            }
           }
         }
 
@@ -1876,7 +1892,27 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
     
     if (!zkEnabled || req.getParams().getBool(COMMIT_END_POINT, false) || singleLeader) {
-      doLocalCommit(cmd);
+      if (onlyLeaderIndexes) {
+        try {
+          Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
+              collection, cloudDesc.getShardId());
+          isLeader = leaderReplica.getName().equals(
+              req.getCore().getCoreDescriptor().getCloudDescriptor()
+                  .getCoreNodeName());
+          if (isLeader) {
+            long commitVersion = vinfo.getNewClock();
+            cmd.setVersion(commitVersion);
+            doLocalCommit(cmd);
+          } else {
+            assert TestInjection.waitForInSyncWithLeader(req.getCore(),
+                zkController, collection, cloudDesc.getShardId());
+          }
+        } catch (InterruptedException e) {
+          throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
+        }
+      } else {
+        doLocalCommit(cmd);
+      }
     } else {
       ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
       if (!req.getParams().getBool(COMMIT_END_POINT, false)) {

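The two hunks above carry the indexing-path changes for the new mode: on a collection created with realtimeReplicas=1, follower replicas flag ordinary (non-replay) updates and deletes with IGNORE_INDEXWRITER so they reach only the transaction log, and on commit only the shard leader applies the local commit, with a test-only assertion (TestInjection.waitForInSyncWithLeader) holding followers until they have replicated it. Below is a minimal, self-contained sketch of the flag-masking idiom; the constant values are placeholders standing in for the real ones in org.apache.solr.update.UpdateCommand, and the class is illustrative only, not the committed code.

// Sketch only: placeholder flag values, not the constants defined in UpdateCommand.
public class IgnoreIndexWriterSketch {
  static final int REPLAY = 1 << 1;              // assumption: real value lives in UpdateCommand
  static final int IGNORE_INDEXWRITER = 1 << 3;  // assumption: real value lives in UpdateCommand

  // On a replica running in the realtimeReplicas=1 mode, a normal update (not a tlog replay)
  // is flagged so the update handler skips the IndexWriter and writes only to the tlog.
  static int maybeSkipIndexWriter(int flags, boolean onlyLeaderIndexes) {
    if (onlyLeaderIndexes && (flags & REPLAY) == 0) {
      return flags | IGNORE_INDEXWRITER;
    }
    return flags;
  }
}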
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/java/org/apache/solr/util/TestInjection.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index 5e4dc75..97291a1 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -28,14 +28,28 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.NonExistentCoreException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.SuppressForbidden;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.update.SolrIndexWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS;
+import static org.apache.solr.handler.ReplicationHandler.COMMAND;
+
 
 /**
  * Allows random faults to be injected in running code during test runs.
@@ -118,6 +132,8 @@ public class TestInjection {
   public static int randomDelayMaxInCoreCreationInSec = 10;
 
   public static String splitFailureBeforeReplicaCreation = null;
+
+  public static String waitForReplicasInSync = "true:60";
   
   private static Set<Timer> timers = Collections.synchronizedSet(new HashSet<Timer>());
 
@@ -343,6 +359,44 @@ public class TestInjection {
 
     return true;
   }
+
+  @SuppressForbidden(reason = "Need currentTimeMillis, because COMMIT_TIME_MSEC_KEY uses currentTimeMillis as its value")
+  public static boolean waitForInSyncWithLeader(SolrCore core, ZkController zkController, String collection, String shardId) throws InterruptedException {
+    if (waitForReplicasInSync == null) return true;
+
+    Pair<Boolean,Integer> pair = parseValue(waitForReplicasInSync);
+    boolean enabled = pair.first();
+    if (!enabled) return true;
+    long t = System.currentTimeMillis() - 100;
+    try {
+      for (int i = 0; i < pair.second(); i++) {
+        if (core.isClosed()) return true;
+        Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
+            collection, shardId);
+        try (HttpSolrClient leaderClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl()).build()) {
+          ModifiableSolrParams params = new ModifiableSolrParams();
+          params.set(CommonParams.QT, ReplicationHandler.PATH);
+          params.set(COMMAND, CMD_DETAILS);
+
+          NamedList<Object> response = leaderClient.request(new QueryRequest(params));
+          long leaderVersion = (long) ((NamedList)response.get("details")).get("indexVersion");
+
+          String localVersion = core.getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
+          if (localVersion == null && leaderVersion == 0 && !core.getUpdateHandler().getUpdateLog().hasUncommittedChanges()) return true;
+          if (localVersion != null && Long.parseLong(localVersion) == leaderVersion && (leaderVersion >= t || i >= 6)) {
+            return true;
+          } else {
+            Thread.sleep(500);
+          }
+        }
+      }
+
+    } catch (Exception e) {
+      log.error("Exception when wait for replicas in sync with master");
+    }
+
+    return false;
+  }
   
   private static Pair<Boolean,Integer> parseValue(String raw) {
     Matcher m = ENABLED_PERCENT.matcher(raw);

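The waitForReplicasInSync default of "true:60" uses the same enabled:count format handled by the existing parseValue() helper at the end of this hunk: when enabled, the new waitForInSyncWithLeader() polls the leader's /replication details up to 60 times, comparing the leader's reported indexVersion with the replica's COMMIT_TIME_MSEC_KEY commit user data. A rough, self-contained sketch of that polling pattern follows; the LongSupplier arguments are hypothetical stand-ins for the SolrJ details request and the local commit metadata shown above, not real Solr APIs.

import java.util.function.LongSupplier;

// Sketch of the retry loop in waitForInSyncWithLeader(); the suppliers are assumptions.
public class InSyncPollSketch {
  public static boolean waitForInSync(LongSupplier localCommitTime,
                                      LongSupplier leaderIndexVersion,
                                      int attempts, long sleepMs) throws InterruptedException {
    for (int i = 0; i < attempts; i++) {
      if (localCommitTime.getAsLong() == leaderIndexVersion.getAsLong()) {
        return true;                 // replica has pulled the leader's latest commit
      }
      Thread.sleep(sleepMs);         // give segment replication a chance to run, then retry
    }
    return false;                    // still behind the leader after the allotted attempts
  }
}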
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml
new file mode 100644
index 0000000..31802f9
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+
+  <field name="inplace_updatable_int"   type="int"   indexed="false" stored="false" docValues="true" />
+  <dynamicField name="*" type="string" indexed="true" stored="true"/>
+
+  <!-- for versioning -->
+  <field name="_version_" type="long" indexed="false" stored="false"  docValues="true" />
+  <field name="id" type="string" indexed="true" stored="true" docValues="true"/>
+  <uniqueKey>id</uniqueKey>
+
+  <fieldType name="string" class="solr.StrField"/>
+  <fieldType name="int" class="solr.TrieIntField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+  <fieldType name="long" class="solr.TrieLongField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+</schema>

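Besides id and _version_, the only concrete field in this minimal schema is inplace_updatable_int, which is deliberately indexed="false" stored="false" with docValues enabled; that combination is what makes it eligible for in-place (docValues-only) updates. A hedged SolrJ usage sketch follows, assuming an already-configured SolrClient pointed at a collection built from this configset and an existing document with id "1".

import java.util.Collections;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.common.SolrInputDocument;

public class InPlaceUpdateSketch {
  // The singleton map expresses an atomic "inc" operation; because the target field is
  // docValues-only, Solr can apply it in place instead of reindexing the whole document.
  static void incrementCounter(SolrClient client) throws Exception {
    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("id", "1");
    doc.addField("inplace_updatable_int", Collections.singletonMap("inc", 5));
    client.add(doc);
    client.commit();
  }
}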
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml
new file mode 100644
index 0000000..8da7d28
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
+  </updateHandler>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+
+  </requestHandler>
+</config>
+

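The ${solr.ulog:solr.UpdateLog} and ${solr.directoryFactory:...} expressions make the update log and directory implementations switchable through system properties, which is how test runs can exercise variants (for example an HDFS-backed transaction log) against this same minimal config. A hypothetical JVM setup illustrating the substitution; whether any particular test sets exactly these properties is an assumption, though the class names are Solr's standard implementations.

// Hypothetical test/JVM setup: property substitution picks the implementations at core load.
System.setProperty("solr.directoryFactory", "solr.MockDirectoryFactory"); // test directory factory
System.setProperty("solr.ulog", "solr.HdfsUpdateLog");                    // HDFS-backed update log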
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
index 582c8b4..5eb4b3b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
@@ -54,11 +54,17 @@ public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
   private static final String SHARD2 = "shard2";
   private static final String SHARD1 = "shard1";
   private static final String ONE_NODE_COLLECTION = "onenodecollection";
+  private final boolean onlyLeaderIndexes = random().nextBoolean();
 
   public BasicDistributedZk2Test() {
     super();
     sliceCount = 2;
   }
+
+  @Override
+  protected int getRealtimeReplicas() {
+    return onlyLeaderIndexes? 1 : -1;
+  }
   
   @Test
   @ShardsFixed(num = 4)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
index 25c483b..d1dbe9c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
@@ -87,6 +87,8 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private static final String DEFAULT_COLLECTION = "collection1";
+
+  private final boolean onlyLeaderIndexes = random().nextBoolean();
   String t1="a_t";
   String i1="a_i1";
   String tlong = "other_tl1";
@@ -114,7 +116,12 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     pending = new HashSet<>();
     
   }
-  
+
+  @Override
+  protected int getRealtimeReplicas() {
+    return onlyLeaderIndexes? 1 : -1;
+  }
+
   @Override
   protected void setDistributedParams(ModifiableSolrParams params) {
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
index 4e6122e..628884c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
@@ -55,6 +55,8 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
   
   private static final Integer RUN_LENGTH = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.runlength", "-1"));
 
+  private final boolean onlyLeaderIndexes = random().nextBoolean();
+
   @BeforeClass
   public static void beforeSuperClass() {
     schemaString = "schema15.xml";      // we need a string id
@@ -109,6 +111,11 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
     clientSoTimeout = 5000;
   }
 
+  @Override
+  protected int getRealtimeReplicas() {
+    return onlyLeaderIndexes? 1 : -1;
+  }
+
   @Test
   public void test() throws Exception {
     cloudClient.setSoTimeout(clientSoTimeout);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
index e9e8907..8904ea8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
@@ -55,6 +55,12 @@ import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
 
 public class ForceLeaderTest extends HttpPartitionTest {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final boolean onlyLeaderIndexes = random().nextBoolean();
+
+  @Override
+  protected int getRealtimeReplicas() {
+    return onlyLeaderIndexes? 1 : -1;
+  }
 
   @Test
   @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
index 5ae4c17..01002cf 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
@@ -76,12 +76,19 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
   // give plenty of time for replicas to recover when running in slow Jenkins test envs
   protected static final int maxWaitSecsToSeeAllActive = 90;
 
+  private final boolean onlyLeaderIndexes = random().nextBoolean();
+
   public HttpPartitionTest() {
     super();
     sliceCount = 2;
     fixShardCount(3);
   }
 
+  @Override
+  protected int getRealtimeReplicas() {
+    return onlyLeaderIndexes? 1 : -1;
+  }
+
   /**
    * We need to turn off directUpdatesToLeadersOnly due to SOLR-9512
    */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7830462d/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
index fd122ad..457b9d9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
@@ -37,6 +37,8 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
 
   private static final long sleepMsBeforeHealPartition = 2000L;
 
+  private final boolean onlyLeaderIndexes = random().nextBoolean();
+
   public LeaderInitiatedRecoveryOnCommitTest() {
     super();
     sliceCount = 1;
@@ -44,6 +46,11 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
   }
 
   @Override
+  protected int getRealtimeReplicas() {
+    return onlyLeaderIndexes? 1 : -1;
+  }
+
+  @Override
   @Test
   public void test() throws Exception {
     oneShardTest();