You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by tf...@apache.org on 2017/03/08 22:31:44 UTC

[1/6] lucene-solr:jira/solr-10233: First commit

Repository: lucene-solr
Updated Branches:
  refs/heads/jira/solr-10233 [created] 429411ca5


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
index 1af09f4..538024b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
@@ -52,6 +52,7 @@ import org.junit.Test;
 public class TestCloudRecovery extends SolrCloudTestCase {
 
   private static final String COLLECTION = "collection1";
+  private static boolean onlyLeaderIndexes;
 
   @BeforeClass
   public static void setupCluster() throws Exception {
@@ -63,8 +64,10 @@ public class TestCloudRecovery extends SolrCloudTestCase {
         .addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
         .configure();
 
+    onlyLeaderIndexes = random().nextBoolean();
     CollectionAdminRequest
         .createCollection(COLLECTION, "config", 2, 2)
+        .setRealtimeReplicas(onlyLeaderIndexes? 1: -1)
         .setMaxShardsPerNode(2)
         .process(cluster.getSolrClient());
     AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
@@ -107,7 +110,11 @@ public class TestCloudRecovery extends SolrCloudTestCase {
     resp = cloudClient.query(COLLECTION, params);
     assertEquals(4, resp.getResults().getNumFound());
     // Make sure all nodes is recover from tlog
-    assertEquals(4, countReplayLog.get());
+    if (onlyLeaderIndexes) {
+      assertEquals(2, countReplayLog.get());
+    } else {
+      assertEquals(4, countReplayLog.get());
+    }
 
     // check metrics
     int replicationCount = 0;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
index 8905077..8fbfee3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
@@ -60,7 +60,10 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
   @ShardsFixed(num = 2)
   public void test() throws Exception {
     try (CloudSolrClient client = createCloudClient(null)) {
-      createCollection(null, COLLECTION_NAME, 2, 2, 2, client, null, "conf1");
+      CollectionAdminRequest.Create req = CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf1",2,2);
+      req.setRealtimeReplicas(1);
+      req.setMaxShardsPerNode(2);
+      client.request(req);
       createCollection(null, COLLECTION_NAME1, 1, 1, 1, client, null, "conf1");
     }
 
@@ -170,6 +173,7 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
       Map<String, Object> collection = (Map<String, Object>) collections.get(COLLECTION_NAME);
       assertNotNull(collection);
       assertEquals("conf1", collection.get("configName"));
+      assertEquals("1", collection.get("realtimeReplicas"));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
index 4c90bc6..cddbbb1 100644
--- a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
+++ b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
@@ -71,6 +71,7 @@ import org.slf4j.LoggerFactory;
 @Slow
 public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final boolean onlyLeaderIndexes = random().nextBoolean();
 
   @BeforeClass
   public static void beforeSuperClass() throws Exception {
@@ -108,7 +109,12 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
       iw.decref();
     }
   }
-  
+
+  @Override
+  protected int getRealtimeReplicas() {
+    return onlyLeaderIndexes? 1 : -1;
+  }
+
   @After
   public void after() {
     System.clearProperty("solr.tests.intClassName");
@@ -196,6 +202,10 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
 
   // The following should work: full update to doc 0, in-place update for doc 0, delete doc 0
   private void outOfOrderDBQsTest() throws Exception {
+    if (onlyLeaderIndexes) {
+      log.info("RTG with DBQs are not working in active replicas");
+      return;
+    }
     
     clearIndex();
     commit();
@@ -262,6 +272,10 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
   }
 
   private void reorderedDBQIndividualReplicaTest() throws Exception {
+    if (onlyLeaderIndexes) {
+      log.info("RTG with DBQs are not working in active replicas");
+      return;
+    }
     clearIndex();
     commit();
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 94750c0a..8beb6ed 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -366,6 +366,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
 
     private Properties properties;
     protected Boolean autoAddReplicas;
+    protected Integer realtimeReplicas;
     protected Integer stateFormat;
     private String[] rule , snitch;
 
@@ -407,6 +408,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public Create setNumShards(Integer numShards) {this.numShards = numShards; return this; }
     public Create setMaxShardsPerNode(Integer numShards) { this.maxShardsPerNode = numShards; return this; }
     public Create setAutoAddReplicas(boolean autoAddReplicas) { this.autoAddReplicas = autoAddReplicas; return this; }
+    public Create setRealtimeReplicas(Integer realtimeReplicas) { this.realtimeReplicas = realtimeReplicas; return this;}
     @Deprecated
     public Create setReplicationFactor(Integer repl) { this.replicationFactor = repl; return this; }
     public Create setStateFormat(Integer stateFormat) { this.stateFormat = stateFormat; return this; }
@@ -421,6 +423,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public Integer getMaxShardsPerNode() { return maxShardsPerNode; }
     public Integer getReplicationFactor() { return replicationFactor; }
     public Boolean getAutoAddReplicas() { return autoAddReplicas; }
+    public Integer getRealtimeReplicas() { return realtimeReplicas; }
     public Integer getStateFormat() { return stateFormat; }
     
     /**
@@ -507,6 +510,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       if (autoAddReplicas != null) {
         params.set(ZkStateReader.AUTO_ADD_REPLICAS, autoAddReplicas);
       }
+      if (realtimeReplicas != null) {
+        params.set(ZkStateReader.REALTIME_REPLICAS, realtimeReplicas);
+      }
       if(properties != null) {
         addProperties(params, properties);
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 179b9d5..bf0f04f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -33,6 +33,7 @@ import org.noggit.JSONWriter;
 
 import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.REALTIME_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 
 /**
@@ -59,6 +60,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   private final Integer replicationFactor;
   private final Integer maxShardsPerNode;
   private final Boolean autoAddReplicas;
+  private final Integer realtimeReplicas;
 
 
   public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
@@ -84,6 +86,11 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     this.maxShardsPerNode = (Integer) verifyProp(props, MAX_SHARDS_PER_NODE);
     Boolean autoAddReplicas = (Boolean) verifyProp(props, AUTO_ADD_REPLICAS);
     this.autoAddReplicas = autoAddReplicas == null ? false : autoAddReplicas;
+    Integer realtimeReplicas = (Integer) verifyProp(props, REALTIME_REPLICAS);
+    this.realtimeReplicas = realtimeReplicas == null ? -1 : realtimeReplicas;
+    if (this.realtimeReplicas != -1 && this.realtimeReplicas != 1) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Invalid realtimeReplicas must be 1 or -1, found:" + this.realtimeReplicas);
+    }
     verifyProp(props, RULE);
     verifyProp(props, SNITCH);
     Iterator<Map.Entry<String, Slice>> iter = slices.entrySet().iterator();
@@ -126,6 +133,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     switch (propName) {
       case MAX_SHARDS_PER_NODE:
       case REPLICATION_FACTOR:
+      case REALTIME_REPLICAS:
         return Integer.parseInt(o.toString());
       case AUTO_ADD_REPLICAS:
         return Boolean.parseBoolean(o.toString());
@@ -226,6 +234,10 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     return maxShardsPerNode;
   }
 
+  public int getRealtimeReplicas() {
+    return realtimeReplicas;
+  }
+
   public String getZNode(){
     return znode;
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index fea5978..51b4b59 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -96,6 +96,7 @@ public class ZkStateReader implements Closeable {
   public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
   public static final String AUTO_ADD_REPLICAS = "autoAddReplicas";
   public static final String MAX_CORES_PER_NODE = "maxCoresPerNode";
+  public static final String REALTIME_REPLICAS = "realtimeReplicas";
 
   public static final String ROLES = "/roles.json";
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index 04eb722..ade1c69 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -272,6 +272,10 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
     chaosMonkey = new ChaosMonkey(zkServer, zkStateReader, DEFAULT_COLLECTION,
         shardToJetty, shardToLeaderJetty);
   }
+
+  protected int getRealtimeReplicas() {
+    return -1;
+  }
   
   protected CloudSolrClient createCloudClient(String defaultCollection) {
     CloudSolrClient client = getCloudSolrClient(zkServer.getZkAddress(), random().nextBoolean());
@@ -383,7 +387,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
           Utils.toJSON(Utils.makeMap(Overseer.QUEUE_OPERATION,
               CollectionParams.CollectionAction.CREATE.toLower(), "name",
               DEFAULT_COLLECTION, "numShards", String.valueOf(sliceCount),
-              DocCollection.STATE_FORMAT, getStateFormat())));
+              DocCollection.STATE_FORMAT, getStateFormat(),
+              ZkStateReader.REALTIME_REPLICAS, getRealtimeReplicas())));
       zkClient.close();
     }
 
@@ -1619,7 +1624,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
         NUM_SLICES, numShards,
         ZkStateReader.REPLICATION_FACTOR, replicationFactor,
         CREATE_NODE_SET, createNodeSetStr,
-        ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode),
+        ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode,
+        ZkStateReader.REALTIME_REPLICAS, getRealtimeReplicas()),
         client);
   }
 
@@ -1631,7 +1637,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
         NUM_SLICES, numShards,
         ZkStateReader.REPLICATION_FACTOR, replicationFactor,
         CREATE_NODE_SET, createNodeSetStr,
-        ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode),
+        ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode,
+        ZkStateReader.REALTIME_REPLICAS, getRealtimeReplicas()),
         client, configName);
   }
 
@@ -1814,6 +1821,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
     Map<String, Object> props = makeMap(
         ZkStateReader.REPLICATION_FACTOR, replicationFactor,
         ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode,
+        ZkStateReader.REALTIME_REPLICAS, getRealtimeReplicas(),
         NUM_SLICES, numShards);
     Map<String,List<Integer>> collectionInfos = new HashMap<>();
     createCollection(collectionInfos, collName, props, client);


[3/6] lucene-solr:jira/solr-10233: Fix review bug

Posted by tf...@apache.org.
Fix review bug


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/cbcf8929
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/cbcf8929
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/cbcf8929

Branch: refs/heads/jira/solr-10233
Commit: cbcf8929899021fbfc3f9c41494b164e432578ca
Parents: 6344378
Author: Cao Manh Dat <da...@apache.org>
Authored: Fri Mar 3 14:20:25 2017 +0700
Committer: Tomas Fernandez Lobbe <tf...@apache.org>
Committed: Wed Mar 8 13:37:28 2017 -0800

----------------------------------------------------------------------
 solr/core/src/java/org/apache/solr/handler/IndexFetcher.java    | 2 +-
 solr/core/src/java/org/apache/solr/update/UpdateLog.java        | 2 +-
 .../test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java | 5 +++++
 .../test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java  | 4 ++--
 4 files changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cbcf8929/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 3e1a4aa..a07496f 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -422,7 +422,7 @@ public class IndexFetcher {
           isFullCopyNeeded = true;
         }
 
-        if (!isFullCopyNeeded) {
+        if (!isFullCopyNeeded && !fetchFromLeader) {
           // a searcher might be using some flushed but not committed segments
           // because of soft commits (which open a searcher on IW's data)
           // so we need to close the existing searcher on the last commit

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cbcf8929/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 421d33c..6a5f407 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -1098,7 +1098,7 @@ public static final int VERSION_IDX = 1;
 
   /**
    * Replay current tlog, so all updates will be written to index.
-   * This is must do task for a active replica become a new leader.
+   * This is a must-do task for an append replica to become a new leader.
    * @return future of this task
    */
   public Future<RecoveryInfo> recoverFromCurrentLog() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cbcf8929/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
index c987c90..ce4c139 100644
--- a/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
@@ -39,6 +39,11 @@ public class RecoveryAfterSoftCommitTest extends AbstractFullDistribZkTestBase {
     fixShardCount(2);
   }
 
+  @Override
+  protected int getRealtimeReplicas() {
+    return 1;
+  }
+
   @BeforeClass
   public static void beforeTests() {
     System.setProperty("solr.tests.maxBufferedDocs", String.valueOf(MAX_BUFFERED_DOCS));

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/cbcf8929/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
index cddbbb1..4ea90fd 100644
--- a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
+++ b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
@@ -203,7 +203,7 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
   // The following should work: full update to doc 0, in-place update for doc 0, delete doc 0
   private void outOfOrderDBQsTest() throws Exception {
     if (onlyLeaderIndexes) {
-      log.info("RTG with DBQs are not working in active replicas");
+      log.info("RTG with DBQs are not working in append replicas");
       return;
     }
     
@@ -273,7 +273,7 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
 
   private void reorderedDBQIndividualReplicaTest() throws Exception {
     if (onlyLeaderIndexes) {
-      log.info("RTG with DBQs are not working in active replicas");
+      log.info("RTG with DBQs are not working in append replicas");
       return;
     }
     clearIndex();


[2/6] lucene-solr:jira/solr-10233: First commit

Posted by tf...@apache.org.
First commit


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/63443783
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/63443783
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/63443783

Branch: refs/heads/jira/solr-10233
Commit: 6344378375f9b71833964159556fbb41a891bae3
Parents: 8756be0
Author: Cao Manh Dat <da...@apache.org>
Authored: Fri Mar 3 10:43:03 2017 +0700
Committer: Tomas Fernandez Lobbe <tf...@apache.org>
Committed: Wed Mar 8 13:37:16 2017 -0800

----------------------------------------------------------------------
 .../solr/hadoop/TreeMergeOutputFormat.java      |   3 +-
 .../org/apache/solr/cloud/ElectionContext.java  |  20 +-
 .../cloud/OverseerCollectionMessageHandler.java |   1 +
 .../org/apache/solr/cloud/RecoveryStrategy.java |  29 +-
 .../apache/solr/cloud/ReplicateFromLeader.java  | 123 ++++++
 .../org/apache/solr/cloud/ZkController.java     |  32 +-
 .../org/apache/solr/core/CoreContainer.java     |   7 +
 .../org/apache/solr/handler/IndexFetcher.java   |  32 +-
 .../apache/solr/handler/ReplicationHandler.java |  28 +-
 .../solr/handler/admin/CollectionsHandler.java  |   4 +-
 .../org/apache/solr/update/CommitTracker.java   |   5 +
 .../solr/update/DirectUpdateHandler2.java       |  31 +-
 .../apache/solr/update/HdfsTransactionLog.java  |  50 +++
 .../apache/solr/update/SolrIndexSplitter.java   |   3 +-
 .../org/apache/solr/update/SolrIndexWriter.java |   6 +-
 .../org/apache/solr/update/TransactionLog.java  |  50 +++
 .../org/apache/solr/update/UpdateCommand.java   |   1 +
 .../java/org/apache/solr/update/UpdateLog.java  | 171 +++++++-
 .../processor/DistributedUpdateProcessor.java   |  38 +-
 .../org/apache/solr/util/TestInjection.java     |  53 +++
 .../conf/schema.xml                             |  31 ++
 .../conf/solrconfig.xml                         |  48 +++
 .../solr/cloud/BasicDistributedZk2Test.java     |   6 +
 .../solr/cloud/BasicDistributedZkTest.java      |   9 +-
 .../cloud/ChaosMonkeyNothingIsSafeTest.java     |   7 +
 .../org/apache/solr/cloud/ForceLeaderTest.java  |   6 +
 .../apache/solr/cloud/HttpPartitionTest.java    |   7 +
 .../LeaderInitiatedRecoveryOnCommitTest.java    |   7 +
 .../solr/cloud/OnlyLeaderIndexesTest.java       | 411 +++++++++++++++++++
 .../apache/solr/cloud/TestCloudRecovery.java    |   9 +-
 .../apache/solr/cloud/TestCollectionAPI.java    |   6 +-
 .../solr/update/TestInPlaceUpdatesDistrib.java  |  16 +-
 .../solrj/request/CollectionAdminRequest.java   |   6 +
 .../apache/solr/common/cloud/DocCollection.java |  12 +
 .../apache/solr/common/cloud/ZkStateReader.java |   1 +
 .../cloud/AbstractFullDistribZkTestBase.java    |  14 +-
 36 files changed, 1254 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
----------------------------------------------------------------------
diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
index e3487ad..cac57c3 100644
--- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
+++ b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java
@@ -163,7 +163,8 @@ public class TreeMergeOutputFormat extends FileOutputFormat<Text, NullWritable>
 
         // Set Solr's commit data so the created index is usable by SolrCloud. E.g. Currently SolrCloud relies on
         // commitTimeMSec in the commit data to do replication.
-        SolrIndexWriter.setCommitData(writer);
+        //TODO no commitUpdateCommand
+        SolrIndexWriter.setCommitData(writer, -1);
 
         timer = new RTimer();
         LOG.info("Optimizing Solr: Closing index writer");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index d3ad322..223a539 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.Path;
@@ -420,7 +421,24 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         try {
           // we must check LIR before registering as leader
           checkLIR(coreName, allReplicasInLine);
-          
+
+          boolean onlyLeaderIndexes = zkController.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
+          if (onlyLeaderIndexes) {
+            // stop replicating from the old leader
+            zkController.stopReplicationFromLeader(coreName);
+            if (weAreReplacement) {
+              try (SolrCore core = cc.getCore(coreName)) {
+                Future<UpdateLog.RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().recoverFromCurrentLog();
+                if (future != null) {
+                  log.info("Replaying tlog before become new leader");
+                  future.get();
+                } else {
+                  log.info("New leader does not have old tlog to replay");
+                }
+              }
+            }
+          }
+
           super.runLeaderProcess(weAreReplacement, 0);
           try (SolrCore core = cc.getCore(coreName)) {
             if (core != null) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index 00eb12d..4d64a00 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -131,6 +131,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
       ZkStateReader.REPLICATION_FACTOR, "1",
       ZkStateReader.MAX_SHARDS_PER_NODE, "1",
       ZkStateReader.AUTO_ADD_REPLICAS, "false",
+      ZkStateReader.REALTIME_REPLICAS, "-1",
       DocCollection.RULE, null,
       SNITCH, null));
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 3bd2e74..52efff9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -91,7 +91,8 @@ public class RecoveryStrategy extends Thread implements Closeable {
   private boolean recoveringAfterStartup;
   private CoreContainer cc;
   private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
-  
+  private boolean onlyLeaderIndexes;
+
   // this should only be used from SolrCoreState
   public RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
     this.cc = cc;
@@ -102,6 +103,8 @@ public class RecoveryStrategy extends Thread implements Closeable {
     zkStateReader = zkController.getZkStateReader();
     baseUrl = zkController.getBaseUrl();
     coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
+    String collection = cd.getCloudDescriptor().getCollectionName();
+    onlyLeaderIndexes = zkStateReader.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
   }
 
   public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
@@ -198,7 +201,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
       UpdateRequest ureq = new UpdateRequest();
       ureq.setParams(new ModifiableSolrParams());
       ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
-      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
+      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);
       ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
           client);
     }
@@ -247,7 +250,8 @@ public class RecoveryStrategy extends Thread implements Closeable {
       return;
     }
 
-    boolean firstTime = true;
+    // we temporarily ignore peersync in realtimeReplicas mode
+    boolean firstTime = !onlyLeaderIndexes;
 
     List<Long> recentVersions;
     try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
@@ -299,6 +303,10 @@ public class RecoveryStrategy extends Thread implements Closeable {
       }
     }
 
+    if (onlyLeaderIndexes) {
+      zkController.stopReplicationFromLeader(coreName);
+    }
+
     Future<RecoveryInfo> replayFuture = null;
     while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
       try {
@@ -452,6 +460,9 @@ public class RecoveryStrategy extends Thread implements Closeable {
         if (successfulRecovery) {
           LOG.info("Registering as Active after recovery.");
           try {
+            if (onlyLeaderIndexes) {
+              zkController.startReplicationFromLeader(coreName);
+            }
             zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
           } catch (Exception e) {
             LOG.error("Could not publish as ACTIVE after succesful recovery", e);
@@ -525,8 +536,20 @@ public class RecoveryStrategy extends Thread implements Closeable {
     LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
   }
 
+  public static Runnable testing_beforeReplayBufferingUpdates;
+
   private Future<RecoveryInfo> replay(SolrCore core)
       throws InterruptedException, ExecutionException {
+    if (testing_beforeReplayBufferingUpdates != null) {
+      testing_beforeReplayBufferingUpdates.run();
+    }
+    if (onlyLeaderIndexes) {
+      // roll over all updates received during buffering into a new tlog, to make RTG available
+      SolrQueryRequest req = new LocalSolrQueryRequest(core,
+          new ModifiableSolrParams());
+      core.getUpdateHandler().getUpdateLog().copyOverBufferingUpdates(new CommitUpdateCommand(req, false));
+      return null;
+    }
     Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
     if (future == null) {
       // no replay needed\

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
new file mode 100644
index 0000000..48aaced
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.lucene.index.IndexCommit;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrConfig;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.SolrIndexWriter;
+import org.apache.solr.update.UpdateLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicateFromLeader {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private CoreContainer cc;
+  private String coreName;
+
+  private ReplicationHandler replicationProcess;
+  private long lastVersion = 0;
+
+  public ReplicateFromLeader(CoreContainer cc, String coreName) {
+    this.cc = cc;
+    this.coreName = coreName;
+  }
+
+  public void startReplication() throws InterruptedException {
+    try (SolrCore core = cc.getCore(coreName)) {
+      if (core == null) {
+        if (cc.isShutDown()) {
+          return;
+        } else {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "SolrCore not found:" + coreName + " in " + cc.getCoreNames());
+        }
+      }
+      SolrConfig.UpdateHandlerInfo uinfo = core.getSolrConfig().getUpdateHandlerInfo();
+      String pollIntervalStr = "00:00:03";
+      if (uinfo.autoCommmitMaxTime != -1) {
+        pollIntervalStr = toPollIntervalStr(uinfo.autoCommmitMaxTime/2);
+      } else if (uinfo.autoSoftCommmitMaxTime != -1) {
+        pollIntervalStr = toPollIntervalStr(uinfo.autoSoftCommmitMaxTime/2);
+      }
+
+      NamedList slaveConfig = new NamedList();
+      slaveConfig.add("fetchFromLeader", true);
+      slaveConfig.add("pollInterval", pollIntervalStr);
+      NamedList replicationConfig = new NamedList();
+      replicationConfig.add("slave", slaveConfig);
+
+      String lastCommitVersion = getCommitVersion(core);
+      if (lastCommitVersion != null) {
+        lastVersion = Long.parseLong(lastCommitVersion);
+      }
+
+      replicationProcess = new ReplicationHandler();
+      replicationProcess.setPollListener((solrCore, pollSuccess) -> {
+        if (pollSuccess) {
+          String commitVersion = getCommitVersion(core);
+          if (commitVersion == null) return;
+          if (Long.parseLong(commitVersion) == lastVersion) return;
+          UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
+          SolrQueryRequest req = new LocalSolrQueryRequest(core,
+              new ModifiableSolrParams());
+          CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
+          cuc.setVersion(Long.parseLong(commitVersion));
+          updateLog.copyOverOldUpdates(cuc);
+        }
+      });
+      replicationProcess.init(replicationConfig);
+      replicationProcess.inform(core);
+    }
+  }
+
+  public static String getCommitVersion(SolrCore solrCore) {
+    IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit();
+    try {
+      String commitVersion = commit.getUserData().get(SolrIndexWriter.COMMIT_COMMAND_VERSION);
+      if (commitVersion == null) return null;
+      else return commitVersion;
+    } catch (Exception e) {
+      LOG.warn("Cannot get commit command version from index commit point ",e);
+      return null;
+    }
+  }
+
+  private static String toPollIntervalStr(int ms) {
+    int sec = ms/1000;
+    int hour = sec / 3600;
+    sec = sec % 3600;
+    int min = sec / 60;
+    sec = sec % 60;
+    return hour + ":" + min + ":" + sec;
+  }
+
+  public void stopReplication() {
+    replicationProcess.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 333acd4..d7ae437 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -86,6 +86,7 @@ import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.logging.MDCLoggingContext;
+import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.UpdateLog;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -189,6 +190,7 @@ public class ZkController {
 
   private LeaderElector overseerElector;
 
+  // ReplicateFromLeader instances registered by startReplicationFromLeader, keyed by core name
+  private Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>();
 
   // for now, this can be null in tests, in which case recovery will be inactive, and other features
   // may accept defaults or use mocks rather than pulling things from a CoreContainer
@@ -877,7 +879,7 @@ public class ZkController {
           coreName, baseUrl, cloudDesc.getCollectionName(), shardId);
       
       ZkNodeProps leaderProps = new ZkNodeProps(props);
-      
+
       try {
         // If we're a preferred leader, insert ourselves at the head of the queue
         boolean joinAtHead = false;
@@ -913,9 +915,16 @@ public class ZkController {
         // leader election perhaps?
         
         UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-        
+        boolean onlyLeaderIndexes = zkStateReader.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
+        boolean isReplicaInOnlyLeaderIndexes = onlyLeaderIndexes && !isLeader;
+        if (isReplicaInOnlyLeaderIndexes) {
+          String commitVersion = ReplicateFromLeader.getCommitVersion(core);
+          if (commitVersion != null) {
+            ulog.copyOverOldUpdates(Long.parseLong(commitVersion));
+          }
+        }
         // we will call register again after zk expiration and on reload
-        if (!afterExpiration && !core.isReloaded() && ulog != null) {
+        if (!afterExpiration && !core.isReloaded() && ulog != null && !isReplicaInOnlyLeaderIndexes) {
           // disable recovery in case shard is in construction state (for shard splits)
           Slice slice = getClusterState().getSlice(collection, shardId);
           if (slice.getState() != Slice.State.CONSTRUCTION || !isLeader) {
@@ -934,6 +943,9 @@ public class ZkController {
         boolean didRecovery
             = checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, core, cc, afterExpiration);
         if (!didRecovery) {
+          if (isReplicaInOnlyLeaderIndexes) {
+            startReplicationFromLeader(coreName);
+          }
           publish(desc, Replica.State.ACTIVE);
         }
         
@@ -948,6 +960,20 @@ public class ZkController {
     }
   }
 
+  public void startReplicationFromLeader(String coreName) throws InterruptedException {
+    ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName);
+    if (replicateFromLeaders.putIfAbsent(coreName, replicateFromLeader) == null) {
+      replicateFromLeader.startReplication();
+    }
+  }
+
+  /**
+   * Unregisters the {@link ReplicateFromLeader} of the given core, if any, and stops it.
+   *
+   * @param coreName name of the core whose replication should stop
+   */
+  public void stopReplicationFromLeader(String coreName) {
+    ReplicateFromLeader removed = replicateFromLeaders.remove(coreName);
+    if (removed == null) {
+      return;
+    }
+    removed.stopReplication();
+  }
+
   // timeoutms is the timeout for the first call to get the leader - there is then
   // a longer wait to make sure that leader matches our local state
   private String getLeader(final CloudDescriptor cloudDesc, int timeoutms) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index b9597ae..0de671e 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1137,6 +1137,13 @@ public class CoreContainer {
         log.info("Reloading SolrCore '{}' using configuration from {}", cd.getName(), coreConfig.getName());
         SolrCore newCore = core.reload(coreConfig);
         registerCore(cd.getName(), newCore, false, false);
+        if (getZkController() != null) {
+          boolean onlyLeaderIndexes = getZkController().getClusterState().getCollection(cd.getCollectionName()).getRealtimeReplicas() == 1;
+          if (onlyLeaderIndexes && !cd.getCloudDescriptor().isLeader()) {
+            getZkController().stopReplicationFromLeader(core.getName());
+            getZkController().startReplicationFromLeader(newCore.getName());
+          }
+        }
       } catch (SolrCoreState.CoreIsClosedException e) {
         throw e;
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 8634aee..3e1a4aa 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -68,8 +68,11 @@ import org.apache.solr.client.solrj.impl.HttpClientUtil;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
 import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
@@ -115,7 +118,7 @@ public class IndexFetcher {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final String masterUrl;
+  private String masterUrl;
 
   final ReplicationHandler replicationHandler;
 
@@ -150,6 +153,8 @@ public class IndexFetcher {
 
   private boolean useExternalCompression = false;
 
+  private boolean fetchFromLeader = false;
+
   private final HttpClient myHttpClient;
 
   private Integer connTimeout;
@@ -167,11 +172,15 @@ public class IndexFetcher {
 
   public IndexFetcher(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) {
     solrCore = sc;
+    Object fetchFromLeader = initArgs.get(FETCH_FROM_LEADER);
+    if (fetchFromLeader != null && fetchFromLeader instanceof Boolean) {
+      this.fetchFromLeader = (boolean) fetchFromLeader;
+    }
     String masterUrl = (String) initArgs.get(MASTER_URL);
-    if (masterUrl == null)
+    if (masterUrl == null && !this.fetchFromLeader)
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
               "'masterUrl' is required for a slave");
-    if (masterUrl.endsWith(ReplicationHandler.PATH)) {
+    if (masterUrl != null && masterUrl.endsWith(ReplicationHandler.PATH)) {
       masterUrl = masterUrl.substring(0, masterUrl.length()-12);
       LOG.warn("'masterUrl' must be specified without the "+ReplicationHandler.PATH+" suffix");
     }
@@ -298,6 +307,15 @@ public class IndexFetcher {
     }
 
     try {
+      if (fetchFromLeader) {
+        Replica replica = getLeaderReplica();
+        CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
+        if (cd.getCoreNodeName().equals(replica.getName())) {
+          return false;
+        }
+        masterUrl = replica.getCoreUrl();
+        LOG.info("Updated masterUrl to " + masterUrl);
+      }
       //get the current 'replicateable' index version in the master
       NamedList response;
       try {
@@ -565,6 +583,14 @@ public class IndexFetcher {
     }
   }
 
+  /**
+   * Asks the ZooKeeper state reader for the leader of this core's shard.
+   *
+   * @return the replica returned by {@code getLeaderRetry} for this core's collection and shard
+   * @throws InterruptedException propagated from {@code getLeaderRetry}
+   */
+  private Replica getLeaderReplica() throws InterruptedException {
+    ZkController controller = solrCore.getCoreDescriptor().getCoreContainer().getZkController();
+    CloudDescriptor cloudDesc = solrCore.getCoreDescriptor().getCloudDescriptor();
+    return controller.getZkStateReader()
+        .getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId());
+  }
+
   private void cleanup(final SolrCore core, Directory tmpIndexDir,
       Directory indexDir, boolean deleteTmpIdxDir, File tmpTlogDir, boolean successfulInstall) throws IOException {
     try {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index cdbadc4..e40b2c3 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -209,6 +209,11 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
   private Long pollIntervalNs;
   private String pollIntervalStr;
 
+  /** Optional callback invoked after each scheduled poll; {@code null} when none has been set. */
+  private PollListener pollListener;
+  /**
+   * Callback notified after the scheduled polling task has run a fetch.
+   */
+  public interface PollListener {
+    /**
+     * @param solrCore the core this handler passes to the listener
+     * @param pollSuccess the value returned by {@code doFetch} for this poll
+     * @throws IOException reported by the listener; the polling task catches and logs it
+     */
+    void onComplete(SolrCore solrCore, boolean pollSuccess) throws IOException;
+  }
+
   /**
    * Disable the timer task for polling
    */
@@ -218,6 +223,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
     return pollIntervalStr;
   }
 
+  /**
+   * Sets the listener that the scheduled polling task notifies after each fetch.
+   *
+   * @param pollListener the listener to notify, or {@code null} for none
+   */
+  public void setPollListener(PollListener pollListener) {
+    this.pollListener = pollListener;
+  }
+
   @Override
   public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
     rsp.setHttpCaching(false);
@@ -1142,7 +1151,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
       try {
         LOG.debug("Polling for index modifications");
         markScheduledExecutionStart();
-        doFetch(null, false);
+        boolean pollSuccess = doFetch(null, false);
+        if (pollListener != null) pollListener.onComplete(core, pollSuccess);
       } catch (Exception e) {
         LOG.error("Exception in fetching index", e);
       }
@@ -1328,6 +1338,20 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
     });
   }
 
+  /**
+   * Releases the background resources held by this handler: the polling executor, the index
+   * fetchers, the restore executor and any pending restore future. Each resource is skipped
+   * when it was never created.
+   */
+  public void close() {
+    // shutdown() only - this call does not wait for the polling executor to terminate
+    if (executorService != null) executorService.shutdown();
+    if (pollingIndexFetcher != null) {
+      pollingIndexFetcher.destroy();
+    }
+    // avoid destroying the same fetcher twice when both fields refer to one instance
+    if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
+      currentIndexFetcher.destroy();
+    }
+    // unlike the polling executor, the restore executor is awaited
+    ExecutorUtil.shutdownAndAwaitTermination(restoreExecutor);
+    if (restoreFuture != null) {
+      restoreFuture.cancel(false);
+    }
+  }
+
   /**
    * Register a listener for postcommit/optimize
    *
@@ -1680,6 +1704,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
 
   public static final String MASTER_URL = "masterUrl";
 
+  public static final String FETCH_FROM_LEADER = "fetchFromLeader";
+
   public static final String STATUS = "status";
 
   public static final String COMMAND = "command";

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index d7759ca..2e17af6 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -115,6 +115,7 @@ import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
 import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.REALTIME_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
@@ -404,7 +405,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           STATE_FORMAT,
           AUTO_ADD_REPLICAS,
           RULE,
-          SNITCH);
+          SNITCH,
+          REALTIME_REPLICAS);
 
       if (props.get(STATE_FORMAT) == null) {
         props.put(STATE_FORMAT, "2");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/update/CommitTracker.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/CommitTracker.java b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
index 61f0c35..9c09ebe 100644
--- a/solr/core/src/java/org/apache/solr/update/CommitTracker.java
+++ b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
@@ -207,6 +207,11 @@ public final class CommitTracker implements Runnable {
       command.openSearcher = openSearcher;
       command.waitSearcher = waitSearcher;
       command.softCommit = softCommit;
+      if (core.getCoreDescriptor().getCloudDescriptor() != null
+          && core.getCoreDescriptor().getCloudDescriptor().isLeader()
+          && !softCommit) {
+        command.version = core.getUpdateHandler().getUpdateLog().getVersionInfo().getNewClock();
+      }
       // no need for command.maxOptimizeSegments = 1; since it is not optimizing
 
       // we increment this *before* calling commit because it was causing a race

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index 4592bcf..abb5512 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -45,7 +45,9 @@ import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.util.BytesRefHash;
+import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
@@ -123,6 +125,14 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     commitWithinSoftCommit = updateHandlerInfo.commitWithinSoftCommit;
     indexWriterCloseWaitsForMerges = updateHandlerInfo.indexWriterCloseWaitsForMerges;
 
+    ZkController zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
+    if (zkController != null) {
+      DocCollection dc = zkController.getClusterState().getCollection(core.getCoreDescriptor().getCollectionName());
+      if (dc.getRealtimeReplicas() == 1) {
+        commitWithinSoftCommit = false;
+        commitTracker.setOpenSearcher(true);
+      }
+    }
 
   }
   
@@ -233,6 +243,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
       cmd.overwrite = false;
     }
     try {
+      if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
+        if (ulog != null) ulog.add(cmd);
+        return 1;
+      }
+
       if (cmd.overwrite) {
         // Check for delete by query commands newer (i.e. reordered). This
         // should always be null on a leader
@@ -404,6 +419,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     deleteByIdCommands.increment();
     deleteByIdCommandsCumulative.mark();
 
+    if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0 ) {
+      if (ulog != null) ulog.delete(cmd);
+      return;
+    }
+
     Term deleteTerm = new Term(idField.getName(), cmd.getIndexedId());
     // SolrCore.verbose("deleteDocuments",deleteTerm,writer);
     RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
@@ -463,6 +483,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     deleteByQueryCommandsCumulative.mark();
     boolean madeIt=false;
     try {
+      if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
+        if (ulog != null) ulog.deleteByQuery(cmd);
+        madeIt = true;
+        return;
+      }
       Query q = getQuery(cmd);
       
       boolean delAll = MatchAllDocsQuery.class == q.getClass();
@@ -563,7 +588,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
       log.info("start "+cmd);
       RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
       try {
-        SolrIndexWriter.setCommitData(iw.get());
+        SolrIndexWriter.setCommitData(iw.get(), cmd.getVersion());
         iw.get().prepareCommit();
       } finally {
         iw.decref();
@@ -647,7 +672,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
           // SolrCore.verbose("writer.commit() start writer=",writer);
 
           if (writer.hasUncommittedChanges()) {
-            SolrIndexWriter.setCommitData(writer);
+            SolrIndexWriter.setCommitData(writer, cmd.getVersion());
             writer.commit();
           } else {
             log.info("No uncommitted changes. Skipping IW.commit.");
@@ -838,7 +863,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
           }
 
           // todo: refactor this shared code (or figure out why a real CommitUpdateCommand can't be used)
-          SolrIndexWriter.setCommitData(writer);
+          SolrIndexWriter.setCommitData(writer, cmd.getVersion());
           writer.commit();
 
           synchronized (solrCoreState.getUpdateLock()) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
index 90f6856..c478935 100644
--- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java
@@ -20,8 +20,10 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -368,6 +370,10 @@ public class HdfsTransactionLog extends TransactionLog {
     return new HDFSLogReader(startingPos);
   }
 
+  /**
+   * Returns a reader starting at {@code startingPos} that yields entries in ascending order of
+   * absolute version; see {@link HDFSSortedLogReader}.
+   */
+  @Override
+  public LogReader getSortedReader(long startingPos) {
+    return new HDFSSortedLogReader(startingPos);
+  }
+
   /** Returns a single threaded reverse reader */
   @Override
   public ReverseReader getReverseReader() throws IOException {
@@ -477,6 +483,50 @@ public class HdfsTransactionLog extends TransactionLog {
 
   }
 
+  /**
+   * Log reader that returns entries in ascending order of absolute version instead of file order.
+   *
+   * <p>The first call to {@link #next()} scans every entry from {@code startingPos} and records the
+   * position at which each one was read. If the versions were already non-decreasing, entries are
+   * then simply read sequentially; otherwise each call seeks to the recorded position of the next
+   * version.
+   *
+   * <p>NOTE(review): positions are keyed by absolute version, so entries sharing the same absolute
+   * version overwrite each other in the map; in re-ordered mode only the last one read is returned.
+   */
+  public class HDFSSortedLogReader extends HDFSLogReader{
+    private long startingPos;
+    // stays true while the scanned absolute versions are non-decreasing
+    private boolean inOrder = true;
+    // absolute version -> file position the entry was read from; null until the first next() call
+    private TreeMap<Long, Long> versionToPos;
+    // walks the recorded positions in ascending version order; only used when !inOrder
+    Iterator<Long> iterator;
+
+    public HDFSSortedLogReader(long startingPos) {
+      super(startingPos);
+      this.startingPos = startingPos;
+    }
+
+    /**
+     * Returns the next entry in ascending absolute-version order, or {@code null} when there are
+     * no more entries.
+     */
+    @Override
+    public Object next() throws IOException, InterruptedException {
+      if (versionToPos == null) {
+        // first call: index the whole log once
+        versionToPos = new TreeMap<>();
+        Object o;
+        long pos = startingPos;
+
+        long lastVersion = Long.MIN_VALUE;
+        while ( (o = super.next()) != null) {
+          List entry = (List) o;
+          long version = (Long) entry.get(UpdateLog.VERSION_IDX);
+          // order by magnitude, ignoring the sign of the version
+          version = Math.abs(version);
+          versionToPos.put(version, pos);
+          // position after the entry just read - presumably where the next entry starts
+          pos = currentPos();
+
+          if (version < lastVersion) inOrder = false;
+          lastVersion = version;
+        }
+        // rewind so the reads below start again from the beginning
+        fis.seek(startingPos);
+      }
+
+      if (inOrder) {
+        return super.next();
+      } else {
+        if (iterator == null) iterator = versionToPos.values().iterator();
+        if (!iterator.hasNext()) return null;
+        long pos = iterator.next();
+        // seek only when the wanted entry is not the one directly following the previous read
+        if (pos != currentPos()) fis.seek(pos);
+        return super.next();
+      }
+    }
+  }
+
   public class HDFSReverseReader extends ReverseReader {
     FSDataFastInputStream fis;
     private LogCodec codec = new LogCodec(resolver) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
index a147b0f..e9950f2 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
@@ -137,7 +137,8 @@ public class SolrIndexSplitter {
         // we commit explicitly instead of sending a CommitUpdateCommand through the processor chain
         // because the sub-shard cores will just ignore such a commit because the update log is not
         // in active state at this time.
-        SolrIndexWriter.setCommitData(iw);
+        //TODO no commitUpdateCommand
+        SolrIndexWriter.setCommitData(iw, -1);
         iw.commit();
         success = true;
       } finally {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
index 3c0c1a5..6a264f8 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java
@@ -61,6 +61,7 @@ public class SolrIndexWriter extends IndexWriter {
   /** Stored into each Lucene commit to record the
    *  System.currentTimeMillis() when commit was called. */
   public static final String COMMIT_TIME_MSEC_KEY = "commitTimeMSec";
+  /** Stored into each Lucene commit to record the
+   *  commitCommandVersion value passed to setCommitData. */
+  public static final String COMMIT_COMMAND_VERSION = "commitCommandVer";
 
   private final Object CLOSE_LOCK = new Object();
   
@@ -183,10 +184,11 @@ public class SolrIndexWriter extends IndexWriter {
 
   @SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
       " but currently suspiciously used for replication as well")
-  public static void setCommitData(IndexWriter iw) {
-    log.info("Calling setCommitData with IW:" + iw.toString());
+  public static void setCommitData(IndexWriter iw, long commitCommandVersion) {
+    log.info("Calling setCommitData with IW:" + iw.toString() + " commitCommandVersion:"+commitCommandVersion);
     final Map<String,String> commitData = new HashMap<>();
     commitData.put(COMMIT_TIME_MSEC_KEY, String.valueOf(System.currentTimeMillis()));
+    commitData.put(COMMIT_COMMAND_VERSION, String.valueOf(commitCommandVersion));
     iw.setLiveCommitData(commitData.entrySet());
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/update/TransactionLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
index 5037b45..73328cf 100644
--- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java
+++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java
@@ -29,9 +29,11 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.util.BytesRef;
@@ -632,6 +634,10 @@ public class TransactionLog implements Closeable {
     return new LogReader(startingPos);
   }
 
+  /**
+   * Returns a reader starting at {@code startingPos} that yields entries in ascending order of
+   * absolute version; see {@link SortedLogReader}.
+   */
+  public LogReader getSortedReader(long startingPos) {
+    return new SortedLogReader(startingPos);
+  }
+
   /** Returns a single threaded reverse reader */
   public ReverseReader getReverseReader() throws IOException {
     return new FSReverseReader();
@@ -715,6 +721,50 @@ public class TransactionLog implements Closeable {
 
   }
 
+  /**
+   * Log reader that returns entries in ascending order of absolute version instead of file order.
+   *
+   * <p>The first call to {@link #next()} scans every entry from {@code startingPos} and records the
+   * position at which each one was read. If the versions were already non-decreasing, entries are
+   * then simply read sequentially; otherwise each call seeks to the recorded position of the next
+   * version.
+   *
+   * <p>NOTE(review): positions are keyed by absolute version, so entries sharing the same absolute
+   * version overwrite each other in the map; in re-ordered mode only the last one read is returned.
+   */
+  public class SortedLogReader extends LogReader {
+    private long startingPos;
+    // stays true while the scanned absolute versions are non-decreasing
+    private boolean inOrder = true;
+    // absolute version -> file position the entry was read from; null until the first next() call
+    private TreeMap<Long, Long> versionToPos;
+    // walks the recorded positions in ascending version order; only used when !inOrder
+    Iterator<Long> iterator;
+
+    public SortedLogReader(long startingPos) {
+      super(startingPos);
+      this.startingPos = startingPos;
+    }
+
+    /**
+     * Returns the next entry in ascending absolute-version order, or {@code null} when there are
+     * no more entries.
+     */
+    @Override
+    public Object next() throws IOException, InterruptedException {
+      if (versionToPos == null) {
+        // first call: index the whole log once
+        versionToPos = new TreeMap<>();
+        Object o;
+        long pos = startingPos;
+
+        long lastVersion = Long.MIN_VALUE;
+        while ( (o = super.next()) != null) {
+          List entry = (List) o;
+          long version = (Long) entry.get(UpdateLog.VERSION_IDX);
+          // order by magnitude, ignoring the sign of the version
+          version = Math.abs(version);
+          versionToPos.put(version, pos);
+          // position after the entry just read - presumably where the next entry starts
+          pos = currentPos();
+
+          if (version < lastVersion) inOrder = false;
+          lastVersion = version;
+        }
+        // rewind so the reads below start again from the beginning
+        fis.seek(startingPos);
+      }
+
+      if (inOrder) {
+        return super.next();
+      } else {
+        if (iterator == null) iterator = versionToPos.values().iterator();
+        if (!iterator.hasNext()) return null;
+        long pos = iterator.next();
+        // seek only when the wanted entry is not the one directly following the previous read
+        if (pos != currentPos()) fis.seek(pos);
+        return super.next();
+      }
+    }
+  }
+
   public abstract class ReverseReader {
 
     /** Returns the next object from the log, or null if none available.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateCommand.java b/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
index 9f01571..b124271 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateCommand.java
@@ -34,6 +34,7 @@ public abstract class UpdateCommand implements Cloneable {
   public static int PEER_SYNC    = 0x00000004; // update command is a missing update being provided by a peer.
   public static int IGNORE_AUTOCOMMIT = 0x00000008; // this update should not count toward triggering of autocommits.
   public static int CLEAR_CACHES = 0x00000010; // clear caches associated with the update log.  used when applying reordered DBQ updates when doing an add.
+  public static int IGNORE_INDEXWRITER = 0x00000020;
 
   public UpdateCommand(SolrQueryRequest req) {
     this.req = req;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 16eff9c..421d33c 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -27,6 +27,7 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -618,7 +619,7 @@ public static final int VERSION_IDX = 1;
       }
 
       // only change our caches if we are not buffering
-      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
+      if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0 && (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) {
         // given that we just did a delete-by-query, we don't know what documents were
         // affected and hence we must purge our caches.
         openRealtimeSearcher();
@@ -1095,6 +1096,162 @@ public static final int VERSION_IDX = 1;
     return cs.submit(replayer, recoveryInfo);
   }
 
+  /**
+   * Replays the current tlog so that all updates are written to the index.
+   * This is a must-do task before an active replica can become the new leader.
+   * @return a future for this task, or null if there is no current tlog
+   */
+  public Future<RecoveryInfo> recoverFromCurrentLog() {
+    if (tlog == null) {
+      return null;
+    }
+    map.clear();
+    recoveryInfo = new RecoveryInfo();
+    tlog.incref();
+
+    ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor);
+    LogReplayer replayer = new LogReplayer(Collections.singletonList(tlog), false, true);
+
+    versionInfo.blockUpdates();
+    try {
+      state = State.REPLAYING;
+    } finally {
+      versionInfo.unblockUpdates();
+    }
+
+    return cs.submit(replayer, recoveryInfo);
+  }
+
+  /**
+   * Blocks updates, appends a commit to the current tlog,
+   * then copies the buffered updates over to a new tlog and brings the ulog back to the active state.
+   * This way, any updates that haven't made it to the index are preserved in the current tlog;
+   * this also makes RTG (real-time get) work.
+   * @param cuc any updates with a version larger than the version of cuc will be copied over
+   */
+  public void copyOverBufferingUpdates(CommitUpdateCommand cuc) {
+    versionInfo.blockUpdates();
+    try {
+      operationFlags &= ~FLAG_GAP;
+      state = State.ACTIVE;
+      copyAndSwitchToNewTlog(cuc);
+    } finally {
+      versionInfo.unblockUpdates();
+    }
+  }
+
+  /**
+   * Blocks updates, appends a commit to the current tlog, then copies updates over to a new tlog.
+   * This way, any updates that haven't made it to the index are preserved in the current tlog.
+   * @param cuc any updates with a version larger than the version of cuc will be copied over
+   */
+  public void copyOverOldUpdates(CommitUpdateCommand cuc) {
+    versionInfo.blockUpdates();
+    try {
+      copyAndSwitchToNewTlog(cuc);
+    } finally {
+      versionInfo.unblockUpdates();
+    }
+  }
+
+  protected void copyAndSwitchToNewTlog(CommitUpdateCommand cuc) {
+    synchronized (this) {
+      if (tlog == null) return;
+      preCommit(cuc);
+      try {
+        copyOverOldUpdates(cuc.getVersion());
+      } finally {
+        postCommit(cuc);
+      }
+    }
+  }
+
+  /**
+   * Copies updates from prevTlog, or from the last tlog (in the tlog folder), over to a new tlog.
+   * @param commitVersion any updates with a version larger than commitVersion will be copied over
+   */
+  public void copyOverOldUpdates(long commitVersion) {
+    TransactionLog oldTlog = prevTlog;
+    if (oldTlog == null && !logs.isEmpty()) {
+      oldTlog = logs.getFirst();
+    }
+    if (oldTlog == null || oldTlog.refcount.get() == 0) {
+      return;
+    }
+
+    try {
+      if (oldTlog.endsWithCommit()) {
+        return;
+      }
+    } catch (IOException e) {
+      log.warn("Exception reading log", e);
+      return;
+    }
+
+    SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core,
+        new ModifiableSolrParams());
+    TransactionLog.LogReader logReader = oldTlog.getReader(0);
+    Object o = null;
+    try {
+      while ( (o = logReader.next()) != null ) {
+        try {
+          List entry = (List)o;
+          int operationAndFlags = (Integer) entry.get(0);
+          int oper = operationAndFlags & OPERATION_MASK;
+          long version = (Long) entry.get(1);
+          if (Math.abs(version) > commitVersion) {
+            switch (oper) {
+              case UpdateLog.UPDATE_INPLACE:
+              case UpdateLog.ADD: {
+                SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
+                AddUpdateCommand cmd = new AddUpdateCommand(req);
+                cmd.solrDoc = sdoc;
+                cmd.setVersion(version);
+                cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT);
+                add(cmd);
+                break;
+              }
+              case UpdateLog.DELETE: {
+                byte[] idBytes = (byte[]) entry.get(2);
+                DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+                cmd.setIndexedId(new BytesRef(idBytes));
+                cmd.setVersion(version);
+                cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT);
+                delete(cmd);
+                break;
+              }
+
+              case UpdateLog.DELETE_BY_QUERY: {
+                String query = (String) entry.get(2);
+                DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
+                cmd.query = query;
+                cmd.setVersion(version);
+                cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT);
+                deleteByQuery(cmd);
+                break;
+              }
+
+              default:
+                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
+            }
+          }
+        } catch (ClassCastException e) {
+          log.warn("Unexpected log entry or corrupt log.  Entry=" + o, e);
+        }
+      }
+      // Prev tlog will be closed, so nullify prevMap
+      if (prevTlog == oldTlog) {
+        prevMap = null;
+      }
+    } catch (IOException e) {
+      log.error("Exception reading versions from log",e);
+    } catch (InterruptedException e) {
+      log.warn("Exception reading log", e);
+    } finally {
+      if (logReader != null) logReader.close();
+    }
+  }
+
 
   protected void ensureLog() {
     if (tlog == null) {
@@ -1482,6 +1639,7 @@ public static final int VERSION_IDX = 1;
     boolean activeLog;
     boolean finishing = false;  // state where we lock out other updates and finish those updates that snuck in before we locked
     boolean debug = loglog.isDebugEnabled();
+    boolean inSortedOrder;
 
     public LogReplayer(List<TransactionLog> translogs, boolean activeLog) {
       this.translogs = new LinkedList<>();
@@ -1489,6 +1647,11 @@ public static final int VERSION_IDX = 1;
       this.activeLog = activeLog;
     }
 
+    public LogReplayer(List<TransactionLog> translogs, boolean activeLog, boolean inSortedOrder) {
+      this(translogs, activeLog);
+      this.inSortedOrder = inSortedOrder;
+    }
+
 
 
     private SolrQueryRequest req;
@@ -1554,7 +1717,11 @@ public static final int VERSION_IDX = 1;
       try {
         loglog.warn("Starting log replay " + translog + " active=" + activeLog + " starting pos=" + recoveryInfo.positionOfStart);
         long lastStatusTime = System.nanoTime();
-        tlogReader = translog.getReader(recoveryInfo.positionOfStart);
+        if (inSortedOrder) {
+          tlogReader = translog.getSortedReader(recoveryInfo.positionOfStart);
+        } else {
+          tlogReader = translog.getReader(recoveryInfo.positionOfStart);
+        }
 
         // NOTE: we don't currently handle a core reload during recovery.  This would cause the core
         // to change underneath us.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index ec093cf..08ede72 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -279,6 +279,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   // this is set to true in the constructor if the next processors in the chain
   // are custom and may modify the SolrInputDocument racing with its serialization for replication
   private final boolean cloneRequiredOnLeader;
+  private final boolean onlyLeaderIndexes;
 
   public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
     this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
@@ -324,8 +325,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     
     if (cloudDesc != null) {
       collection = cloudDesc.getCollectionName();
+      ClusterState cstate = zkController.getClusterState();
+      DocCollection coll = cstate.getCollection(collection);
+      onlyLeaderIndexes = coll.getRealtimeReplicas() == 1;
     } else {
       collection = null;
+      onlyLeaderIndexes = false;
     }
 
     boolean shouldClone = false;
@@ -1186,6 +1191,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
                 checkDeleteByQueries = true;
               }
             }
+            if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+              cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
+            }
           }
         }
         
@@ -1692,6 +1700,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             return;
           }
 
+          if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+            cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
+          }
+
           doLocalDelete(cmd);
         }
       }
@@ -1845,6 +1857,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
                 return true;
               }
             }
+
+            if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+              cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
+            }
           }
         }
 
@@ -1876,7 +1892,27 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
     
     if (!zkEnabled || req.getParams().getBool(COMMIT_END_POINT, false) || singleLeader) {
-      doLocalCommit(cmd);
+      if (onlyLeaderIndexes) {
+        try {
+          Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
+              collection, cloudDesc.getShardId());
+          isLeader = leaderReplica.getName().equals(
+              req.getCore().getCoreDescriptor().getCloudDescriptor()
+                  .getCoreNodeName());
+          if (isLeader) {
+            long commitVersion = vinfo.getNewClock();
+            cmd.setVersion(commitVersion);
+            doLocalCommit(cmd);
+          } else {
+            assert TestInjection.waitForInSyncWithLeader(req.getCore(),
+                zkController, collection, cloudDesc.getShardId());
+          }
+        } catch (InterruptedException e) {
+          throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
+        }
+      } else {
+        doLocalCommit(cmd);
+      }
     } else {
       ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
       if (!req.getParams().getBool(COMMIT_END_POINT, false)) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/java/org/apache/solr/util/TestInjection.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index 5e4dc75..a1e77e5 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -28,14 +28,27 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.NonExistentCoreException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.update.SolrIndexWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS;
+import static org.apache.solr.handler.ReplicationHandler.COMMAND;
+
 
 /**
  * Allows random faults to be injected in running code during test runs.
@@ -118,6 +131,8 @@ public class TestInjection {
   public static int randomDelayMaxInCoreCreationInSec = 10;
 
   public static String splitFailureBeforeReplicaCreation = null;
+
+  public static String waitForReplicasInSync = "true:60";
   
   private static Set<Timer> timers = Collections.synchronizedSet(new HashSet<Timer>());
 
@@ -343,6 +358,44 @@ public class TestInjection {
 
     return true;
   }
+
+  public static boolean waitForInSyncWithLeader(SolrCore core, ZkController zkController, String collection, String shardId) throws InterruptedException {
+    if (waitForReplicasInSync == null) return true;
+
+    Pair<Boolean,Integer> pair = parseValue(waitForReplicasInSync);
+    boolean enabled = pair.first();
+    if (!enabled) return true;
+
+    Thread.sleep(1000);
+    try {
+      for (int i = 0; i < pair.second(); i++) {
+        if (core.isClosed()) return true;
+        Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
+            collection, shardId);
+        try (HttpSolrClient leaderClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl()).build()) {
+          ModifiableSolrParams params = new ModifiableSolrParams();
+          params.set(CommonParams.QT, ReplicationHandler.PATH);
+          params.set(COMMAND, CMD_DETAILS);
+
+          NamedList<Object> response = leaderClient.request(new QueryRequest(params));
+          long leaderVersion = (long) ((NamedList)response.get("details")).get("indexVersion");
+
+          String localVersion = core.getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
+          if (localVersion == null && leaderVersion == 0) return true;
+          if (localVersion != null && Long.parseLong(localVersion) == leaderVersion) {
+            return true;
+          } else {
+            Thread.sleep(500);
+          }
+        }
+      }
+
+    } catch (Exception e) {
+      log.error("Exception when wait for replicas in sync with master");
+    }
+
+    return false;
+  }
   
   private static Pair<Boolean,Integer> parseValue(String raw) {
     Matcher m = ENABLED_PERCENT.matcher(raw);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml
new file mode 100644
index 0000000..31802f9
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+
+  <field name="inplace_updatable_int"   type="int"   indexed="false" stored="false" docValues="true" />
+  <dynamicField name="*" type="string" indexed="true" stored="true"/>
+
+  <!-- for versioning -->
+  <field name="_version_" type="long" indexed="false" stored="false"  docValues="true" />
+  <field name="id" type="string" indexed="true" stored="true" docValues="true"/>
+  <uniqueKey>id</uniqueKey>
+
+  <fieldType name="string" class="solr.StrField"/>
+  <fieldType name="int" class="solr.TrieIntField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+  <fieldType name="long" class="solr.TrieLongField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
+</schema>

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml
new file mode 100644
index 0000000..8da7d28
--- /dev/null
+++ b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
+  </updateHandler>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+
+  </requestHandler>
+</config>
+

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
index 582c8b4..5eb4b3b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
@@ -54,11 +54,17 @@ public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
   private static final String SHARD2 = "shard2";
   private static final String SHARD1 = "shard1";
   private static final String ONE_NODE_COLLECTION = "onenodecollection";
+  private final boolean onlyLeaderIndexes = random().nextBoolean();
 
   public BasicDistributedZk2Test() {
     super();
     sliceCount = 2;
   }
+
+  @Override
+  protected int getRealtimeReplicas() {
+    return onlyLeaderIndexes? 1 : -1;
+  }
   
   @Test
   @ShardsFixed(num = 4)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
index 25c483b..d1dbe9c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
@@ -87,6 +87,8 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private static final String DEFAULT_COLLECTION = "collection1";
+
+  private final boolean onlyLeaderIndexes = random().nextBoolean();
   String t1="a_t";
   String i1="a_i1";
   String tlong = "other_tl1";
@@ -114,7 +116,12 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     pending = new HashSet<>();
     
   }
-  
+
+  @Override
+  protected int getRealtimeReplicas() {
+    return onlyLeaderIndexes? 1 : -1;
+  }
+
   @Override
   protected void setDistributedParams(ModifiableSolrParams params) {
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
index 4e6122e..628884c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
@@ -55,6 +55,8 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
   
   private static final Integer RUN_LENGTH = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.runlength", "-1"));
 
+  private final boolean onlyLeaderIndexes = random().nextBoolean();
+
   @BeforeClass
   public static void beforeSuperClass() {
     schemaString = "schema15.xml";      // we need a string id
@@ -109,6 +111,11 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
     clientSoTimeout = 5000;
   }
 
+  @Override
+  protected int getRealtimeReplicas() {
+    return onlyLeaderIndexes? 1 : -1;
+  }
+
   @Test
   public void test() throws Exception {
     cloudClient.setSoTimeout(clientSoTimeout);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
index e9e8907..8904ea8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
@@ -55,6 +55,12 @@ import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
 
 public class ForceLeaderTest extends HttpPartitionTest {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final boolean onlyLeaderIndexes = random().nextBoolean();
+
+  @Override
+  protected int getRealtimeReplicas() {
+    return onlyLeaderIndexes? 1 : -1;
+  }
 
   @Test
   @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
index 5ae4c17..01002cf 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
@@ -76,12 +76,19 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
   // give plenty of time for replicas to recover when running in slow Jenkins test envs
   protected static final int maxWaitSecsToSeeAllActive = 90;
 
+  private final boolean onlyLeaderIndexes = random().nextBoolean();
+
   public HttpPartitionTest() {
     super();
     sliceCount = 2;
     fixShardCount(3);
   }
 
+  @Override
+  protected int getRealtimeReplicas() {
+    return onlyLeaderIndexes? 1 : -1;
+  }
+
   /**
    * We need to turn off directUpdatesToLeadersOnly due to SOLR-9512
    */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
index fd122ad..457b9d9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
@@ -37,6 +37,8 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
 
   private static final long sleepMsBeforeHealPartition = 2000L;
 
+  private final boolean onlyLeaderIndexes = random().nextBoolean();
+
   public LeaderInitiatedRecoveryOnCommitTest() {
     super();
     sliceCount = 1;
@@ -44,6 +46,11 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
   }
 
   @Override
+  protected int getRealtimeReplicas() {
+    return onlyLeaderIndexes? 1 : -1;
+  }
+
+  @Override
   @Test
   public void test() throws Exception {
     oneShardTest();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/63443783/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java b/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java
new file mode 100644
index 0000000..40b066e
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+import org.apache.lucene.index.IndexWriter;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.update.DirectUpdateHandler2;
+import org.apache.solr.update.SolrIndexWriter;
+import org.apache.solr.update.UpdateHandler;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.util.RefCounted;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class OnlyLeaderIndexesTest extends SolrCloudTestCase {
+  private static final String COLLECTION = "collection1";
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
+    System.setProperty("solr.ulog.numRecordsToKeep", "1000");
+
+    configureCluster(3)
+        .addConfig("config", TEST_PATH().resolve("configsets")
+        .resolve("cloud-minimal-inplace-updates").resolve("conf"))
+        .configure();
+
+    CollectionAdminRequest
+        .createCollection(COLLECTION, "config", 1, 3)
+        .setRealtimeReplicas(1)
+        .setMaxShardsPerNode(1)
+        .process(cluster.getSolrClient());
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
+        false, true, 30);
+  }
+
+  @Test
+  public void test() throws Exception {
+    basicTest();
+    recoveryTest();
+    dbiTest();
+    basicLeaderElectionTest();
+    outOfOrderDBQWithInPlaceUpdatesTest();
+  }
+
+  public void basicTest() throws Exception {
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+    new UpdateRequest()
+        .add(sdoc("id", "1"))
+        .add(sdoc("id", "2"))
+        .add(sdoc("id", "3"))
+        .add(sdoc("id", "4"))
+        .process(cloudClient, COLLECTION);
+
+    {
+      UpdateHandler updateHandler = getSolrCore(true).get(0).getUpdateHandler();
+      RefCounted<IndexWriter> iwRef = updateHandler.getSolrCoreState().getIndexWriter(null);
+      assertTrue("IndexWriter at leader must see updates ", iwRef.get().hasUncommittedChanges());
+      iwRef.decref();
+    }
+
+    for (SolrCore solrCore : getSolrCore(false)) {
+      RefCounted<IndexWriter> iwRef = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
+      assertFalse("IndexWriter at replicas must not see updates ", iwRef.get().hasUncommittedChanges());
+      iwRef.decref();
+    }
+
+    checkRTG(1, 4, cluster.getJettySolrRunners());
+
+    new UpdateRequest()
+        .deleteById("1")
+        .deleteByQuery("id:2")
+        .process(cloudClient, COLLECTION);
+
+    // The DBQ is not processed at replicas, so we still can get doc2 and other docs by RTG
+    checkRTG(2,4, getSolrRunner(false));
+
+    new UpdateRequest()
+        .commit(cloudClient, COLLECTION);
+
+    checkShardConsistency(2, 1);
+
+    // Update log roll over
+    for (SolrCore solrCore : getSolrCore(false)) {
+      UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
+      assertFalse(updateLog.hasUncommittedChanges());
+    }
+
+    // UpdateLog copy over old updates
+    for (int i = 15; i <= 150; i++) {
+      cloudClient.add(COLLECTION, sdoc("id",String.valueOf(i)));
+      if (random().nextInt(100) < 15 & i != 150) {
+        cloudClient.commit(COLLECTION);
+      }
+    }
+    checkRTG(120,150, cluster.getJettySolrRunners());
+    waitForReplicasCatchUp(20);
+  }
+
+  public void recoveryTest() throws Exception {
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+    new UpdateRequest()
+        .deleteByQuery("*:*")
+        .commit(cluster.getSolrClient(), COLLECTION);
+    new UpdateRequest()
+        .add(sdoc("id", "3"))
+        .add(sdoc("id", "4"))
+        .commit(cloudClient, COLLECTION);
+    // Replica recovery
+    new UpdateRequest()
+        .add(sdoc("id", "5"))
+        .process(cloudClient, COLLECTION);
+    JettySolrRunner solrRunner = getSolrRunner(false).get(0);
+    ChaosMonkey.stop(solrRunner);
+    new UpdateRequest()
+        .add(sdoc("id", "6"))
+        .process(cloudClient, COLLECTION);
+    ChaosMonkey.start(solrRunner);
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
+        false, true, 30);
+    // We skip peerSync, so replica will always trigger commit on leader
+    checkShardConsistency(4, 20);
+
+    // More Replica recovery testing
+    new UpdateRequest()
+        .add(sdoc("id", "7"))
+        .process(cloudClient, COLLECTION);
+    checkRTG(3,7, cluster.getJettySolrRunners());
+    DirectUpdateHandler2.commitOnClose = false;
+    ChaosMonkey.stop(solrRunner);
+    DirectUpdateHandler2.commitOnClose = true;
+    ChaosMonkey.start(solrRunner);
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
+        false, true, 30);
+    checkRTG(3,7, cluster.getJettySolrRunners());
+    checkShardConsistency(5, 20);
+
+    // Test replica recovery apply buffer updates
+    Semaphore waitingForBufferUpdates = new Semaphore(0);
+    Semaphore waitingForReplay = new Semaphore(0);
+    RecoveryStrategy.testing_beforeReplayBufferingUpdates = () -> {
+      try {
+        waitingForReplay.release();
+        waitingForBufferUpdates.acquire();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    };
+    ChaosMonkey.stop(solrRunner);
+    ChaosMonkey.start(solrRunner);
+    waitingForReplay.acquire();
+    new UpdateRequest()
+        .add(sdoc("id", "8"))
+        .add(sdoc("id", "9"))
+        .process(cloudClient, COLLECTION);
+    waitingForBufferUpdates.release();
+    RecoveryStrategy.testing_beforeReplayBufferingUpdates = null;
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
+        false, true, 30);
+    checkRTG(3,9, cluster.getJettySolrRunners());
+    checkShardConsistency(5, 20);
+    for (SolrCore solrCore : getSolrCore(false)) {
+      RefCounted<IndexWriter> iwRef = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
+      assertFalse("IndexWriter at replicas must not see updates ", iwRef.get().hasUncommittedChanges());
+      iwRef.decref();
+    }
+  }
+
+  public void dbiTest() throws Exception{
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+    new UpdateRequest()
+        .deleteByQuery("*:*")
+        .commit(cluster.getSolrClient(), COLLECTION);
+    new UpdateRequest()
+        .add(sdoc("id", "1"))
+        .commit(cloudClient, COLLECTION);
+    checkShardConsistency(1, 1);
+    new UpdateRequest()
+        .deleteById("1")
+        .process(cloudClient, COLLECTION);
+    try {
+      checkRTG(1, 1, cluster.getJettySolrRunners());
+      fail("Doc1 is deleted but it's still exist");
+    } catch (AssertionError e) {}
+  }
+
+  public void basicLeaderElectionTest() throws Exception {
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+    new UpdateRequest()
+        .deleteByQuery("*:*")
+        .commit(cluster.getSolrClient(), COLLECTION);
+    new UpdateRequest()
+        .add(sdoc("id", "1"))
+        .add(sdoc("id", "2"))
+        .process(cloudClient, COLLECTION);
+    JettySolrRunner oldLeaderJetty = getSolrRunner(true).get(0);
+    ChaosMonkey.kill(oldLeaderJetty);
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
+        false, true, 60);
+    new UpdateRequest()
+        .add(sdoc("id", "3"))
+        .add(sdoc("id", "4"))
+        .process(cloudClient, COLLECTION);
+    ChaosMonkey.start(oldLeaderJetty);
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
+        false, true, 60);
+    checkRTG(1,4, cluster.getJettySolrRunners());
+    new UpdateRequest()
+        .commit(cloudClient, COLLECTION);
+    checkShardConsistency(4,1);
+  }
+
+  public void outOfOrderDBQWithInPlaceUpdatesTest() throws Exception {
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+    new UpdateRequest()
+        .deleteByQuery("*:*")
+        .commit(cluster.getSolrClient(), COLLECTION);
+    List<UpdateRequest> updates = new ArrayList<>();
+    updates.add(simulatedUpdateRequest(null, "id", 1, "title_s", "title0_new", "inplace_updatable_int", 5, "_version_", Long.MAX_VALUE-100)); // full update
+    updates.add(simulatedDBQ("inplace_updatable_int:5", Long.MAX_VALUE-98));
+    updates.add(simulatedUpdateRequest(Long.MAX_VALUE-100, "id", 1, "inplace_updatable_int", 6, "_version_", Long.MAX_VALUE-99));
+    for (JettySolrRunner solrRunner: getSolrRunner(false)) {
+      try (SolrClient client = solrRunner.newClient()) {
+        for (UpdateRequest up : updates) {
+          up.process(client, COLLECTION);
+        }
+      }
+    }
+    JettySolrRunner oldLeaderJetty = getSolrRunner(true).get(0);
+    ChaosMonkey.kill(oldLeaderJetty);
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
+        false, true, 30);
+    ChaosMonkey.start(oldLeaderJetty);
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
+        false, true, 30);
+    checkRTG(1,1, cluster.getJettySolrRunners());
+    SolrDocument doc = cluster.getSolrClient().getById(COLLECTION,"1");
+    assertNotNull(doc.get("title_s"));
+  }
+
+  private UpdateRequest simulatedUpdateRequest(Long prevVersion, Object... fields) throws SolrServerException, IOException {
+    SolrInputDocument doc = sdoc(fields);
+
+    // get baseUrl of the leader
+    String baseUrl = getBaseUrl();
+
+    UpdateRequest ur = new UpdateRequest();
+    ur.add(doc);
+    ur.setParam("update.distrib", "FROMLEADER");
+    if (prevVersion != null) {
+      ur.setParam("distrib.inplace.prevversion", String.valueOf(prevVersion));
+      ur.setParam("distrib.inplace.update", "true");
+    }
+    ur.setParam("distrib.from", baseUrl);
+    return ur;
+  }
+
+  private UpdateRequest simulatedDBQ(String query, long version) throws SolrServerException, IOException {
+    String baseUrl = getBaseUrl();
+
+    UpdateRequest ur = new UpdateRequest();
+    ur.deleteByQuery(query);
+    ur.setParam("_version_", ""+version);
+    ur.setParam("update.distrib", "FROMLEADER");
+    ur.setParam("distrib.from", baseUrl);
+    return ur;
+  }
+
+  private String getBaseUrl() {
+    DocCollection collection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
+    Slice slice = collection.getSlice("shard1");
+    return slice.getLeader().getCoreUrl();
+  }
+
+  private void checkRTG(int from, int to, List<JettySolrRunner> solrRunners) throws Exception{
+
+    for (JettySolrRunner solrRunner: solrRunners) {
+      try (SolrClient client = solrRunner.newClient()) {
+        for (int i = from; i <= to; i++) {
+          SolrQuery query = new SolrQuery("*:*");
+          query.set("distrib", false);
+          query.setRequestHandler("/get");
+          query.set("id",i);
+          QueryResponse res = client.query(COLLECTION, query);
+          assertNotNull("Can not find doc "+ i + " in " + solrRunner.getBaseUrl(),res.getResponse().get("doc"));
+        }
+      }
+    }
+
+  }
+
+  private void checkShardConsistency(int expected, int numTry) throws Exception{
+
+    for (int i = 0; i < numTry; i++) {
+      boolean inSync = true;
+      for (JettySolrRunner solrRunner: cluster.getJettySolrRunners()) {
+        try (SolrClient client = solrRunner.newClient()) {
+          SolrQuery query = new SolrQuery("*:*");
+          query.set("distrib", false);
+          long results = client.query(COLLECTION, query).getResults().getNumFound();
+          if (expected != results) {
+            inSync = false;
+            Thread.sleep(500);
+            break;
+          }
+        }
+      }
+      if (inSync) return;
+    }
+
+    fail("Some replicas are not in sync with leader");
+  }
+
+  private void waitForReplicasCatchUp(int numTry) throws IOException, InterruptedException {
+    String leaderTimeCommit = getSolrCore(true).get(0).getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
+    if (leaderTimeCommit == null) return;
+    for (int i = 0; i < numTry; i++) {
+      boolean inSync = true;
+      for (SolrCore solrCore : getSolrCore(false)) {
+        String replicateTimeCommit = solrCore.getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
+        if (!leaderTimeCommit.equals(replicateTimeCommit)) {
+          inSync = false;
+          Thread.sleep(500);
+          break;
+        }
+      }
+      if (inSync) return;
+    }
+
+    fail("Some replicas are not in sync with leader");
+
+  }
+
+  private List<SolrCore> getSolrCore(boolean isLeader) {
+    List<SolrCore> rs = new ArrayList<>();
+
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+    DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION);
+
+    for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) {
+      if (solrRunner.getCoreContainer() == null) continue;
+      for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) {
+        CloudDescriptor cloudDescriptor = solrCore.getCoreDescriptor().getCloudDescriptor();
+        Slice slice = docCollection.getSlice(cloudDescriptor.getShardId());
+        Replica replica = docCollection.getReplica(cloudDescriptor.getCoreNodeName());
+        if (slice.getLeader() == replica && isLeader) {
+          rs.add(solrCore);
+        } else if (slice.getLeader() != replica && !isLeader) {
+          rs.add(solrCore);
+        }
+      }
+    }
+    return rs;
+  }
+
+  private List<JettySolrRunner> getSolrRunner(boolean isLeader) {
+    List<JettySolrRunner> rs = new ArrayList<>();
+
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+    DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION);
+
+    for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) {
+      if (solrRunner.getCoreContainer() == null) continue;
+      for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) {
+        CloudDescriptor cloudDescriptor = solrCore.getCoreDescriptor().getCloudDescriptor();
+        Slice slice = docCollection.getSlice(cloudDescriptor.getShardId());
+        Replica replica = docCollection.getReplica(cloudDescriptor.getCoreNodeName());
+        if (slice.getLeader() == replica && isLeader) {
+          rs.add(solrRunner);
+        } else if (slice.getLeader() != replica && !isLeader) {
+          rs.add(solrRunner);
+        }
+      }
+    }
+    return rs;
+  }
+
+}
\ No newline at end of file


[6/6] lucene-solr:jira/solr-10233: Make waitForInSyncWithLeader more stable in different test environments

Posted by tf...@apache.org.
Make waitForInSyncWithLeader more stable in different test environments


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/429411ca
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/429411ca
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/429411ca

Branch: refs/heads/jira/solr-10233
Commit: 429411ca5d3d2e24a2956ee7525e2ca3fcd28d99
Parents: 043d5cb
Author: Cao Manh Dat <da...@apache.org>
Authored: Wed Mar 8 16:03:11 2017 +0700
Committer: Tomas Fernandez Lobbe <tf...@apache.org>
Committed: Wed Mar 8 13:57:55 2017 -0800

----------------------------------------------------------------------
 .../core/src/java/org/apache/solr/util/TestInjection.java | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/429411ca/solr/core/src/java/org/apache/solr/util/TestInjection.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index a1e77e5..bc80683 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -18,8 +18,10 @@ package org.apache.solr.util;
 
 import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.Timer;
@@ -41,6 +43,7 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Pair;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.RealTimeGetHandler;
 import org.apache.solr.handler.ReplicationHandler;
 import org.apache.solr.update.SolrIndexWriter;
 import org.slf4j.Logger;
@@ -365,8 +368,7 @@ public class TestInjection {
     Pair<Boolean,Integer> pair = parseValue(waitForReplicasInSync);
     boolean enabled = pair.first();
     if (!enabled) return true;
-
-    Thread.sleep(1000);
+    long t = System.currentTimeMillis() - 100;
     try {
       for (int i = 0; i < pair.second(); i++) {
         if (core.isClosed()) return true;
@@ -381,8 +383,8 @@ public class TestInjection {
           long leaderVersion = (long) ((NamedList)response.get("details")).get("indexVersion");
 
           String localVersion = core.getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
-          if (localVersion == null && leaderVersion == 0) return true;
-          if (localVersion != null && Long.parseLong(localVersion) == leaderVersion) {
+          if (localVersion == null && leaderVersion == 0 && !core.getUpdateHandler().getUpdateLog().hasUncommittedChanges()) return true;
+          if (localVersion != null && Long.parseLong(localVersion) == leaderVersion && leaderVersion >= t) {
             return true;
           } else {
             Thread.sleep(500);


[5/6] lucene-solr:jira/solr-10233: First commit for SOLR-10233

Posted by tf...@apache.org.
First commit for SOLR-10233

Many nocommits and TODOs. Not all tests passing.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/043d5cb2
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/043d5cb2
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/043d5cb2

Branch: refs/heads/jira/solr-10233
Commit: 043d5cb23ce59104abd6bb682eeabf63be51225f
Parents: cbcf892
Author: Tomas Fernandez Lobbe <tf...@apache.org>
Authored: Wed Mar 8 13:56:18 2017 -0800
Committer: Tomas Fernandez Lobbe <tf...@apache.org>
Committed: Wed Mar 8 13:56:18 2017 -0800

----------------------------------------------------------------------
 .../org/apache/solr/cloud/AddReplicaCmd.java    |   9 +-
 .../src/java/org/apache/solr/cloud/Assign.java  |   4 +-
 .../org/apache/solr/cloud/CloudDescriptor.java  |  17 +-
 .../apache/solr/cloud/CreateCollectionCmd.java  |  35 +-
 .../org/apache/solr/cloud/ElectionContext.java  |   5 +-
 .../cloud/OverseerCollectionMessageHandler.java |  82 +++-
 .../org/apache/solr/cloud/RecoveryStrategy.java |   2 +-
 .../apache/solr/cloud/ReplicateFromLeader.java  |  35 +-
 .../cloud/ReplicateOnlyRecoveryStrategy.java    | 363 ++++++++++++++
 .../org/apache/solr/cloud/SplitShardCmd.java    |   2 +-
 .../org/apache/solr/cloud/ZkController.java     |  48 +-
 .../solr/cloud/overseer/ReplicaMutator.java     |   7 +-
 .../solr/cloud/overseer/SliceMutator.java       |  28 +-
 .../solr/cloud/overseer/ZkStateWriter.java      |   1 +
 .../apache/solr/cloud/rule/ReplicaAssigner.java |   8 +-
 .../org/apache/solr/core/CoreContainer.java     |  41 +-
 .../org/apache/solr/handler/IndexFetcher.java   |  17 +-
 .../solr/handler/admin/CollectionsHandler.java  |  15 +-
 .../solr/handler/admin/CoreAdminHandler.java    |   3 +
 .../handler/component/RealTimeGetComponent.java |   5 +
 .../solr/update/DefaultSolrCoreState.java       |  24 +-
 .../solr/update/DirectUpdateHandler2.java       |   3 +-
 .../org/apache/solr/update/UpdateHandler.java   |  12 +-
 .../processor/DistributedUpdateProcessor.java   |  11 +-
 .../cloud-minimal/conf/solrconfig.xml           |   2 +-
 .../apache/solr/cloud/TestPassiveReplica.java   | 484 +++++++++++++++++++
 .../solr/client/solrj/impl/CloudSolrClient.java |   3 +-
 .../solrj/request/CollectionAdminRequest.java   |  84 +++-
 .../apache/solr/common/cloud/DocCollection.java |  10 +-
 .../org/apache/solr/common/cloud/Replica.java   |  32 ++
 .../org/apache/solr/common/cloud/Slice.java     |  21 +-
 .../apache/solr/common/cloud/ZkStateReader.java |  12 +-
 .../solr/common/params/CoreAdminParams.java     |   5 +
 .../java/org/apache/solr/SolrTestCaseJ4.java    |   2 +-
 .../java/org/apache/solr/cloud/ChaosMonkey.java |   4 +-
 35 files changed, 1276 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
index 6bb3350..4420a92 100644
--- a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
@@ -72,6 +72,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     String node = message.getStr(CoreAdminParams.NODE);
     String shard = message.getStr(SHARD_ID_PROP);
     String coreName = message.getStr(CoreAdminParams.NAME);
+    Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.REALTIME.name()));
     boolean parallel = message.getBool("parallel", false);
     if (StringUtils.isBlank(coreName)) {
       coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
@@ -93,7 +94,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     // Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
     if (!skipCreateReplicaInClusterState) {
       node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
-          ocmh.overseer.getZkController().getCoreContainer()).get(0).nodeName;
+          ocmh.overseer.getZkController().getCoreContainer()).get(0).nodeName;// TODO: use replica type in this logic too
     }
     log.info("Node Identified {} for creating new replica", node);
 
@@ -101,7 +102,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
     }
     if (coreName == null) {
-      coreName = Assign.buildCoreName(coll, shard);
+      coreName = Assign.buildCoreName(coll, shard, replicaType);
     } else if (!skipCreateReplicaInClusterState) {
       //Validate that the core name is unique in that collection
       for (Slice slice : coll.getSlices()) {
@@ -126,7 +127,8 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
             ZkStateReader.CORE_NAME_PROP, coreName,
             ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
             ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node),
-            ZkStateReader.NODE_NAME_PROP, node);
+            ZkStateReader.NODE_NAME_PROP, node,
+            ZkStateReader.REPLICA_TYPE, replicaType.name());
         Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
       }
       params.set(CoreAdminParams.CORE_NODE_NAME,
@@ -142,6 +144,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     params.set(CoreAdminParams.NAME, coreName);
     params.set(COLL_CONF, configName);
     params.set(CoreAdminParams.COLLECTION, collection);
+    params.set(CoreAdminParams.REPLICA_TYPE, replicaType.name());
     if (shard != null) {
       params.set(CoreAdminParams.SHARD, shard);
     } else if (routeKey != null) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/cloud/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java
index ba03ccd..c404226 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java
@@ -108,7 +108,7 @@ public class Assign {
     return returnShardId;
   }
 
-  static String buildCoreName(DocCollection collection, String shard) {
+  static String buildCoreName(DocCollection collection, String shard, Replica.Type type) {
     Slice slice = collection.getSlice(shard);
     int replicaNum = slice.getReplicas().size();
     for (; ; ) {
@@ -123,7 +123,7 @@ public class Assign {
       if (exists) replicaNum++;
       else break;
     }
-    return collection.getName() + "_" + shard + "_replica" + replicaNum;
+    return collection.getName() + "_" + shard + "_replica_" + type.name().substring(0,1).toLowerCase() + replicaNum;
   }
 
   static class ReplicaCount {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
index fdc7b02..d34d1ee 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
@@ -43,6 +43,13 @@ public class CloudDescriptor {
   volatile Replica.State lastPublished = Replica.State.ACTIVE;
 
   public static final String NUM_SHARDS = "numShards";
+  
+  public static final String REPLICA_TYPE = "replicaType";
+  
+  /**
+   * The type of replica this core hosts
+   */
+  private final Replica.Type replicaType;
 
   public CloudDescriptor(String coreName, Properties props, CoreDescriptor cd) {
     this.cd = cd;
@@ -56,7 +63,7 @@ public class CloudDescriptor {
     if (Strings.isNullOrEmpty(nodeName))
       this.nodeName = null;
     this.numShards = PropertiesUtil.toInteger(props.getProperty(CloudDescriptor.NUM_SHARDS), null);
-
+    this.replicaType = Replica.Type.valueOf(props.getProperty(CloudDescriptor.REPLICA_TYPE, Replica.Type.REALTIME.name()));
     for (String propName : props.stringPropertyNames()) {
       if (propName.startsWith(ZkController.COLLECTION_PARAM_PREFIX)) {
         collectionParams.put(propName.substring(ZkController.COLLECTION_PARAM_PREFIX.length()), props.getProperty(propName));
@@ -64,6 +71,10 @@ public class CloudDescriptor {
     }
   }
   
+  public boolean requiresTransactionLog() {
+    return this.replicaType != Replica.Type.PASSIVE;
+  }
+  
   public Replica.State getLastPublished() {
     return lastPublished;
   }
@@ -135,4 +146,8 @@ public class CloudDescriptor {
     if(nodeName==null) cd.getPersistableStandardProperties().remove(CoreDescriptor.CORE_NODE_NAME);
     else cd.getPersistableStandardProperties().setProperty(CoreDescriptor.CORE_NODE_NAME, nodeName);
   }
+
+  public Replica.Type getReplicaType() {
+    return replicaType;
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
index a1bb70e..d9564b5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -62,6 +62,8 @@ import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
 import static org.apache.solr.cloud.OverseerCollectionMessageHandler.RANDOM;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.PASSIVE_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.REALTIME_REPLICAS;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonParams.NAME;
@@ -96,7 +98,9 @@ public class CreateCollectionCmd implements Cmd {
       // look at the replication factor and see if it matches reality
       // if it does not, find best nodes to create more cores
 
-      int repFactor = message.getInt(REPLICATION_FACTOR, 1);
+      int numRealtimeReplicas = message.getInt(REALTIME_REPLICAS, message.getInt(REPLICATION_FACTOR, 1));
+      int numPassiveReplicas = message.getInt(PASSIVE_REPLICAS, 0);
+      //TODO: Add active/append replicas
 
       ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
       final String async = message.getStr(ASYNC);
@@ -114,10 +118,10 @@ public class CreateCollectionCmd implements Cmd {
         ClusterStateMutator.getShardNames(numSlices, shardNames);
       }
 
-      int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
+      int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 100); //nocommit
 
-      if (repFactor <= 0) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, REPLICATION_FACTOR + " must be greater than 0");
+      if (numRealtimeReplicas <= 0) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, REALTIME_REPLICAS + " must be greater than 0");
       }
 
       if (numSlices <= 0) {
@@ -135,11 +139,11 @@ public class CreateCollectionCmd implements Cmd {
 
         positionVsNodes = new HashMap<>();
       } else {
-        if (repFactor > nodeList.size()) {
+        if (numRealtimeReplicas > nodeList.size()) {
           log.warn("Specified "
-              + REPLICATION_FACTOR
+              + REALTIME_REPLICAS
               + " of "
-              + repFactor
+              + numRealtimeReplicas
               + " on collection "
               + collectionName
               + " is higher than or equal to the number of Solr instances currently live or live and part of your " + CREATE_NODE_SET + "("
@@ -148,19 +152,20 @@ public class CreateCollectionCmd implements Cmd {
         }
 
         int maxShardsAllowedToCreate = maxShardsPerNode * nodeList.size();
-        int requestedShardsToCreate = numSlices * repFactor;
+        int requestedShardsToCreate = numSlices * (numRealtimeReplicas + numPassiveReplicas);
         if (maxShardsAllowedToCreate < requestedShardsToCreate) {
           throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + ". Value of "
               + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
               + ", and the number of nodes currently live or live and part of your "+CREATE_NODE_SET+" is " + nodeList.size()
               + ". This allows a maximum of " + maxShardsAllowedToCreate
               + " to be created. Value of " + NUM_SLICES + " is " + numSlices
-              + " and value of " + REPLICATION_FACTOR + " is " + repFactor
+              + ", value of " + REALTIME_REPLICAS + " is " + numRealtimeReplicas
+              + " and value of " + PASSIVE_REPLICAS + " is " + numPassiveReplicas
               + ". This requires " + requestedShardsToCreate
               + " shards to be created (higher than the allowed number)");
         }
 
-        positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, message, shardNames, repFactor);
+        positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, message, shardNames, numRealtimeReplicas, 0, numPassiveReplicas);
       }
 
       ZkStateReader zkStateReader = ocmh.zkStateReader;
@@ -201,12 +206,13 @@ public class CreateCollectionCmd implements Cmd {
 
 
       log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , replicationFactor : {2}",
-          collectionName, shardNames, repFactor));
+          collectionName, shardNames, numRealtimeReplicas));
       Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
       for (Map.Entry<ReplicaAssigner.Position, String> e : positionVsNodes.entrySet()) {
         ReplicaAssigner.Position position = e.getKey();
         String nodeName = e.getValue();
-        String coreName = collectionName + "_" + position.shard + "_replica" + (position.index + 1);
+        // TODO: Adding the suffix is great for debugging, but may be an issue if at some point we want to support a way to change replica type
+        String coreName = collectionName + "_" + position.shard + "_replica" + position.suffix + (position.index + 1);
         log.debug(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
             , coreName, position.shard, collectionName, nodeName));
 
@@ -221,7 +227,8 @@ public class CreateCollectionCmd implements Cmd {
               ZkStateReader.SHARD_ID_PROP, position.shard,
               ZkStateReader.CORE_NAME_PROP, coreName,
               ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
-              ZkStateReader.BASE_URL_PROP, baseUrl);
+              ZkStateReader.BASE_URL_PROP, baseUrl, 
+              ZkStateReader.REPLICA_TYPE, position.type.name());
           Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
         }
 
@@ -235,6 +242,8 @@ public class CreateCollectionCmd implements Cmd {
         params.set(CoreAdminParams.SHARD, position.shard);
         params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
         params.set(CoreAdminParams.NEW_COLLECTION, "true");
+        // This is used to tell the CoreAdminHandler that the new core doesn't need a tlog in case of passive replicas
+        params.set(CoreAdminParams.REPLICA_TYPE, position.type.name());
 
         if (async != null) {
           String coreAdminAsyncId = async + Math.abs(System.nanoTime());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index 223a539..4aa721d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -619,7 +620,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         }
         
         // on startup and after connection timeout, wait for all known shards
-        if (found >= slices.getReplicasMap().size()) {
+        if (found >= slices.getReplicas(EnumSet.of(Replica.Type.APPEND, Replica.Type.REALTIME)).size()) {
           log.info("Enough replicas found to continue.");
           return true;
         } else {
@@ -627,7 +628,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
             log.info("Waiting until we see more replicas up for shard {}: total={}"
               + " found={}"
               + " timeoutin={}ms",
-                shardId, slices.getReplicasMap().size(), found,
+                shardId, slices.getReplicas(EnumSet.of(Replica.Type.APPEND, Replica.Type.REALTIME)).size(), found,
                 TimeUnit.MILLISECONDS.convert(timeoutAt - System.nanoTime(), TimeUnit.NANOSECONDS));
           }
         }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index 4d64a00..9319e8d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -16,6 +16,50 @@
  */
 package org.apache.solr.cloud;
 
+import static org.apache.solr.common.cloud.DocCollection.SNITCH;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.BACKUP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESNAPSHOT;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEALIAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETENODE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESNAPSHOT;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_REPLICA_TASK;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_SHARD_TASK;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REPLACENODE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.RESTORE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.Utils.makeMap;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
@@ -33,7 +77,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang.StringUtils;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.SolrServerException;
@@ -79,21 +122,7 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.cloud.DocCollection.SNITCH;
-import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-import static org.apache.solr.common.util.Utils.makeMap;
+import com.google.common.collect.ImmutableMap;
 
 /**
  * A {@link OverseerMessageHandler} that handles Collections API related
@@ -126,6 +155,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
 
   static final String SKIP_CREATE_REPLICA_IN_CLUSTER_STATE = "skipCreateReplicaInClusterState";
 
+  //nocommit: review
   public static final Map<String, Object> COLL_PROPS = Collections.unmodifiableMap(makeMap(
       ROUTER, DocRouter.DEFAULT_NAME,
       ZkStateReader.REPLICATION_FACTOR, "1",
@@ -699,14 +729,24 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
                                       List<String> nodeList,
                                       ZkNodeProps message,
                                       List<String> shardNames,
-                                      int repFactor) throws IOException {
+                                      int numRealtimeReplicas, 
+                                      int numActiveReplicas,
+                                      int numPassiveReplicas) throws IOException {
     List<Map> rulesMap = (List) message.get("rule");
     if (rulesMap == null) {
       int i = 0;
       Map<Position, String> result = new HashMap<>();
       for (String aShard : shardNames) {
-        for (int j = 0; j < repFactor; j++){
-          result.put(new Position(aShard, j), nodeList.get(i % nodeList.size()));
+        for (int j = 0; j < numRealtimeReplicas; j++){
+          result.put(new Position(aShard, j, Replica.Type.REALTIME), nodeList.get(i % nodeList.size()));
+          i++;
+        }
+        for (int j = 0; j < numActiveReplicas; j++){
+          result.put(new Position(aShard, j, Replica.Type.APPEND), nodeList.get(i % nodeList.size()));
+          i++;
+        }
+        for (int j = 0; j < numPassiveReplicas; j++){
+          result.put(new Position(aShard, j, Replica.Type.PASSIVE), nodeList.get(i % nodeList.size()));
           i++;
         }
       }
@@ -718,7 +758,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
 
     Map<String, Integer> sharVsReplicaCount = new HashMap<>();
 
-    for (String shard : shardNames) sharVsReplicaCount.put(shard, repFactor);
+    for (String shard : shardNames) sharVsReplicaCount.put(shard, numRealtimeReplicas); // TODO: We probably want to separate different replica types here, not just add them
     ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
         sharVsReplicaCount,
         (List<Map>) message.get(SNITCH),
@@ -749,6 +789,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
       
       if (result.size() == coreNames.size()) {
         return result;
+      } else {
+        log.debug("Expecting {} cores but found {}", coreNames.size(), result.size());
       }
       if (timeout.hasTimedOut()) {
         throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state.");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 52efff9..cd13e24 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -461,7 +461,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
           LOG.info("Registering as Active after recovery.");
           try {
             if (onlyLeaderIndexes) {
-              zkController.startReplicationFromLeader(coreName);
+              zkController.startReplicationFromLeader(coreName, true);
             }
             zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
           } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
index 48aaced..7188d28 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -49,7 +49,12 @@ public class ReplicateFromLeader {
     this.coreName = coreName;
   }
 
-  public void startReplication() throws InterruptedException {
+  /**
+   * Start a replication handler thread that will periodically pull indices from the shard leader
+   * @param switchTransactionLog if true, ReplicationHandler will rotate the transaction log once
+   * the replication is done
+   */
+  public void startReplication(boolean switchTransactionLog) throws InterruptedException {
     try (SolrCore core = cc.getCore(coreName)) {
       if (core == null) {
         if (cc.isShutDown()) {
@@ -78,19 +83,21 @@ public class ReplicateFromLeader {
       }
 
       replicationProcess = new ReplicationHandler();
-      replicationProcess.setPollListener((solrCore, pollSuccess) -> {
-        if (pollSuccess) {
-          String commitVersion = getCommitVersion(core);
-          if (commitVersion == null) return;
-          if (Long.parseLong(commitVersion) == lastVersion) return;
-          UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
-          SolrQueryRequest req = new LocalSolrQueryRequest(core,
-              new ModifiableSolrParams());
-          CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
-          cuc.setVersion(Long.parseLong(commitVersion));
-          updateLog.copyOverOldUpdates(cuc);
-        }
-      });
+      if (switchTransactionLog) {
+        replicationProcess.setPollListener((solrCore, pollSuccess) -> {
+          if (pollSuccess) {
+            String commitVersion = getCommitVersion(core);
+            if (commitVersion == null) return;
+            if (Long.parseLong(commitVersion) == lastVersion) return;
+            UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
+            SolrQueryRequest req = new LocalSolrQueryRequest(core,
+                new ModifiableSolrParams());
+            CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
+            cuc.setVersion(Long.parseLong(commitVersion));
+            updateLog.copyOverOldUpdates(cuc);
+          }
+        });
+      }
       replicationProcess.init(replicationConfig);
       replicationProcess.inform(core);
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/cloud/ReplicateOnlyRecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateOnlyRecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateOnlyRecoveryStrategy.java
new file mode 100644
index 0000000..8077d20
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateOnlyRecoveryStrategy.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.UpdateParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.DirectoryFactory.DirContext;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.logging.MDCLoggingContext;
+import org.apache.solr.request.SolrRequestHandler;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.util.RefCounted;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Recovery thread that re-syncs a core by fetching the index from its shard leader through the
+ * {@link ReplicationHandler} and then publishing the core as ACTIVE; this class replays no update log.
+ */
+public class ReplicateOnlyRecoveryStrategy extends Thread implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final int MAX_RETRIES = 500;
+  private static final int STARTING_RECOVERY_DELAY = 5000;
+
+  public static interface RecoveryListener {
+    public void recovered();
+    public void failed();
+  }
+  
+  private volatile boolean close = false;
+
+  private RecoveryStrategy.RecoveryListener recoveryListener;
+  private ZkController zkController;
+  private String baseUrl;
+  private String coreZkNodeName;
+  private ZkStateReader zkStateReader;
+  private volatile String coreName;
+  private int retries;
+  private boolean recoveringAfterStartup;
+  private CoreContainer cc;
+
+  // this should only be used from SolrCoreState
+  public ReplicateOnlyRecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryStrategy.RecoveryListener recoveryListener) {
+    this.cc = cc;
+    this.coreName = cd.getName();
+    this.recoveryListener = recoveryListener;
+    setName("ReplicateOnlyRecoveryThread-"+this.coreName);
+    zkController = cc.getZkController();
+    zkStateReader = zkController.getZkStateReader();
+    baseUrl = zkController.getBaseUrl();
+    coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
+  }
+
+  public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
+    this.recoveringAfterStartup = recoveringAfterStartup;
+  }
+
+  // make sure any threads stop retrying
+  @Override
+  public void close() {
+    close = true;
+    LOG.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName);
+  }
+
+  /** Publishes RECOVERY_FAILED for the core, then closes this strategy and notifies the listener of the failure. */
+  private void recoveryFailed(final SolrCore core,
+      final ZkController zkController, final String baseUrl,
+      final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException {
+    SolrException.log(LOG, "Recovery failed - I give up.");
+    try {
+      zkController.publish(cd, Replica.State.RECOVERY_FAILED);
+    } finally {
+      close();
+      recoveryListener.failed();
+    }
+  }
+  
+  /** Commits on the leader, then fetches its index synchronously through this core's ReplicationHandler. */
+  private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
+      throws SolrServerException, IOException {
+    ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
+    String leaderUrl = leaderCNodeProps.getCoreUrl();
+    
+    LOG.info("Attempting to replicate from [{}].", leaderUrl);
+    
+    // send commit
+    commitOnLeader(leaderUrl);
+    
+    // use rep handler directly, so we can do this sync rather than async
+    SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
+    ReplicationHandler replicationHandler = (ReplicationHandler) handler;
+    
+    if (replicationHandler == null) {
+      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
+          "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
+    }
+    
+    ModifiableSolrParams solrParams = new ModifiableSolrParams();
+    solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
+    
+    if (isClosed()) return; // we check closed on return
+    boolean success = replicationHandler.doFetch(solrParams, false);
+    
+    if (!success) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
+    }
+    
+    // solrcloud_debug
+    if (LOG.isDebugEnabled()) {
+      try {
+        RefCounted<SolrIndexSearcher> searchHolder = core
+            .getNewestSearcher(false);
+        SolrIndexSearcher searcher = searchHolder.get();
+        Directory dir = core.getDirectoryFactory().get(core.getIndexDir(), DirContext.META_DATA, null);
+        try {
+          LOG.debug(core.getCoreDescriptor().getCoreContainer()
+              .getZkController().getNodeName()
+              + " replicated "
+              + searcher.search(new MatchAllDocsQuery(), 1).totalHits
+              + " from "
+              + leaderUrl
+              + " gen:"
+              + (core.getDeletionPolicy().getLatestCommit() == null ? "null" : core.getDeletionPolicy().getLatestCommit().getGeneration())
+              + " data:" + core.getDataDir()
+              + " index:" + core.getIndexDir()
+              + " newIndex:" + core.getNewIndexDir()
+              + " files:" + Arrays.asList(dir.listAll()));
+        } finally {
+          core.getDirectoryFactory().release(dir);
+          searchHolder.decref();
+        }
+      } catch (Exception e) {
+        LOG.debug("Error in solrcloud_debug block", e);
+      }
+    }
+  }
+
+  /** Sends a commit request (OPEN_SEARCHER=false, COMMIT_END_POINT=true) to the leader core at the given URL. */
+  private void commitOnLeader(String leaderUrl) throws SolrServerException,
+      IOException {
+    try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl).build()) {
+      client.setConnectionTimeout(30000);
+      UpdateRequest ureq = new UpdateRequest();
+      ureq.setParams(new ModifiableSolrParams());
+      ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
+      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
+      ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(client);
+    }
+  }
+
+  /** Looks up the core by name and runs {@link #doRecovery(SolrCore)} with the MDC logging context set to that core. */
+  @Override
+  public void run() {
+    // set request info for logging
+    try (SolrCore core = cc.getCore(coreName)) {
+
+      if (core == null) {
+        SolrException.log(LOG, "SolrCore not found - cannot recover:" + coreName);
+        return;
+      }
+      MDCLoggingContext.setCore(core);
+
+      try {
+        doRecovery(core);
+      } catch (InterruptedException e) {
+        // nocommit: InterruptedException shouldn't really happen now
+        Thread.currentThread().interrupt();
+        SolrException.log(LOG, "", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+      } catch (Exception e) {
+        LOG.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+      }
+    } finally {
+      MDCLoggingContext.clear();
+    }
+  }
+
+  // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
+  /** Repeats replication from the shard leader until it succeeds, the thread is interrupted or closed, or MAX_RETRIES attempts have failed. */
+  public void doRecovery(SolrCore core) throws KeeperException, InterruptedException {
+    boolean successfulRecovery = false;
+
+//    if (core.getUpdateHandler().getUpdateLog() != null) {
+//      SolrException.log(LOG, "'replicate-only' recovery strategy should only be used if no update logs are present, but this core has one: "
+//          + core.getUpdateHandler().getUpdateLog());
+//      return;
+//    }
+    while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
+      try {
+        CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+        ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
+            cloudDesc.getCollectionName(), cloudDesc.getShardId());
+        final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
+        final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
+
+        String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
+
+        String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+
+        boolean isLeader = leaderUrl.equals(ourUrl); //TODO: We can probably delete most of this code if we say this strategy can only be used for passive replicas
+        if (isLeader && !cloudDesc.isLeader()) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
+        }
+        if (cloudDesc.isLeader()) {
+          // we are now the leader - no one else must have been suitable
+          LOG.warn("We have not yet recovered - but we are now the leader!");
+          LOG.info("Finished recovery process.");
+          zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+          return;
+        }
+        
+        LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
+            ourUrl);
+        zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
+        
+        if (isClosed()) {
+          LOG.info("Recovery for core {} has been closed", core.getName());
+          break;
+        }
+        
+        LOG.info("Starting Replication Recovery.");
+
+        try {
+          LOG.info("Stopping background replicate from leader process");
+          zkController.stopReplicationFromLeader(coreName);
+          replicate(zkController.getNodeName(), core, leaderprops);
+
+          if (isClosed()) {
+            LOG.info("Recovery for core {} has been closed", core.getName());
+            break;
+          }
+
+          LOG.info("Replication Recovery was successful.");
+          successfulRecovery = true;
+        } catch (Exception e) {
+          SolrException.log(LOG, "Error while trying to recover", e);
+        }
+
+      } catch (Exception e) {
+        SolrException.log(LOG, "Error while trying to recover. core=" + coreName, e);
+      } finally {
+        if (successfulRecovery) {
+          LOG.info("Restaring background replicate from leader process");
+          zkController.startReplicationFromLeader(coreName, false);
+          LOG.info("Registering as Active after recovery.");
+          try {
+            zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+          } catch (Exception e) {
+            LOG.error("Could not publish as ACTIVE after succesful recovery", e);
+            successfulRecovery = false;
+          }
+          
+          if (successfulRecovery) {
+            close = true;
+            recoveryListener.recovered();
+          }
+        }
+      }
+
+      if (!successfulRecovery) {
+        // lets pause for a moment and we need to try again...
+        // TODO: we don't want to retry for some problems?
+        // Or do a fall off retry...
+        try {
+          if (isClosed()) {
+            LOG.info("Recovery for core {} has been closed", core.getName());
+            break;
+          }
+          
+          LOG.error("Recovery failed - trying again... (" + retries + ")");
+          
+          retries++;
+          if (retries >= MAX_RETRIES) {
+            SolrException.log(LOG, "Recovery failed - max retries exceeded (" + retries + ").");
+            try {
+              recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
+            } catch (Exception e) {
+              SolrException.log(LOG, "Could not publish that recovery failed", e);
+            }
+            break;
+          }
+        } catch (Exception e) {
+          SolrException.log(LOG, "An error has occurred during recovery", e);
+        }
+
+        try {
+          // Wait an exponential interval between retries, start at 5 seconds and work up to a minute.
+          // If we're at attempt >= 4, there's no point computing pow(2, retries) because the result 
+          // will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in
+          // order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m).
+          double loopCount = retries < 4 ? Math.min(Math.pow(2, retries), 12) : 12;
+          LOG.info("Wait [{}] seconds before trying to recover again (attempt={})", loopCount, retries);
+          for (int i = 0; i < loopCount; i++) {
+            if (isClosed()) {
+              LOG.info("Recovery for core {} has been closed", core.getName());
+              break; // check if someone closed us
+            }
+            Thread.sleep(STARTING_RECOVERY_DELAY);
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          LOG.warn("Recovery was interrupted.", e);
+          close = true;
+        }
+      }
+    }
+
+    // if replay was skipped (possibly due to pulling a full index from the leader),
+    // then we still need to update version bucket seeds after recovery
+    if (successfulRecovery) {
+      LOG.info("Updating version bucket highest from index after successful recovery.");
+      core.seedVersionBuckets();
+    }
+
+    LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
+  }
+
+  /** Returns true once {@link #close()} was called or this strategy marked itself closed after finishing or being interrupted. */
+  public boolean isClosed() {
+    return close;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
index 5a099e1..837af79 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
@@ -385,7 +385,7 @@ public class SplitShardCmd implements Cmd {
       Map<ReplicaAssigner.Position, String> nodeMap = ocmh.identifyNodes(clusterState,
           new ArrayList<>(clusterState.getLiveNodes()),
           new ZkNodeProps(collection.getProperties()),
-          subSlices, repFactor - 1);
+          subSlices, repFactor - 1, 0, 0);
 
       List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index d7ae437..ce37de5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -62,6 +62,7 @@ import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.OnReconnect;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Replica.Type;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkACLProvider;
@@ -86,7 +87,6 @@ import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.logging.MDCLoggingContext;
-import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.UpdateLog;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -883,12 +883,17 @@ public class ZkController {
       try {
         // If we're a preferred leader, insert ourselves at the head of the queue
         boolean joinAtHead = false;
-        Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(),
-            coreZkNodeName);
+        Replica replica = zkStateReader.getClusterState().getReplica(collection, coreZkNodeName);
         if (replica != null) {
           joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
         }
-        joinElection(desc, afterExpiration, joinAtHead);
+        //TODO Why would replica be null?
+        if (replica == null || replica.getType() != Type.PASSIVE) {
+          joinElection(desc, afterExpiration, joinAtHead);
+        } else if (replica.getType() == Type.PASSIVE) {
+          log.debug("Replica {} skipping election because replica is passive", coreZkNodeName);
+          startReplicationFromLeader(coreName, false);
+        }
       } catch (InterruptedException e) {
         // Restore the interrupted status
         Thread.currentThread().interrupt();
@@ -905,6 +910,7 @@ public class ZkController {
       String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
       log.debug("We are " + ourUrl + " and leader is " + leaderUrl);
       boolean isLeader = leaderUrl.equals(ourUrl);
+      assert !(isLeader && zkStateReader.getClusterState().getCollection(collection).getReplica(coreZkNodeName).getType() == Type.PASSIVE): "Passive replica became leader!";
       
       try (SolrCore core = cc.getCore(desc.getName())) {
         
@@ -944,11 +950,12 @@ public class ZkController {
             = checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, core, cc, afterExpiration);
         if (!didRecovery) {
           if (isReplicaInOnlyLeaderIndexes) {
-            startReplicationFromLeader(coreName);
+            startReplicationFromLeader(coreName, true);
           }
           publish(desc, Replica.State.ACTIVE);
         }
         
+        
         core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
       }
       
@@ -960,14 +967,18 @@ public class ZkController {
     }
   }
 
-  public void startReplicationFromLeader(String coreName) throws InterruptedException {
+  public void startReplicationFromLeader(String coreName, boolean switchTransactionLog) throws InterruptedException {
+    log.info(coreName + " starting replication from leader");
     ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName);
     if (replicateFromLeaders.putIfAbsent(coreName, replicateFromLeader) == null) {
-      replicateFromLeader.startReplication();
+      replicateFromLeader.startReplication(switchTransactionLog);
+    } else {
+      log.warn("A replicate from leader instance already exists for core {}", coreName);
     }
   }
 
   public void stopReplicationFromLeader(String coreName) {
+    log.info(coreName + " stopping replication from leader");
     ReplicateFromLeader replicateFromLeader = replicateFromLeaders.remove(coreName);
     if (replicateFromLeader != null) {
       replicateFromLeader.stopReplication();
@@ -1189,6 +1200,7 @@ public class ZkController {
       // If the leader initiated recovery, then verify that this replica has performed
       // recovery as requested before becoming active; don't even look at lirState if going down
       if (state != Replica.State.DOWN) {
+        //nocommit check for readOnly case
         final Replica.State lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName);
         if (lirState != null) {
           if (state == Replica.State.ACTIVE) {
@@ -1269,12 +1281,16 @@ public class ZkController {
       assert false : "No collection was specified [" + collection + "]";
       return;
     }
+    Replica replica = zkStateReader.getClusterState().getReplica(collection, coreNodeName);
+    
+    if (replica == null || replica.getType() != Type.PASSIVE) {
+      ElectionContext context = electionContexts.remove(new ContextKey(collection, coreNodeName));
 
-    ElectionContext context = electionContexts.remove(new ContextKey(collection, coreNodeName));
-
-    if (context != null) {
-      context.cancelElection();
+      if (context != null) {
+        context.cancelElection();
+      }
     }
+//    //TODO: Do we need to stop replication for type==append?
 
     CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
     zkStateReader.unregisterCore(cloudDescriptor.getCollectionName());
@@ -1357,6 +1373,7 @@ public class ZkController {
       final String shardId = zkStateReader.getClusterState().getShardId(cd.getCollectionName(), getNodeName(), cd.getName());
       if (shardId != null) {
         cd.getCloudDescriptor().setShardId(shardId);
+        log.debug("Shard ID is {} for core {} ", shardId, cd.getName());
         return;
       }
       try {
@@ -1456,6 +1473,7 @@ public class ZkController {
             return true;
           }
           errorMessage.set("coreNodeName " + coreNodeName + " exists, but does not match expected node or core name");
+          log.debug("coreNodeName " + coreNodeName + " exists, but does not match expected node or core name");
           return false;
         });
       } catch (TimeoutException e) {
@@ -2410,11 +2428,9 @@ public class ZkController {
 
       for (Slice slice : slices) {
         Collection<Replica> replicas = slice.getReplicas();
-        for (Replica replica : replicas) {
-          if (replica.getName().equals(
-              dcore.getCloudDescriptor().getCoreNodeName())) {
-            return true;
-          }
+        Replica r = slice.getReplica(dcore.getCloudDescriptor().getCoreNodeName());
+        if (r != null) {
+          return true;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index f03eeeb..c467405 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -271,11 +271,12 @@ public class ReplicaMutator {
 
     replicaProps.putAll(message.getProperties());
     if (slice != null) {
-      Replica oldReplica = slice.getReplicasMap().get(coreNodeName);
+      Replica oldReplica = slice.getReplica(coreNodeName);
       if (oldReplica != null) {
         if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
           replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
         }
+        replicaProps.put(ZkStateReader.REPLICA_TYPE, oldReplica.getType().toString());
         // Move custom props over.
         for (Map.Entry<String, Object> ent : oldReplica.getProperties().entrySet()) {
           if (ent.getKey().startsWith(COLL_PROP_PREFIX)) {
@@ -311,6 +312,8 @@ public class ReplicaMutator {
 
 
     Replica replica = new Replica(coreNodeName, replicaProps);
+    
+    log.debug("Will update state for replica: " + replica);
 
     Map<String, Object> sliceProps = null;
     Map<String, Replica> replicas;
@@ -328,11 +331,11 @@ public class ReplicaMutator {
       sliceProps.put(ZkStateReader.STATE_PROP, shardState);
       sliceProps.put(Slice.PARENT, shardParent);
     }
-
     replicas.put(replica.getName(), replica);
     slice = new Slice(sliceName, replicas, sliceProps);
 
     DocCollection newCollection = CollectionMutator.updateSlice(collectionName, collection, slice);
+    log.debug("Collection is now: " + newCollection);
     return new ZkWriteCommand(collectionName, newCollection);
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index ec2ce2e..160bf0c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -16,14 +16,16 @@
  */
 package org.apache.solr.cloud.overseer;
 
-import java.lang.invoke.MethodHandles;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
+import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
+import static org.apache.solr.common.util.Utils.makeMap;
 
+import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.collect.ImmutableSet;
 import org.apache.solr.cloud.Assign;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.common.cloud.ClusterState;
@@ -37,9 +39,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
-import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
-import static org.apache.solr.common.util.Utils.makeMap;
+import com.google.common.collect.ImmutableSet;
 
 public class SliceMutator {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -66,14 +66,16 @@ public class SliceMutator {
       log.error("Invalid Collection/Slice {}/{} ", coll, slice);
       return ZkStateWriter.NO_OP;
     }
-
     String coreNodeName = Assign.assignNode(collection);
-    Replica replica = new Replica(coreNodeName,
+//    Replica replica = new Replica(coreNodeName,
+    // coreNodeName overlaps?
+    Replica replica = new Replica(message.getStr(ZkStateReader.CORE_NAME_PROP),
         makeMap(
             ZkStateReader.CORE_NAME_PROP, message.getStr(ZkStateReader.CORE_NAME_PROP),
             ZkStateReader.BASE_URL_PROP, message.getStr(ZkStateReader.BASE_URL_PROP),
             ZkStateReader.STATE_PROP, message.getStr(ZkStateReader.STATE_PROP),
-            ZkStateReader.NODE_NAME_PROP, message.getStr(ZkStateReader.NODE_NAME_PROP)));
+            ZkStateReader.NODE_NAME_PROP, message.getStr(ZkStateReader.NODE_NAME_PROP), 
+            ZkStateReader.REPLICA_TYPE, message.get(ZkStateReader.REPLICA_TYPE)));
     return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
   }
 
@@ -248,13 +250,15 @@ public class SliceMutator {
   }
 
   public static DocCollection updateReplica(DocCollection collection, final Slice slice, String coreNodeName, final Replica replica) {
-    Map<String, Replica> copy = slice.getReplicasCopy();
+    Map<String, Replica> replicasCopy = slice.getReplicasCopy();
     if (replica == null) {
-      copy.remove(coreNodeName);
+      replicasCopy.remove(coreNodeName);
     } else {
-      copy.put(replica.getName(), replica);
+      replicasCopy.put(replica.getName(), replica);
     }
-    Slice newSlice = new Slice(slice.getName(), copy, slice.getProperties());
+    Slice newSlice = new Slice(slice.getName(), replicasCopy, slice.getProperties());
+    log.info("Old Slice: " + slice);
+    log.info("New Slice: " + newSlice);
     return CollectionMutator.updateSlice(collection.getName(), collection, newSlice);
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 23fb56c..a906b86 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -268,6 +268,7 @@ public class ZkStateWriter {
       }
     }
 
+    log.debug("New Cluster State is: " + clusterState);
     return clusterState;
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
index 3eab8b4..3c46743 100644
--- a/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
+++ b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
@@ -62,10 +62,14 @@ public class ReplicaAssigner {
   public static class Position implements Comparable<Position> {
     public final String shard;
     public final int index;
+    public final Replica.Type type;
+    public final String suffix;
 
-    public Position(String shard, int replicaIdx) {
+    public Position(String shard, int replicaIdx, Replica.Type type) {
       this.shard = shard;
       this.index = replicaIdx;
+      this.suffix = '_' + type.name().substring(0,1).toLowerCase();
+      this.type = type;
     }
 
     @Override
@@ -188,7 +192,7 @@ public class ReplicaAssigner {
       List<Position> positions = new ArrayList<>();
       for (int pos : p) {
         for (int j = 0; j < shardVsReplicaCount.get(shardNames.get(pos)); j++) {
-          positions.add(new Position(shardNames.get(pos), j));
+          positions.add(new Position(shardNames.get(pos), j, Replica.Type.REALTIME));
         }
       }
       Collections.sort(positions);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 0de671e..19f824a 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -16,6 +16,17 @@
  */
 package org.apache.solr.core;
 
+import static java.util.Objects.requireNonNull;
+import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
+import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
+import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.METRICS_PATH;
+import static org.apache.solr.common.params.CommonParams.ZK_PATH;
+import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
+
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
@@ -36,9 +47,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
-import com.codahale.metrics.Gauge;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import org.apache.http.auth.AuthSchemeProvider;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.config.Lookup;
@@ -57,6 +65,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.Utils;
@@ -95,16 +104,9 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.util.Objects.requireNonNull;
-import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
-import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
-import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.METRICS_PATH;
-import static org.apache.solr.common.params.CommonParams.ZK_PATH;
-import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
+import com.codahale.metrics.Gauge;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 
 /**
  *
@@ -1138,10 +1140,13 @@ public class CoreContainer {
         SolrCore newCore = core.reload(coreConfig);
         registerCore(cd.getName(), newCore, false, false);
         if (getZkController() != null) {
-          boolean onlyLeaderIndexes = getZkController().getClusterState().getCollection(cd.getCollectionName()).getRealtimeReplicas() == 1;
-          if (onlyLeaderIndexes && !cd.getCloudDescriptor().isLeader()) {
+          DocCollection docCollection = getZkController().getClusterState().getCollection(cd.getCollectionName());
+          boolean onlyLeaderIndexes = docCollection.getRealtimeReplicas() == 1;
+          Replica replica = docCollection.getReplica(cd.getCloudDescriptor().getCoreNodeName());
+          assert replica != null;
+          if (onlyLeaderIndexes && !cd.getCloudDescriptor().isLeader()) { //TODO: needed here?
             getZkController().stopReplicationFromLeader(core.getName());
-            getZkController().startReplicationFromLeader(newCore.getName());
+            getZkController().startReplicationFromLeader(newCore.getName(), true);
           }
         }
       } catch (SolrCoreState.CoreIsClosedException e) {
@@ -1228,6 +1233,10 @@ public class CoreContainer {
     if (zkSys.getZkController() != null) {
       // cancel recovery in cloud mode
       core.getSolrCoreState().cancelRecovery();
+      if (core.getCoreDescriptor().getCloudDescriptor().getReplicaType() == Replica.Type.PASSIVE) { // TODO: Also for Replica.Type.ACTIVE? 
+        // Stop replication if this is part of a passive replica before closing the core
+        zkSys.getZkController().stopReplicationFromLeader(name);
+      }
     }
     
     core.unloadOnClose(deleteIndexDir, deleteDataDir, deleteInstanceDir);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index a07496f..435ddbe 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -304,17 +304,28 @@ public class IndexFetcher {
       // when we are a bit more confident we may want to try a partial replication
       // if the error is connection related or something, but we have to be careful
       forceReplication = true;
+      LOG.info("Last replication failed, so I'll force replication");
     }
 
     try {
       if (fetchFromLeader) {
+        assert !solrCore.isClosed(): "Replication should be stopped before closing the core";
         Replica replica = getLeaderReplica();
         CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
         if (cd.getCoreNodeName().equals(replica.getName())) {
           return false;
         }
-        masterUrl = replica.getCoreUrl();
-        LOG.info("Updated masterUrl to " + masterUrl);
+        if (replica.getState() != Replica.State.ACTIVE) {
+          LOG.info("Replica {} is leader but it's state is {}, skipping replication", replica.getName(), replica.getState());
+          return false;
+        }
+        if (!replica.getCoreUrl().equals(masterUrl)) {
+          masterUrl = replica.getCoreUrl();
+          LOG.info("Updated masterUrl to {}", masterUrl);
+          // TODO: Do we need to set forceReplication = true?
+        } else {
+          LOG.debug("masterUrl didn't change");
+        }
       }
       //get the current 'replicateable' index version in the master
       NamedList response;
@@ -355,6 +366,7 @@ public class IndexFetcher {
         if (forceReplication && commit.getGeneration() != 0) {
           // since we won't get the files for an empty index,
           // we just clear ours and commit
+          LOG.info("New index in Master. Deleting mine...");
           RefCounted<IndexWriter> iw = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(solrCore);
           try {
             iw.get().deleteAll();
@@ -367,6 +379,7 @@ public class IndexFetcher {
 
         //there is nothing to be replicated
         successfulInstall = true;
+        LOG.debug("Nothing to replicate, master's version is 0");
         return true;
       }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 2e17af6..a08131b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -112,15 +112,7 @@ import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
 import static org.apache.solr.common.cloud.DocCollection.RULE;
 import static org.apache.solr.common.cloud.DocCollection.SNITCH;
 import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
-import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.common.cloud.ZkStateReader.REALTIME_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.*;
 import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
@@ -406,6 +398,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           AUTO_ADD_REPLICAS,
           RULE,
           SNITCH,
+          PASSIVE_REPLICAS,
           REALTIME_REPLICAS);
 
       if (props.get(STATE_FORMAT) == null) {
@@ -620,7 +613,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           _ROUTE_,
           CoreAdminParams.NAME,
           INSTANCE_DIR,
-          DATA_DIR);
+          DATA_DIR,
+          REPLICA_TYPE);
       return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX);
     }),
     OVERSEERSTATUS_OP(OVERSEERSTATUS, (req, rsp, h) -> (Map) new LinkedHashMap<>()),
@@ -1074,6 +1068,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
 
   }
 
+  // TODO: Maybe add REALTIME_REPLICAS HERE?
   public static final List<String> MODIFIABLE_COLL_PROPS = Arrays.asList(
       RULE,
       SNITCH,

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
index 275ec18..b5e0e73 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
@@ -171,7 +171,9 @@ public class CoreAdminHandler extends RequestHandlerBase implements PermissionNa
 
       final CallInfo callInfo = new CallInfo(this, req, rsp, op);
       if (taskId == null) {
+        log.info("Starting Operation: " + req);
         callInfo.call();
+        log.info("Done with Operation: " + req);
       } else {
         try {
           MDC.put("CoreAdminHandler.asyncId", taskId);
@@ -227,6 +229,7 @@ public class CoreAdminHandler extends RequestHandlerBase implements PermissionNa
       .put(CoreAdminParams.ROLES, CoreDescriptor.CORE_ROLES)
       .put(CoreAdminParams.CORE_NODE_NAME, CoreDescriptor.CORE_NODE_NAME)
       .put(ZkStateReader.NUM_SHARDS_PROP, CloudDescriptor.NUM_SHARDS)
+      .put(CoreAdminParams.REPLICA_TYPE, CloudDescriptor.REPLICA_TYPE)
       .build();
 
   protected static Map<String, String> buildCoreParams(SolrParams params) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index 123abea..c60f520 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -103,6 +103,11 @@ public class RealTimeGetComponent extends SearchComponent
     SolrQueryResponse rsp = rb.rsp;
     SolrParams params = req.getParams();
 
+    if (req.getCore().getCoreDescriptor().getCloudDescriptor() != null 
+        && !req.getCore().getCoreDescriptor().getCloudDescriptor().requiresTransactionLog()) {
+      return;
+    }
+    
     if (!params.getBool(COMPONENT_NAME, true)) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index 59e35f0..82adbdd 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.update;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.concurrent.ExecutionException;
@@ -32,8 +33,9 @@ import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.search.Sort;
 import org.apache.solr.cloud.ActionThrottle;
 import org.apache.solr.cloud.RecoveryStrategy;
-import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.cloud.ReplicateOnlyRecoveryStrategy;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.DirectoryFactory;
@@ -64,7 +66,8 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
   private SolrIndexWriter indexWriter = null;
   private DirectoryFactory directoryFactory;
 
-  private volatile RecoveryStrategy recoveryStrat;
+//  private volatile RecoveryStrategy recoveryStrat; //nocommit: Make interface
+  private volatile Thread recoveryStrat;
 
   private volatile boolean lastReplicationSuccess = true;
 
@@ -310,9 +313,16 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
               recoveryThrottle.minimumWaitBetweenActions();
               recoveryThrottle.markAttemptingAction();
               
-              recoveryStrat = new RecoveryStrategy(cc, cd, DefaultSolrCoreState.this);
-              recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
-              Future<?> future = cc.getUpdateShardHandler().getRecoveryExecutor().submit(recoveryStrat);
+              Future<?> future = null;
+              if (cd.getCloudDescriptor().requiresTransactionLog()) {
+                RecoveryStrategy recoveryStrat = new RecoveryStrategy(cc, cd, DefaultSolrCoreState.this);
+                recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
+                future = cc.getUpdateShardHandler().getRecoveryExecutor().submit(recoveryStrat);
+              } else {
+                ReplicateOnlyRecoveryStrategy recoveryStrat = new ReplicateOnlyRecoveryStrategy(cc, cd, DefaultSolrCoreState.this);
+                recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
+                future = cc.getUpdateShardHandler().getRecoveryExecutor().submit(recoveryStrat);
+              }
               try {
                 future.get();
               } catch (InterruptedException e) {
@@ -352,9 +362,11 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
   public void cancelRecovery() {
     if (recoveryStrat != null) {
       try {
-        recoveryStrat.close();
+        ((Closeable)recoveryStrat).close();
       } catch (NullPointerException e) {
         // okay
+      } catch (IOException e) {
+        // okay
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index abb5512..31cb782 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -137,7 +137,8 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
   }
   
   public DirectUpdateHandler2(SolrCore core, UpdateHandler updateHandler) {
-    super(core, updateHandler.getUpdateLog());
+//  TODO  super(core, updateHandler.getUpdateLog(), core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog());
+    super(core, updateHandler.getUpdateLog(), true);
     solrCoreState = core.getSolrCoreState();
     
     UpdateHandlerInfo updateHandlerInfo = core.getSolrConfig()

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
index 1cf8a3f..f08bb17 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
@@ -91,10 +91,16 @@ public abstract class UpdateHandler implements SolrInfoMBean {
   }
 
   public UpdateHandler(SolrCore core)  {
-    this(core, null);
+//    this(core, null, core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog());//nocommit, handle non-cloud case
+    this(core, null, true);//nocommit, handle non-cloud case
   }
-
+  
   public UpdateHandler(SolrCore core, UpdateLog updateLog)  {
+//    this(core, updateLog, core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog());//nocommit, handle non-cloud case
+    this(core, updateLog, true);//nocommit, handle non-cloud case
+  }
+
+  public UpdateHandler(SolrCore core, UpdateLog updateLog, boolean needsUpdateLog)  {
     this.core=core;
     idField = core.getLatestSchema().getUniqueKeyField();
     idFieldType = idField!=null ? idField.getType() : null;
@@ -102,7 +108,7 @@ public abstract class UpdateHandler implements SolrInfoMBean {
     PluginInfo ulogPluginInfo = core.getSolrConfig().getPluginInfo(UpdateLog.class.getName());
 
 
-    if (updateLog == null && ulogPluginInfo != null && ulogPluginInfo.isEnabled()) {
+    if (updateLog == null && ulogPluginInfo != null && ulogPluginInfo.isEnabled() && needsUpdateLog) {
       String dataDir = (String)ulogPluginInfo.initArgs.get("dir");
 
       String ulogDir = core.getCoreDescriptor().getUlogDir();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 08ede72..18d0ff9 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -1885,7 +1886,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       zkCheck();
       
       nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor()
-          .getCloudDescriptor().getCollectionName());
+          .getCloudDescriptor().getCollectionName(), EnumSet.of(Replica.Type.APPEND,Replica.Type.REALTIME));
       if (isLeader && nodes.size() == 1) {
         singleLeader = true;
       }
@@ -1905,7 +1906,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             doLocalCommit(cmd);
           } else {
             assert TestInjection.waitForInSyncWithLeader(req.getCore(),
-                zkController, collection, cloudDesc.getShardId());
+                zkController, collection, cloudDesc.getShardId()): "Core " + req.getCore() + " not in sync with leader";
           }
         } catch (InterruptedException e) {
           throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
@@ -1959,7 +1960,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
  
 
   
-  private List<Node> getCollectionUrls(SolrQueryRequest req, String collection) {
+  private List<Node> getCollectionUrls(SolrQueryRequest req, String collection, EnumSet<Replica.Type> types) {
     ClusterState clusterState = req.getCore().getCoreDescriptor()
         .getCoreContainer().getZkController().getClusterState();
     Map<String,Slice> slices = clusterState.getSlicesMap(collection);
@@ -1974,6 +1975,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       Map<String,Replica> shardMap = replicas.getReplicasMap();
       
       for (Entry<String,Replica> entry : shardMap.entrySet()) {
+        if (!types.contains(entry.getValue().getType())) {
+          log.info("getCollectionUrls: Skipping replica " + entry.getValue().getName());//nocommit: too verbose
+          continue;
+        }
         ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
         if (clusterState.liveNodesContain(nodeProps.getNodeName())) {
           urls.add(new StdNode(nodeProps, collection, replicas.getName()));

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
index 059e58f..8da7d28 100644
--- a/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
+++ b/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
@@ -33,7 +33,7 @@
     <commitWithin>
       <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
     </commitWithin>
-    <updateLog></updateLog>
+    <updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
   </updateHandler>
 
   <requestHandler name="/select" class="solr.SearchHandler">


[4/6] lucene-solr:jira/solr-10233: First commit for SOLR-10233

Posted by tf...@apache.org.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/core/src/test/org/apache/solr/cloud/TestPassiveReplica.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPassiveReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestPassiveReplica.java
new file mode 100644
index 0000000..a0a3667
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPassiveReplica.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+
+@Slow
+public class TestPassiveReplica extends SolrCloudTestCase {
+  
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  
+  // TODO: test no ulog in passive replicas
+  // TODO: Make sure that FORCELEADER can't be used with Passive
+  // TODO: Backup/Snapshot should not work on passive replicas 
+  // TODO: ADDSHARD operation
+  
+  private String collectionName = null;
+  private final static int REPLICATION_TIMEOUT_SECS = 10;
+  
+  private String suggestedCollectionName() {
+    return (getTestClass().getSimpleName().replace("Test", "") + "_" + getTestName().split(" ")[0]).replaceAll("(.)(\\p{Upper})", "$1_$2").toLowerCase();
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    TestInjection.waitForReplicasInSync = null; // We'll be explicit about this in this test
+    configureCluster(2) // 2 + random().nextInt(3) 
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+    CollectionAdminRequest.ClusterProp clusterPropRequest = CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false");
+    CollectionAdminResponse response = clusterPropRequest.process(cluster.getSolrClient());
+    assertEquals(0, response.getStatus());
+  }
+  
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    collectionName = suggestedCollectionName();
+    expectThrows(SolrException.class, () -> getCollectionState(collectionName));
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    for (JettySolrRunner jetty:cluster.getJettySolrRunners()) {
+      if (!jetty.isRunning()) {
+        LOG.warn("Jetty {} not running, probably some bad test. Starting it", jetty.getLocalPort());
+        ChaosMonkey.start(jetty);
+      }
+    }
+    if (cluster.getSolrClient().getZkStateReader().getClusterState().getCollectionOrNull(collectionName) != null) {
+      LOG.info("tearDown deleting collection");
+      CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
+      waitForDeletion(collectionName);
+    }
+    super.tearDown();
+  }
+  
+  // Just to compare test time, nocommit
+  @Ignore
+  public void testCreateDelete2() throws Exception {
+    try {
+      CollectionAdminRequest.createCollection(collectionName, "conf", 1, 8, 0, 0).process(cluster.getSolrClient());
+      DocCollection docCollection = getCollectionState(collectionName);
+      assertNotNull(docCollection);
+//      assertEquals("Expecting 4 relpicas per shard",
+//          8, docCollection.getReplicas().size());
+//      assertEquals("Expecting 6 passive replicas, 3 per shard",
+//          6, docCollection.getReplicas(EnumSet.of(Replica.Type.PASSIVE)).size());
+//      assertEquals("Expecting 2 writer replicas, one per shard",
+//          2, docCollection.getReplicas(EnumSet.of(Replica.Type.WRITER)).size());
+//      for (Slice s:docCollection.getSlices()) {
+//        // read-only replicas can never become leaders
+//        assertFalse(s.getLeader().isReadOnly());
+//      }
+    } finally {
+      zkClient().printLayoutToStdOut();
+    }
+  }
+  
+  // Creates a 2-shard collection with 1 realtime and 3 passive replicas per shard and checks its layout
+  @Repeat(iterations=2) // 2 times to make sure cleanup is complete and we can create the same collection
+  public void testCreateDelete() throws Exception {
+    try {
+      CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1, 0, 3).process(cluster.getSolrClient());
+      DocCollection docCollection = getCollectionState(collectionName);
+      assertNotNull(docCollection);
+      assertEquals("Expecting 8 replicas in total, 4 per shard",
+          8, docCollection.getReplicas().size());
+      assertEquals("Expecting 6 passive replicas, 3 per shard",
+          6, docCollection.getReplicas(EnumSet.of(Replica.Type.PASSIVE)).size());
+      assertEquals("Expecting 2 writer replicas, one per shard",
+          2, docCollection.getReplicas(EnumSet.of(Replica.Type.REALTIME)).size());
+      for (Slice s:docCollection.getSlices()) {
+        // read-only replicas can never become leaders
+        assertFalse(s.getLeader().getType() == Replica.Type.PASSIVE);
+      }
+      // TODO assert collection shards elect queue and validate there are no passive replicas
+    } finally {
+      zkClient().printLayoutToStdOut();
+    }
+  }
+  
+  // Indexes one document through the cluster client and verifies that every passive replica eventually
+  // serves it, while its update handler reports zero processed adds (checked via /admin/plugins stats).
+  public void testAddDocs() throws Exception {
+    int numReadOnlyReplicas = 1 + random().nextInt(3);
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, numReadOnlyReplicas).process(cluster.getSolrClient());
+    waitForState("Expected collection to be created with 1 shard and " + (numReadOnlyReplicas + 1) + " replicas", collectionName, clusterShape(1, numReadOnlyReplicas + 1));
+    DocCollection docCollection = assertNumberOfReplicas(1, 0, numReadOnlyReplicas, false, true);
+    assertEquals(1, docCollection.getSlices().size());
+
+    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar"));
+    cluster.getSolrClient().commit(collectionName);
+
+    Slice s = docCollection.getSlices().iterator().next();
+    try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) {
+      leaderClient.commit(); // TODO: this shouldn't be necessary here
+      assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+    }
+
+    TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS);
+    for (Replica r:s.getReplicas(EnumSet.of(Replica.Type.PASSIVE))) {
+      //TODO: assert replication < REPLICATION_TIMEOUT_SECS
+      try (HttpSolrClient readOnlyReplicaClient = getHttpSolrClient(r.getCoreUrl())) {
+        while (true) {
+          try {
+            assertEquals("Replica " + r.getName() + " not up to date after " + REPLICATION_TIMEOUT_SECS + " seconds",
+                1, readOnlyReplicaClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+            break;
+          } catch (AssertionError e) {
+            if (t.hasTimedOut()) {
+              throw e;
+            } else {
+              Thread.sleep(100);
+            }
+          }
+        }
+        SolrQuery req = new SolrQuery("qt", "/admin/plugins", "stats", "true");
+        QueryResponse statsResponse = readOnlyReplicaClient.query(req);
+        assertEquals("Replicas shouldn't process the add document request: " + statsResponse, 
+            0L, statsResponse.getResponse().findRecursive("plugins", "UPDATE", "updateHandler", "stats", "adds"));
+      }
+    }
+  }
+  
+  public void testAddRemovePassiveReplica() throws Exception {
+    CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1, 0, 0)
+      .setMaxShardsPerNode(100)
+      .process(cluster.getSolrClient());
+    cluster.getSolrClient().getZkStateReader().registerCore(collectionName); //TODO: Is this needed? 
+    waitForState("Expected collection to be created with 2 shards and 1 replica each", collectionName, clusterShape(2, 1));
+    DocCollection docCollection = assertNumberOfReplicas(2, 0, 0, false, true);
+    assertEquals(2, docCollection.getSlices().size());
+    
+    CollectionAdminRequest.addReplicaToShard(collectionName, "shard1", Replica.Type.PASSIVE).process(cluster.getSolrClient());
+    docCollection = assertNumberOfReplicas(2, 0, 1, true, false);
+    CollectionAdminRequest.addReplicaToShard(collectionName, "shard2", Replica.Type.PASSIVE).process(cluster.getSolrClient());    
+    docCollection = assertNumberOfReplicas(2, 0, 2, true, false);
+    
+    waitForState("Expecting collection to have 2 shards and 2 replica each", collectionName, clusterShape(2, 2));
+    
+    //Delete passive replica from shard1
+    CollectionAdminRequest.deleteReplica(
+        collectionName, 
+        "shard1", 
+        docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.PASSIVE)).get(0).getName())
+    .process(cluster.getSolrClient());
+    assertNumberOfReplicas(2, 0, 1, true, true);
+  }
+  
+  public void testRemoveAllWriterReplicas() throws Exception {
+    doTestNoLeader(true);
+  }
+  
+  public void testKillLeader() throws Exception {
+    doTestNoLeader(false);
+  }
+  
+  public void testPassiveReplicaStates() {
+    // Validate that passive replicas go through the correct states when starting, stopping, reconnecting
+  }
+  
+  public void testPassiveReplicaCantConnectToZooKeeper() {
+    
+  }
+  
+  public void testRealTimeGet() {
+    // should be redirected to writers
+  }
+  
+  /*
+   * Removes (removeReplica=true) or kills (false) the only writer replica, checks that updates fail while
+   * the passive replica keeps serving queries, then validates that replication resumes under a new leader.
+   */
+  private void doTestNoLeader(boolean removeReplica) throws Exception {
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, 1)
+      .setMaxShardsPerNode(100)
+      .process(cluster.getSolrClient());
+    cluster.getSolrClient().getZkStateReader().registerCore(collectionName); //TODO: Is this needed? 
+    waitForState("Expected collection to be created with 1 shard and 2 replicas", collectionName, clusterShape(1, 2));
+    DocCollection docCollection = assertNumberOfReplicas(1, 0, 1, false, true);
+
+    // Add a document and commit
+    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar"));
+    cluster.getSolrClient().commit(collectionName);
+    Slice s = docCollection.getSlices().iterator().next();
+    try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) {
+      assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+    }
+
+    waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PASSIVE)));
+
+    // Delete leader replica from shard1
+    ignoreException("No registered leader was found"); //These are expected
+    JettySolrRunner leaderJetty = null;
+    if (removeReplica) {
+      CollectionAdminRequest.deleteReplica(collectionName, "shard1", s.getLeader().getName())
+      .process(cluster.getSolrClient());
+    } else {
+      leaderJetty = cluster.getReplicaJetty(s.getLeader());
+      ChaosMonkey.kill(leaderJetty);
+      waitForState("Leader replica not removed", collectionName, clusterShape(1, 1));
+      // Wait for cluster state to be updated
+      waitForState("Replica state not updated in cluster state", 
+          collectionName, clusterStateReflectsActiveAndDownReplicas());
+    }
+    docCollection = assertNumberOfReplicas(0, 0, 1, true, true);
+
+    // Check that there is no leader for the shard
+    Replica leader = docCollection.getSlice("shard1").getLeader();
+    assertTrue(leader == null || !leader.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes()));
+
+    // Passive replica on the other hand should be active
+    Replica passiveReplica = docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.PASSIVE)).get(0);
+    assertTrue(passiveReplica.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes()));
+
+    // add document, this should fail since there is no leader. Passive replica should not accept the update
+    expectThrows(SolrException.class, () -> 
+      cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo"))
+    );
+
+    // Also fails if the update is sent to the passive replica explicitly. This has to go through the
+    // replica's own client; using the cluster client here would only repeat the previous check.
+    try (HttpSolrClient passiveReplicaClient = getHttpSolrClient(passiveReplica.getCoreUrl())) {
+      expectThrows(SolrException.class, () -> 
+        passiveReplicaClient.add(new SolrInputDocument("id", "2", "foo", "zoo"))
+      );
+    }
+
+    // Queries should still work
+    waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PASSIVE)));
+    // Add writer back. Since there is no writer now, new writer will have no docs. There will be data loss, since it will become the leader
+    // and passive replicas will replicate from it. Maybe we want to change this. Replicating from passive replicas is not a good idea, since they
+    // are by definition out of date.
+    if (removeReplica) {
+      CollectionAdminRequest.addReplicaToShard(collectionName, "shard1", Replica.Type.REALTIME).process(cluster.getSolrClient());
+    } else {
+      ChaosMonkey.start(leaderJetty);
+    }
+    waitForState("Expected collection to be 1x2", collectionName, clusterShape(1, 2));
+    unIgnoreException("No registered leader was found"); // Should have a leader from now on
+
+    // Validate that the new writer is the leader now
+    cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName);
+    docCollection = getCollectionState(collectionName);
+    leader = docCollection.getSlice("shard1").getLeader();
+    assertTrue(leader != null && leader.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes()));
+
+    //nocommit: If jetty is restarted, the replication is not forced, and replica doesn't replicate from leader until new docs are added. Is this the correct behavior? Why should these two cases be different?
+    if (removeReplica) {
+      // Passive replicas will replicate the empty index if a new replica was added and becomes leader
+      waitForNumDocsInAllReplicas(0, docCollection.getReplicas(EnumSet.of(Replica.Type.PASSIVE)));
+    }
+
+    // add docs again
+    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo"));
+    s = docCollection.getSlices().iterator().next();
+    try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) {
+      leaderClient.commit();
+      assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+    }
+    // Passive replicas must end up serving the new document (id:2) and the same total count as the leader
+    waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PASSIVE)), "id:2");
+    waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PASSIVE)));
+  }
+  
+  public void testKillReadOnlyReplica() throws Exception {
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, 1)
+      .setMaxShardsPerNode(100)
+      .process(cluster.getSolrClient());
+    cluster.getSolrClient().getZkStateReader().registerCore(collectionName); //TODO: Is this needed? 
+    waitForState("Expected collection to be created with 1 shard and 2 replicas", collectionName, clusterShape(1, 2));
+    DocCollection docCollection = assertNumberOfReplicas(1, 0, 1, false, true);
+    assertEquals(1, docCollection.getSlices().size());
+    
+    waitForNumDocsInAllActiveReplicas(0);
+    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar"));
+    cluster.getSolrClient().commit(collectionName);
+    waitForNumDocsInAllActiveReplicas(1);
+    
+    JettySolrRunner passiveReplicaJetty = cluster.getReplicaJetty(docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.PASSIVE)).get(0));
+    ChaosMonkey.kill(passiveReplicaJetty);
+    waitForState("Replica not removed", collectionName, activeReplicaCount(1, 0, 0));
+    // Also wait for the replica to be placed in state="down"
+    waitForState("Didn't update state", collectionName, clusterStateReflectsActiveAndDownReplicas());
+    
+    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "bar"));
+    cluster.getSolrClient().commit(collectionName);
+    waitForNumDocsInAllActiveReplicas(2);
+    
+    ChaosMonkey.start(passiveReplicaJetty);
+    waitForState("Replica not added", collectionName, activeReplicaCount(1, 0, 1));
+    waitForNumDocsInAllActiveReplicas(2);
+  }
+  
+  public void testAddDocsToPassive() {
+    
+  }
+  
+  public void testSearchWhileReplicationHappens() {
+      
+  }
+  
+  private void waitForNumDocsInAllActiveReplicas(int numDocs) throws IOException, SolrServerException, InterruptedException {
+    DocCollection docCollection = getCollectionState(collectionName);
+    waitForNumDocsInAllReplicas(numDocs, docCollection.getReplicas().stream().filter(r -> r.getState() == Replica.State.ACTIVE).collect(Collectors.toList()));
+  }
+    
+  private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas) throws IOException, SolrServerException, InterruptedException {
+    waitForNumDocsInAllReplicas(numDocs, replicas, "*:*");
+  }
+  
+  private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas, String query) throws IOException, SolrServerException, InterruptedException {
+    TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS);
+    for (Replica r:replicas) {
+      try (HttpSolrClient replicaClient = getHttpSolrClient(r.getCoreUrl())) {
+        while (true) {
+          try {
+            assertEquals("Replica " + r.getName() + " not up to date after " + REPLICATION_TIMEOUT_SECS + " seconds",
+                numDocs, replicaClient.query(new SolrQuery(query)).getResults().getNumFound());
+            break;
+          } catch (AssertionError e) {
+            if (t.hasTimedOut()) {
+              throw e;
+            } else {
+              Thread.sleep(100);
+            }
+          }
+        }
+      }
+    }
+  }
+  
+  private void waitForDeletion(String collection) throws InterruptedException, KeeperException {
+    TimeOut t = new TimeOut(10, TimeUnit.SECONDS);
+    while (cluster.getSolrClient().getZkStateReader().getClusterState().hasCollection(collection)) {
+      try {
+        Thread.sleep(100);
+        if (t.hasTimedOut()) {
+          fail("Timed out waiting for collection " + collection + " to be deleted.");
+        }
+        cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collection);
+      } catch(SolrException e) {
+        return;
+      }
+      
+    }
+  }
+  
+  private DocCollection assertNumberOfReplicas(int numWriter, int numActive, int numPassive, boolean updateCollection, boolean activeOnly) throws KeeperException, InterruptedException {
+    if (updateCollection) {
+      cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName);
+    }
+    DocCollection docCollection = getCollectionState(collectionName);
+    assertNotNull(docCollection);
+    assertEquals("Unexpected number of writer replicas: " + docCollection, numWriter, 
+        docCollection.getReplicas(EnumSet.of(Replica.Type.REALTIME)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+    assertEquals("Unexpected number of passive replicas: " + docCollection, numPassive, 
+        docCollection.getReplicas(EnumSet.of(Replica.Type.PASSIVE)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+    assertEquals("Unexpected number of active replicas: " + docCollection, numActive, 
+        docCollection.getReplicas(EnumSet.of(Replica.Type.APPEND)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+    return docCollection;
+  }
+  
+  /*
+   * passes only if all replicas are active or down, and the "liveNodes" reflect the same status
+   */
+  private CollectionStatePredicate clusterStateReflectsActiveAndDownReplicas() {
+    return (liveNodes, collectionState) -> {
+      for (Replica r:collectionState.getReplicas()) {
+        if (r.getState() != Replica.State.DOWN && r.getState() != Replica.State.ACTIVE) {
+          return false;
+        }
+        if (r.getState() == Replica.State.DOWN && liveNodes.contains(r.getNodeName())) {
+          return false;
+        }
+        if (r.getState() == Replica.State.ACTIVE && !liveNodes.contains(r.getNodeName())) {
+          return false;
+        }
+      }
+      return true;
+    };
+  }
+  
+  
+  private CollectionStatePredicate activeReplicaCount(int numWriter, int numActive, int numPassive) {
+    return (liveNodes, collectionState) -> {
+      int writersFound = 0, activesFound = 0, passivesFound = 0;
+      if (collectionState == null)
+        return false;
+      for (Slice slice : collectionState) {
+        for (Replica replica : slice) {
+          if (replica.isActive(liveNodes))
+            switch (replica.getType()) {
+              case APPEND:
+                activesFound++;
+                break;
+              case PASSIVE:
+                passivesFound++;
+                break;
+              case REALTIME:
+                writersFound++;
+                break;
+              default:
+                throw new AssertionError("Unexpected replica type");
+            }
+        }
+      }
+      return numWriter == writersFound && numActive == activesFound && numPassive == passivesFound;
+    };
+  }
+  
+  private void addDocs(int numDocs) throws SolrServerException, IOException {
+    for (int i = 0; i < numDocs; i++) {
+      cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", String.valueOf(i), "fieldName_s", String.valueOf(i)));
+    }
+    cluster.getSolrClient().commit(collectionName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
index 3147d4e..2ff6573 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -880,7 +881,7 @@ public class CloudSolrClient extends SolrClient {
       String url = zkProps.getCoreUrl();
       urls.add(url);
       if (!directUpdatesToLeadersOnly) {
-        for (Replica replica : slice.getReplicas()) {
+        for (Replica replica : slice.getReplicas(EnumSet.of(Replica.Type.APPEND, Replica.Type.REALTIME))) {
           if (!replica.getNodeName().equals(leader.getNodeName()) &&
               !replica.getName().equals(leader.getName())) {
             ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 8beb6ed..048f7b5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.solr.client.solrj.request;
 
+import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
@@ -34,6 +36,7 @@ import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionAdminParams;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
@@ -45,8 +48,6 @@ import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.ContentStream;
 import org.apache.solr.common.util.NamedList;
 
-import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
-
 /**
  * This class is experimental and subject to change.
  *
@@ -321,6 +322,19 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
    * @param collection the collection name
    * @param config     the collection config
    * @param numShards  the number of shards in the collection
+   * @param numRealtimeReplicas the number of {@link org.apache.solr.common.cloud.Replica.Type#REALTIME} replicas
+   * @param numAppendReplicas the number of {@link org.apache.solr.common.cloud.Replica.Type#APPEND} replicas
+   * @param numPassiveReplicas the number of {@link org.apache.solr.common.cloud.Replica.Type#PASSIVE} replicas
+   */
+  public static Create createCollection(String collection, String config, int numShards, int numRealtimeReplicas, int numAppendReplicas, int numPassiveReplicas) {
+    return new Create(collection, config, numShards, numRealtimeReplicas, numPassiveReplicas); // TODO: Use numAppendReplicas
+  }
+  
+  /**
+   * Returns a SolrRequest for creating a collection
+   * @param collection the collection name
+   * @param config     the collection config
+   * @param numShards  the number of shards in the collection
    * @param numReplicas the replication factor of the collection
    */
   public static Create createCollection(String collection, String config, int numShards, int numReplicas) {
@@ -362,11 +376,11 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     protected String routerField;
     protected Integer numShards;
     protected Integer maxShardsPerNode;
-    protected Integer replicationFactor;
+    protected Integer realtimeReplicas;
+    protected Integer passiveReplicas;
 
     private Properties properties;
     protected Boolean autoAddReplicas;
-    protected Integer realtimeReplicas;
     protected Integer stateFormat;
     private String[] rule , snitch;
 
@@ -377,24 +391,29 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public Create() {
       super(CollectionAction.CREATE, null);
     }
-
-    private Create(String collection, String config, int numShards, int numReplicas) {
+    
+    private Create(String collection, String config, int numShards, int numRealtimeReplicas, int readPassiveReplicas) { // TODO: maybe add other constructors
       super(CollectionAction.CREATE, SolrIdentifierValidator.validateCollectionName(collection));
       this.configName = config;
       this.numShards = numShards;
-      this.replicationFactor = numReplicas;
+      this.realtimeReplicas = numRealtimeReplicas;
+      this.passiveReplicas = readPassiveReplicas;
+    }
+
+    private Create(String collection, String config, int numShards, int numReplicas) {
+      this(collection, config, numShards, numReplicas, 0);
     }
 
-    private Create(String collection, int numShards, int numReplicas) {
+    private Create(String collection, int numShards, int numRealtimeReplicas) {
       super(CollectionAction.CREATE, SolrIdentifierValidator.validateCollectionName(collection));
       this.numShards = numShards;
-      this.replicationFactor = numReplicas;
+      this.realtimeReplicas = numRealtimeReplicas;
     }
 
-    private Create(String collection, String config, String shards, int numReplicas) {
+    private Create(String collection, String config, String shards, int numRealtimeReplicas) {
       super(CollectionAction.CREATE, SolrIdentifierValidator.validateCollectionName(collection));
       this.configName = config;
-      this.replicationFactor = numReplicas;
+      this.realtimeReplicas = numRealtimeReplicas;
       this.shards = shards;
       this.routerName = ImplicitDocRouter.NAME;
     }
@@ -410,7 +429,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public Create setAutoAddReplicas(boolean autoAddReplicas) { this.autoAddReplicas = autoAddReplicas; return this; }
     public Create setRealtimeReplicas(Integer realtimeReplicas) { this.realtimeReplicas = realtimeReplicas; return this;}
     @Deprecated
-    public Create setReplicationFactor(Integer repl) { this.replicationFactor = repl; return this; }
+    public Create setReplicationFactor(Integer repl) { this.realtimeReplicas = repl; return this; }
     public Create setStateFormat(Integer stateFormat) { this.stateFormat = stateFormat; return this; }
     public Create setRule(String... s){ this.rule = s; return this; }
     public Create setSnitch(String... s){ this.snitch = s; return this; }
@@ -421,7 +440,13 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public String getShards() { return  shards; }
     public Integer getNumShards() { return numShards; }
     public Integer getMaxShardsPerNode() { return maxShardsPerNode; }
-    public Integer getReplicationFactor() { return replicationFactor; }
+    /**
+     * 
+     * @deprecated Use {@link #getNumRealtimeReplicas()}
+     */
+    @Deprecated
+    public Integer getReplicationFactor() { return getNumRealtimeReplicas(); }
+    public Integer getNumRealtimeReplicas() { return realtimeReplicas; }
     public Boolean getAutoAddReplicas() { return autoAddReplicas; }
     public Integer getRealtimeReplicas() { return realtimeReplicas; }
     public Integer getStateFormat() { return stateFormat; }
@@ -504,8 +529,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       if (routerField != null) {
         params.set("router.field", routerField);
       }
-      if (replicationFactor != null) {
-        params.set( "replicationFactor", replicationFactor);
+      if (realtimeReplicas != null) {
+        params.set( "replicationFactor", realtimeReplicas);// Keep both for compatibility?
+        params.set( ZkStateReader.REALTIME_REPLICAS, realtimeReplicas);
       }
       if (autoAddReplicas != null) {
         params.set(ZkStateReader.AUTO_ADD_REPLICAS, autoAddReplicas);
@@ -519,6 +545,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       if (stateFormat != null) {
         params.set(DocCollection.STATE_FORMAT, stateFormat);
       }
+      if (passiveReplicas != null) {
+        params.set(ZkStateReader.PASSIVE_REPLICAS, passiveReplicas);
+      }
       if(rule != null) params.set("rule", rule);
       if(snitch != null) params.set("snitch", snitch);
       return params;
@@ -1577,19 +1606,26 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
 
 
   }
-
+  
   /**
    * Returns a SolrRequest to add a replica to a shard in a collection
    */
   public static AddReplica addReplicaToShard(String collection, String shard) {
-    return new AddReplica(collection, shard, null);
+    return addReplicaToShard(collection, shard, Replica.Type.REALTIME);
+  }
+
+  /**
+   * Returns a SolrRequest to add a replica of the given type to a shard in a collection
+   */
+  public static AddReplica addReplicaToShard(String collection, String shard, Replica.Type replicaType) {
+    return new AddReplica(collection, shard, null, replicaType);
   }
 
   /**
    * Returns a SolrRequest to add a replica to a collection using a route key
    */
   public static AddReplica addReplicaByRouteKey(String collection, String routeKey) {
-    return new AddReplica(collection, null, routeKey);
+    return new AddReplica(collection, null, routeKey, Replica.Type.REALTIME);
   }
 
   // ADDREPLICA request
@@ -1602,6 +1638,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     protected String instanceDir;
     protected String dataDir;
     protected Properties properties;
+    protected Replica.Type type = Replica.Type.REALTIME;
 
     /**
      * @deprecated Use {@link #addReplicaByRouteKey(String, String)} or {@link #addReplicaToShard(String, String)}
@@ -1611,11 +1648,12 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       super(CollectionAction.ADDREPLICA);
     }
 
-    private AddReplica(String collection, String shard, String routeKey) {
+    private AddReplica(String collection, String shard, String routeKey, Replica.Type type) {
       super(CollectionAction.ADDREPLICA);
       this.collection = collection;
       this.shard = shard;
       this.routeKey = routeKey;
+      this.type = type;
     }
 
     public Properties getProperties() {
@@ -1689,6 +1727,11 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       this.asyncId = id;
       return this;
     }
+    
+    public AddReplica setType(Replica.Type type) {
+      this.type = type;
+      return this;
+    }
 
     @Override
     public SolrParams getParams() {
@@ -1714,6 +1757,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       if (dataDir != null)  {
         params.add("dataDir", dataDir);
       }
+      if (type != null) {
+        params.add(ZkStateReader.REPLICA_TYPE, type.name());
+      }
       if (properties != null) {
         addProperties(params, properties);
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index bf0f04f..e3d6544 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -18,6 +18,7 @@ package org.apache.solr.common.cloud;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -62,7 +63,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   private final Boolean autoAddReplicas;
   private final Integer realtimeReplicas;
 
-
   public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
     this(name, slices, props, router, Integer.MAX_VALUE, ZkStateReader.CLUSTER_STATE);
   }
@@ -311,6 +311,14 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     }
     return replicas;
   }
+  
+  public List<Replica> getReplicas(EnumSet<Replica.Type> s) {
+    List<Replica> replicas = new ArrayList<>();
+    for (Slice slice : this) {
+      replicas.addAll(slice.getReplicas(s));
+    }
+    return replicas;
+  }
 
   /**
    * Get the shardId of a core on a specific node

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 4968cf2..8f3ed15 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -84,9 +84,31 @@ public class Replica extends ZkNodeProps {
     }
   }
 
+  public enum Type {
+    /**
+     * Writes updates to transaction log and indexes locally. Replicas of type {@link #REALTIME} support NRT (soft commits) and RTG. 
+     * Any {@link #REALTIME} replica can become a leader. A shard leader will forward updates to all active {@link #REALTIME} and
+     * {@link #APPEND} replicas. 
+     */
+    REALTIME,
+    /**
+     * Writes to transaction log, but not to index, uses replication. Any {@link #APPEND} replica can become leader (by first
+     * applying all local transaction log elements). If a replica is of type {@link #APPEND} but is also the leader, it will behave 
+     * as a {@link #REALTIME}. A shard leader will forward updates to all active {@link #REALTIME} and {@link #APPEND} replicas.
+     */
+    APPEND,
+    /**
+     * Doesn't index or write to the transaction log. Just replicates from {@link #REALTIME} or {@link #APPEND} replicas. {@link #PASSIVE}
+     * replicas can't become shard leaders (i.e., if there are only passive replicas in the collection at some point, updates will fail
+     * the same as if there were no leaders, while queries continue to work), so they don't even participate in elections.
+     */
+    PASSIVE
+  }
+
   private final String name;
   private final String nodeName;
   private final State state;
+  private final Type type;
 
   public Replica(String name, Map<String,Object> propMap) {
     super(propMap);
@@ -98,6 +120,12 @@ public class Replica extends ZkNodeProps {
       this.state = State.ACTIVE;                         //Default to ACTIVE
       propMap.put(ZkStateReader.STATE_PROP, state.toString());
     }
+    String typeString = (String)propMap.get(ZkStateReader.REPLICA_TYPE);
+    if (typeString == null) {
+      this.type = Type.REALTIME;
+    } else {
+      this.type = Type.valueOf(typeString);
+    }
 
   }
 
@@ -129,6 +157,10 @@ public class Replica extends ZkNodeProps {
   public boolean isActive(Set<String> liveNodes) {
     return liveNodes.contains(this.nodeName) && this.state == State.ACTIVE;
   }
+  
+  public Type getType() {
+    return this.type;
+  }
 
   @Override
   public String toString() {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
index bd3bafd..2cd716c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
@@ -18,6 +18,7 @@ package org.apache.solr.common.cloud;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -27,6 +28,7 @@ import java.util.Map;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import org.apache.solr.common.cloud.Replica.Type;
 import org.noggit.JSONUtil;
 import org.noggit.JSONWriter;
 
@@ -161,7 +163,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
     // add the replicas *after* the other properties (for aesthetics, so it's easy to find slice properties in the JSON output)
     this.replicas = replicas != null ? replicas : makeReplicas((Map<String,Object>)propMap.get(REPLICAS));
     propMap.put(REPLICAS, this.replicas);
-
+    
     Map<String, Object> rules = (Map<String, Object>) propMap.get("routingRules");
     if (rules != null) {
       this.routingRules = new HashMap<>();
@@ -202,7 +204,10 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
 
   private Replica findLeader() {
     for (Replica replica : replicas.values()) {
-      if (replica.getStr(LEADER) != null) return replica;
+      if (replica.getStr(LEADER) != null) {
+        assert replica.getType() == Type.APPEND || replica.getType() == Type.REALTIME;
+        return replica;
+      }
     }
     return null;
   }
@@ -215,7 +220,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
   }
 
   /**
-   * Gets the list of replicas for this slice.
+   * Gets the list of all replicas for this slice.
    */
   public Collection<Replica> getReplicas() {
     return replicas.values();
@@ -227,6 +232,13 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
   public List<Replica> getReplicas(Predicate<Replica> pred) {
     return replicas.values().stream().filter(pred).collect(Collectors.toList());
   }
+  
+  /**
+   * Gets the list of replicas that have a type present in s
+   */
+  public List<Replica> getReplicas(EnumSet<Replica.Type> s) {
+    return this.getReplicas(r->s.contains(r.getType()));
+  }
 
   /**
    * Get the map of coreNodeName to replicas for this slice.
@@ -238,7 +250,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
   public Map<String,Replica> getReplicasCopy() {
     return new LinkedHashMap<>(replicas);
   }
-
+  
   public Replica getLeader() {
     return leader;
   }
@@ -272,4 +284,5 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
   public void write(JSONWriter jsonWriter) {
     jsonWriter.write(propMap);
   }
+  
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 51b4b59..1a201f1 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import org.apache.solr.common.Callable;
 import org.apache.solr.common.SolrException;
@@ -92,10 +93,15 @@ public class ZkStateReader implements Closeable {
   public static final String REJOIN_AT_HEAD_PROP = "rejoinAtHead";
   public static final String SOLR_SECURITY_CONF_PATH = "/security.json";
 
+  /**
+   * @deprecated Use {@link #REALTIME_REPLICAS}
+   */
+  @Deprecated
   public static final String REPLICATION_FACTOR = "replicationFactor";
   public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
   public static final String AUTO_ADD_REPLICAS = "autoAddReplicas";
   public static final String MAX_CORES_PER_NODE = "maxCoresPerNode";
+  public static final String PASSIVE_REPLICAS = "passiveReplicas";
   public static final String REALTIME_REPLICAS = "realtimeReplicas";
 
   public static final String ROLES = "/roles.json";
@@ -106,6 +112,8 @@ public class ZkStateReader implements Closeable {
   public static final String LEGACY_CLOUD = "legacyCloud";
 
   public static final String URL_SCHEME = "urlScheme";
+  
+  public static final String REPLICA_TYPE = "type";
 
 
   /** A view of the current state of all collections; combines all the different state sources into a single view. */
@@ -798,7 +806,9 @@ public class ZkStateReader implements Closeable {
     
     Map<String,Replica> shardMap = replicas.getReplicasMap();
     List<ZkCoreNodeProps> nodes = new ArrayList<>(shardMap.size());
-    for (Entry<String,Replica> entry : shardMap.entrySet()) {
+    // nocommit: This method is used for finding replicas to write to, so we don't need passive replicas. It should be renamed or should
+    // receive a parameter to skip certain types
+    for (Entry<String,Replica> entry : shardMap.entrySet().stream().filter((e)->e.getValue().getType() != Replica.Type.PASSIVE).collect(Collectors.toList())) {
       ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
       
       String coreNodeName = entry.getValue().getName();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
index f3e0d7e..e64892d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
@@ -128,6 +128,11 @@ public abstract class CoreAdminParams
    */
   public static final String NEW_COLLECTION = "newCollection";
 
+  /**
+   * Tells the CoreAdminHandler that the new Core will be a replica of this type. 
+   */
+  public static final String REPLICA_TYPE = "replicaType";
+
   public enum CoreAdminAction {
     STATUS(true),
     UNLOAD,

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
index 825e7c7..75a93df4 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java
@@ -775,8 +775,8 @@ public abstract class SolrTestCaseJ4 extends LuceneTestCase {
    * is set.
    */
   public static void deleteCore() {
-    log.info("###deleteCore" );
     if (h != null) {
+      log.info("###deleteCore" );
       // If the test case set up Zk, it should still have it as available,
       // otherwise the core close will just be unnecessarily delayed.
       CoreContainer cc = h.getCoreContainer();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/043d5cb2/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java b/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
index 5cae356..2ad42d1 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ChaosMonkey.java
@@ -187,7 +187,7 @@ public class ChaosMonkey {
   
   private static void stopJettySolrRunner(JettySolrRunner jetty) throws Exception {
     assert(jetty != null);
-    monkeyLog("stop shard! " + jetty.getLocalPort());
+    monkeyLog("stop jetty! " + jetty.getLocalPort());
     SolrDispatchFilter sdf = jetty.getSolrDispatchFilter();
     if (sdf != null) {
       try {
@@ -231,7 +231,7 @@ public class ChaosMonkey {
 
     IpTables.blockPort(jetty.getLocalPort());
     
-    monkeyLog("kill shard! " + jetty.getLocalPort());
+    monkeyLog("kill jetty! " + jetty.getLocalPort());
     
     jetty.stop();