Posted to commits@lucene.apache.org by cp...@apache.org on 2017/05/25 17:39:39 UTC

[3/6] lucene-solr:jira/solr-8668: SOLR-10233: Add support for replica types

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/DeleteNodeTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteNodeTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteNodeTest.java
index 8d2f6f2..9f461f0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteNodeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteNodeTest.java
@@ -18,7 +18,6 @@
 package org.apache.solr.cloud;
 
 
-import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Set;
@@ -29,12 +28,9 @@ import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.common.util.StrUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class DeleteNodeTest extends SolrCloudTestCase {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
+  
   @BeforeClass
   public static void setupCluster() throws Exception {
     configureCluster(6)
@@ -54,7 +50,10 @@ public class DeleteNodeTest extends SolrCloudTestCase {
     Set<String> liveNodes = cloudClient.getZkStateReader().getClusterState().getLiveNodes();
     ArrayList<String> l = new ArrayList<>(liveNodes);
     Collections.shuffle(l, random());
-    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 5, 2);
+    CollectionAdminRequest.Create create = pickRandom(
+        CollectionAdminRequest.createCollection(coll, "conf1", 5, 2, 0, 0),
+        CollectionAdminRequest.createCollection(coll, "conf1", 5, 1, 1, 0),
+        CollectionAdminRequest.createCollection(coll, "conf1", 5, 0, 1, 1));
     create.setCreateNodeSet(StrUtils.join(l, ',')).setMaxShardsPerNode(3);
     cloudClient.request(create);
     String node2bdecommissioned = l.get(0);

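For context, the overload exercised above is new in this change: after the shard count, createCollection now takes per-type replica counts in the order NRT, TLOG, PULL (the old two-arg form createCollection(coll, conf, numShards, numReplicas) maps to NRT-only, as the first pickRandom option shows). A minimal sketch, assuming an existing CloudSolrClient named cloudClient and the "conf1" configset:

    // 5 shards; each shard gets 1 NRT, 1 TLOG and 1 PULL replica
    CollectionAdminRequest.Create create =
        CollectionAdminRequest.createCollection("myColl", "conf1", 5, 1, 1, 1);
    create.setMaxShardsPerNode(3);
    cloudClient.request(create);
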
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index 5699a8f..4c6253e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -19,9 +19,11 @@ package org.apache.solr.cloud;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.EnumSet;
 
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.CoreStatus;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
@@ -101,11 +103,29 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
   public void deleteReplicaByCount() throws Exception {
 
     final String collectionName = "deleteByCount";
-    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 3).process(cluster.getSolrClient());
+    pickRandom(
+        CollectionAdminRequest.createCollection(collectionName, "conf", 1, 3),
+        CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 1, 1),
+        CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, 2),
+        CollectionAdminRequest.createCollection(collectionName, "conf", 1, 0, 1, 2))
+    .process(cluster.getSolrClient());
     waitForState("Expected a single shard with three replicas", collectionName, clusterShape(1, 3));
 
     CollectionAdminRequest.deleteReplicasFromShard(collectionName, "shard1", 2).process(cluster.getSolrClient());
     waitForState("Expected a single shard with a single replica", collectionName, clusterShape(1, 1));
+    
+    try {
+      CollectionAdminRequest.deleteReplicasFromShard(collectionName, "shard1", 1).process(cluster.getSolrClient());
+      fail("Expected Exception, Can't delete the last replica by count");
+    } catch (SolrException e) {
+      // expected
+      assertEquals(SolrException.ErrorCode.BAD_REQUEST.code, e.code());
+      assertTrue(e.getMessage().contains("There is only one replica available"));
+    }
+    DocCollection docCollection = getCollectionState(collectionName);
+    // We know that since leaders are preserved, PULL replicas should not be left alone in the shard
+    assertEquals(0, docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.PULL)).size());
+    
 
   }
 

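The assertion above relies on the new per-type accessor on Slice: getReplicas(EnumSet) filters a shard's replicas by Replica.Type. A short sketch, assuming a DocCollection fetched via the getCollectionState test helper used in this test:

    DocCollection coll = getCollectionState("deleteByCount");
    Slice shard1 = coll.getSlice("shard1");
    // PULL replicas can never be leaders, so none should survive alone
    int pullCount = shard1.getReplicas(EnumSet.of(Replica.Type.PULL)).size();
    int leaderEligible = shard1.getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)).size();
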
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
index 8904ea8..8f35c88 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
@@ -58,8 +58,8 @@ public class ForceLeaderTest extends HttpPartitionTest {
   private final boolean onlyLeaderIndexes = random().nextBoolean();
 
   @Override
-  protected int getRealtimeReplicas() {
-    return onlyLeaderIndexes? 1 : -1;
+  protected boolean useTlogReplicas() {
+    return onlyLeaderIndexes;
   }
 
   @Test

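useTlogReplicas() replaces the old numeric getRealtimeReplicas() hook in the test base classes: instead of "1 realtime replica vs. all", subclasses now flag whether the randomized collection should be built from TLOG replicas. Judging from TestCloudRecovery further down, the flag maps to per-type counts roughly like this (a sketch of the idea, not the base-class code itself):

    boolean tlog = useTlogReplicas();
    // all-NRT when false, all-TLOG when true; PULL replicas stay at 0
    CollectionAdminRequest.createCollection("collection1", "conf1", 2,
        tlog ? 0 : 2,   // NRT replicas
        tlog ? 2 : 0,   // TLOG replicas
        0);             // PULL replicas
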
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/FullThrottleStoppableIndexingThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullThrottleStoppableIndexingThread.java b/solr/core/src/test/org/apache/solr/cloud/FullThrottleStoppableIndexingThread.java
new file mode 100644
index 0000000..b9e177a
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/FullThrottleStoppableIndexingThread.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.net.ConnectException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.http.client.HttpClient;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class FullThrottleStoppableIndexingThread extends StoppableIndexingThread {
+  
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private CloseableHttpClient httpClient = HttpClientUtil.createClient(null);
+  private volatile boolean stop = false;
+  int clientIndex = 0;
+  private ConcurrentUpdateSolrClient cusc;
+  private List<SolrClient> clients;
+  private AtomicInteger fails = new AtomicInteger();
+  
+  public FullThrottleStoppableIndexingThread(SolrClient controlClient, CloudSolrClient cloudClient, List<SolrClient> clients,
+                                             String id, boolean doDeletes, int clientSoTimeout) {
+    super(controlClient, cloudClient, id, doDeletes);
+    setName("FullThrottleStoppableIndexingThread");
+    setDaemon(true);
+    this.clients = clients;
+
+    cusc = new ErrorLoggingConcurrentUpdateSolrClient(((HttpSolrClient) clients.get(0)).getBaseURL(), httpClient, 8, 2);
+    cusc.setConnectionTimeout(10000);
+    cusc.setSoTimeout(clientSoTimeout);
+  }
+  
+  @Override
+  public void run() {
+    int i = 0;
+    int numDeletes = 0;
+    int numAdds = 0;
+
+    while (!stop) {
+      String id = this.id + "-" + i;
+      ++i;
+      
+      if (doDeletes && LuceneTestCase.random().nextBoolean() && deletes.size() > 0) {
+        String delete = deletes.remove(0);
+        try {
+          numDeletes++;
+          cusc.deleteById(delete);
+        } catch (Exception e) {
+          changeUrlOnError(e);
+          fails.incrementAndGet();
+        }
+      }
+      
+      try {
+        numAdds++;
+        if (numAdds > (LuceneTestCase.TEST_NIGHTLY ? 4002 : 197))
+          continue;
+        SolrInputDocument doc = AbstractFullDistribZkTestBase.getDoc(
+            "id",
+            id,
+            i1,
+            50,
+            t1,
+            "Saxon heptarchies that used to rip around so in old times and raise Cain.  My, you ought to seen old Henry the Eight when he was in bloom.  He WAS a blossom.  He used to marry a new wife every day, and chop off her head next morning.  And he would do it just as indifferent as if ");
+        cusc.add(doc);
+      } catch (Exception e) {
+        changeUrlOnError(e);
+        fails.incrementAndGet();
+      }
+      
+      if (doDeletes && LuceneTestCase.random().nextBoolean()) {
+        deletes.add(id);
+      }
+      
+    }
+
+    log.info("FT added docs:" + numAdds + " with " + fails + " fails" + " deletes:" + numDeletes);
+  }
+
+  private void changeUrlOnError(Exception e) {
+    if (e instanceof ConnectException) {
+      clientIndex++;
+      if (clientIndex > clients.size() - 1) {
+        clientIndex = 0;
+      }
+      cusc.shutdownNow();
+      cusc = new ErrorLoggingConcurrentUpdateSolrClient(((HttpSolrClient) clients.get(clientIndex)).getBaseURL(),
+          httpClient, 30, 3);
+    }
+  }
+  
+  @Override
+  public void safeStop() {
+    stop = true;
+    cusc.blockUntilFinished();
+    cusc.shutdownNow();
+    IOUtils.closeQuietly(httpClient);
+  }
+
+  @Override
+  public int getFailCount() {
+    return fails.get();
+  }
+  
+  @Override
+  public Set<String> getAddFails() {
+    throw new UnsupportedOperationException();
+  }
+  
+  @Override
+  public Set<String> getDeleteFails() {
+    throw new UnsupportedOperationException();
+  }
+  
+  static class ErrorLoggingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClient {
+    @SuppressWarnings("deprecation")
+    public ErrorLoggingConcurrentUpdateSolrClient(String serverUrl, HttpClient httpClient, int queueSize, int threadCount) {
+      super(serverUrl, httpClient, queueSize, threadCount, null, false);
+    }
+    @Override
+    public void handleError(Throwable ex) {
+      log.warn("cusc error", ex);
+    }
+  }
+  
+}
\ No newline at end of file

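FullThrottleStoppableIndexingThread is a daemon Thread, so callers drive it with the usual start/safeStop pair; safeStop() drains the ConcurrentUpdateSolrClient and closes the internal HttpClient. A usage sketch (the three client variables are assumptions matching the constructor signature above):

    FullThrottleStoppableIndexingThread indexThread =
        new FullThrottleStoppableIndexingThread(controlClient, cloudClient, clients,
            "ft-1", true /* doDeletes */, 10000 /* clientSoTimeout ms */);
    indexThread.start();
    // ... stop/start nodes, introduce partitions ...
    indexThread.safeStop();
    int fails = indexThread.getFailCount();
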
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
index 01002cf..2cc1c30 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
@@ -85,8 +85,8 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
   }
 
   @Override
-  protected int getRealtimeReplicas() {
-    return onlyLeaderIndexes? 1 : -1;
+  protected boolean useTlogReplicas() {
+    return onlyLeaderIndexes;
   }
 
   /**
@@ -110,10 +110,10 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
    */
   @Override
   public JettySolrRunner createJetty(File solrHome, String dataDir,
-      String shardList, String solrConfigOverride, String schemaOverride)
+      String shardList, String solrConfigOverride, String schemaOverride, Replica.Type replicaType)
       throws Exception
   {
-    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
+    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride, replicaType);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
index 457b9d9..f3965ac 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
@@ -46,8 +46,8 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
   }
 
   @Override
-  protected int getRealtimeReplicas() {
-    return onlyLeaderIndexes? 1 : -1;
+  protected boolean useTlogReplicas() {
+    return onlyLeaderIndexes;
   }
 
   @Override
@@ -161,9 +161,9 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
    */
   @Override
   public JettySolrRunner createJetty(File solrHome, String dataDir,
-                                     String shardList, String solrConfigOverride, String schemaOverride)
+                                     String shardList, String solrConfigOverride, String schemaOverride, Replica.Type replicaType)
       throws Exception {
-    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
+    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride, replicaType);
   }
 
   protected void sendCommitWithRetry(Replica replica) throws Exception {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java b/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java
deleted file mode 100644
index a4e8d6f..0000000
--- a/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Semaphore;
-
-import org.apache.lucene.index.IndexWriter;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.embedded.JettySolrRunner;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.update.DirectUpdateHandler2;
-import org.apache.solr.update.SolrIndexWriter;
-import org.apache.solr.update.UpdateHandler;
-import org.apache.solr.update.UpdateLog;
-import org.apache.solr.util.RefCounted;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class OnlyLeaderIndexesTest extends SolrCloudTestCase {
-  private static final String COLLECTION = "collection1";
-
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
-    System.setProperty("solr.ulog.numRecordsToKeep", "1000");
-
-    configureCluster(3)
-        .addConfig("config", TEST_PATH().resolve("configsets")
-        .resolve("cloud-minimal-inplace-updates").resolve("conf"))
-        .configure();
-
-    CollectionAdminRequest
-        .createCollection(COLLECTION, "config", 1, 3)
-        .setRealtimeReplicas(1)
-        .setMaxShardsPerNode(1)
-        .process(cluster.getSolrClient());
-    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
-        false, true, 30);
-  }
-
-  @Test
-  public void test() throws Exception {
-    basicTest();
-    recoveryTest();
-    dbiTest();
-    basicLeaderElectionTest();
-    outOfOrderDBQWithInPlaceUpdatesTest();
-  }
-
-  public void basicTest() throws Exception {
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-    new UpdateRequest()
-        .add(sdoc("id", "1"))
-        .add(sdoc("id", "2"))
-        .add(sdoc("id", "3"))
-        .add(sdoc("id", "4"))
-        .process(cloudClient, COLLECTION);
-
-    {
-      UpdateHandler updateHandler = getSolrCore(true).get(0).getUpdateHandler();
-      RefCounted<IndexWriter> iwRef = updateHandler.getSolrCoreState().getIndexWriter(null);
-      assertTrue("IndexWriter at leader must see updates ", iwRef.get().hasUncommittedChanges());
-      iwRef.decref();
-    }
-
-    for (SolrCore solrCore : getSolrCore(false)) {
-      RefCounted<IndexWriter> iwRef = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
-      assertFalse("IndexWriter at replicas must not see updates ", iwRef.get().hasUncommittedChanges());
-      iwRef.decref();
-    }
-
-    checkRTG(1, 4, cluster.getJettySolrRunners());
-
-    new UpdateRequest()
-        .deleteById("1")
-        .deleteByQuery("id:2")
-        .process(cloudClient, COLLECTION);
-
-    // The DBQ is not processed at replicas, so we still can get doc2 and other docs by RTG
-    checkRTG(2,4, getSolrRunner(false));
-
-    new UpdateRequest()
-        .commit(cloudClient, COLLECTION);
-
-    checkShardConsistency(2, 1);
-
-    // Update log roll over
-    for (SolrCore solrCore : getSolrCore(false)) {
-      UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
-      assertFalse(updateLog.hasUncommittedChanges());
-    }
-
-    // UpdateLog copy over old updates
-    for (int i = 15; i <= 150; i++) {
-      cloudClient.add(COLLECTION, sdoc("id",String.valueOf(i)));
-      if (random().nextInt(100) < 15 & i != 150) {
-        cloudClient.commit(COLLECTION);
-      }
-    }
-    checkRTG(120,150, cluster.getJettySolrRunners());
-    waitForReplicasCatchUp(20);
-  }
-
-  public void recoveryTest() throws Exception {
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-    new UpdateRequest()
-        .deleteByQuery("*:*")
-        .commit(cluster.getSolrClient(), COLLECTION);
-    new UpdateRequest()
-        .add(sdoc("id", "3"))
-        .add(sdoc("id", "4"))
-        .commit(cloudClient, COLLECTION);
-    // Replica recovery
-    new UpdateRequest()
-        .add(sdoc("id", "5"))
-        .process(cloudClient, COLLECTION);
-    JettySolrRunner solrRunner = getSolrRunner(false).get(0);
-    ChaosMonkey.stop(solrRunner);
-    new UpdateRequest()
-        .add(sdoc("id", "6"))
-        .process(cloudClient, COLLECTION);
-    ChaosMonkey.start(solrRunner);
-    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
-        false, true, 30);
-    // We skip peerSync, so replica will always trigger commit on leader
-    checkShardConsistency(4, 20);
-
-    // LTR can be kicked off, so waiting for replicas recovery
-    new UpdateRequest()
-        .add(sdoc("id", "7"))
-        .commit(cloudClient, COLLECTION);
-    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
-        false, true, 30);
-    checkShardConsistency(5, 20);
-
-    // More Replica recovery testing
-    new UpdateRequest()
-        .add(sdoc("id", "8"))
-        .process(cloudClient, COLLECTION);
-    checkRTG(3,8, cluster.getJettySolrRunners());
-    DirectUpdateHandler2.commitOnClose = false;
-    ChaosMonkey.stop(solrRunner);
-    DirectUpdateHandler2.commitOnClose = true;
-    ChaosMonkey.start(solrRunner);
-    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
-        false, true, 30);
-    checkRTG(3,8, cluster.getJettySolrRunners());
-    checkShardConsistency(6, 20);
-
-    // Test replica recovery apply buffer updates
-    Semaphore waitingForBufferUpdates = new Semaphore(0);
-    Semaphore waitingForReplay = new Semaphore(0);
-    RecoveryStrategy.testing_beforeReplayBufferingUpdates = () -> {
-      try {
-        waitingForReplay.release();
-        waitingForBufferUpdates.acquire();
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    };
-    ChaosMonkey.stop(solrRunner);
-    ChaosMonkey.start(solrRunner);
-    waitingForReplay.acquire();
-    new UpdateRequest()
-        .add(sdoc("id", "9"))
-        .add(sdoc("id", "10"))
-        .process(cloudClient, COLLECTION);
-    waitingForBufferUpdates.release();
-    RecoveryStrategy.testing_beforeReplayBufferingUpdates = null;
-    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
-        false, true, 30);
-    checkRTG(3,10, cluster.getJettySolrRunners());
-    checkShardConsistency(6, 20);
-    for (SolrCore solrCore : getSolrCore(false)) {
-      RefCounted<IndexWriter> iwRef = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
-      assertFalse("IndexWriter at replicas must not see updates ", iwRef.get().hasUncommittedChanges());
-      iwRef.decref();
-    }
-  }
-
-  public void dbiTest() throws Exception{
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-    new UpdateRequest()
-        .deleteByQuery("*:*")
-        .commit(cluster.getSolrClient(), COLLECTION);
-    new UpdateRequest()
-        .add(sdoc("id", "1"))
-        .commit(cloudClient, COLLECTION);
-    checkShardConsistency(1, 1);
-    new UpdateRequest()
-        .deleteById("1")
-        .process(cloudClient, COLLECTION);
-    try {
-      checkRTG(1, 1, cluster.getJettySolrRunners());
-    } catch (AssertionError e) {
-      return;
-    }
-    fail("Doc1 is deleted but it's still exist");
-  }
-
-  public void basicLeaderElectionTest() throws Exception {
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-    new UpdateRequest()
-        .deleteByQuery("*:*")
-        .commit(cluster.getSolrClient(), COLLECTION);
-    new UpdateRequest()
-        .add(sdoc("id", "1"))
-        .add(sdoc("id", "2"))
-        .process(cloudClient, COLLECTION);
-    String oldLeader = getLeader();
-    JettySolrRunner oldLeaderJetty = getSolrRunner(true).get(0);
-    ChaosMonkey.kill(oldLeaderJetty);
-    for (int i = 0; i < 60; i++) { // wait till leader is changed
-      if (!oldLeader.equals(getLeader())) {
-        break;
-      }
-      Thread.sleep(100);
-    }
-    new UpdateRequest()
-        .add(sdoc("id", "3"))
-        .add(sdoc("id", "4"))
-        .process(cloudClient, COLLECTION);
-    ChaosMonkey.start(oldLeaderJetty);
-    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
-        false, true, 60);
-    checkRTG(1,4, cluster.getJettySolrRunners());
-    new UpdateRequest()
-        .commit(cloudClient, COLLECTION);
-    checkShardConsistency(4,1);
-  }
-
-  private String getLeader() throws InterruptedException {
-    ZkNodeProps props = cluster.getSolrClient().getZkStateReader().getLeaderRetry("collection1", "shard1", 30000);
-    return props.getStr(ZkStateReader.NODE_NAME_PROP);
-  }
-
-  public void outOfOrderDBQWithInPlaceUpdatesTest() throws Exception {
-    new UpdateRequest()
-        .deleteByQuery("*:*")
-        .commit(cluster.getSolrClient(), COLLECTION);
-    List<UpdateRequest> updates = new ArrayList<>();
-    updates.add(simulatedUpdateRequest(null, "id", 1, "title_s", "title0_new", "inplace_updatable_int", 5, "_version_", Long.MAX_VALUE-100)); // full update
-    updates.add(simulatedDBQ("inplace_updatable_int:5", Long.MAX_VALUE-98));
-    updates.add(simulatedUpdateRequest(Long.MAX_VALUE-100, "id", 1, "inplace_updatable_int", 6, "_version_", Long.MAX_VALUE-99));
-    for (JettySolrRunner solrRunner: getSolrRunner(false)) {
-      try (SolrClient client = solrRunner.newClient()) {
-        for (UpdateRequest up : updates) {
-          up.process(client, COLLECTION);
-        }
-      }
-    }
-    JettySolrRunner oldLeaderJetty = getSolrRunner(true).get(0);
-    ChaosMonkey.kill(oldLeaderJetty);
-    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
-        false, true, 30);
-    ChaosMonkey.start(oldLeaderJetty);
-    AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
-        false, true, 30);
-    new UpdateRequest()
-        .add(sdoc("id", "2"))
-        .commit(cluster.getSolrClient(), COLLECTION);
-    checkShardConsistency(2,20);
-    SolrDocument doc = cluster.getSolrClient().getById(COLLECTION,"1");
-    assertNotNull(doc.get("title_s"));
-  }
-
-  private UpdateRequest simulatedUpdateRequest(Long prevVersion, Object... fields) throws SolrServerException, IOException {
-    SolrInputDocument doc = sdoc(fields);
-
-    // get baseUrl of the leader
-    String baseUrl = getBaseUrl();
-
-    UpdateRequest ur = new UpdateRequest();
-    ur.add(doc);
-    ur.setParam("update.distrib", "FROMLEADER");
-    if (prevVersion != null) {
-      ur.setParam("distrib.inplace.prevversion", String.valueOf(prevVersion));
-      ur.setParam("distrib.inplace.update", "true");
-    }
-    ur.setParam("distrib.from", baseUrl);
-    return ur;
-  }
-
-  private UpdateRequest simulatedDBQ(String query, long version) throws SolrServerException, IOException {
-    String baseUrl = getBaseUrl();
-
-    UpdateRequest ur = new UpdateRequest();
-    ur.deleteByQuery(query);
-    ur.setParam("_version_", ""+version);
-    ur.setParam("update.distrib", "FROMLEADER");
-    ur.setParam("distrib.from", baseUrl);
-    return ur;
-  }
-
-  private String getBaseUrl() {
-    DocCollection collection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
-    Slice slice = collection.getSlice("shard1");
-    return slice.getLeader().getCoreUrl();
-  }
-
-  private void checkRTG(int from, int to, List<JettySolrRunner> solrRunners) throws Exception{
-
-    for (JettySolrRunner solrRunner: solrRunners) {
-      try (SolrClient client = solrRunner.newClient()) {
-        for (int i = from; i <= to; i++) {
-          SolrQuery query = new SolrQuery("*:*");
-          query.set("distrib", false);
-          query.setRequestHandler("/get");
-          query.set("id",i);
-          QueryResponse res = client.query(COLLECTION, query);
-          assertNotNull("Can not find doc "+ i + " in " + solrRunner.getBaseUrl(),res.getResponse().get("doc"));
-        }
-      }
-    }
-
-  }
-
-  private void checkShardConsistency(int expected, int numTry) throws Exception{
-
-    for (int i = 0; i < numTry; i++) {
-      boolean inSync = true;
-      for (JettySolrRunner solrRunner: cluster.getJettySolrRunners()) {
-        try (SolrClient client = solrRunner.newClient()) {
-          SolrQuery query = new SolrQuery("*:*");
-          query.set("distrib", false);
-          long results = client.query(COLLECTION, query).getResults().getNumFound();
-          if (expected != results) {
-            inSync = false;
-            Thread.sleep(500);
-            break;
-          }
-        }
-      }
-      if (inSync) return;
-    }
-
-    fail("Some replicas are not in sync with leader");
-  }
-
-  private void waitForReplicasCatchUp(int numTry) throws IOException, InterruptedException {
-    String leaderTimeCommit = getSolrCore(true).get(0).getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
-    if (leaderTimeCommit == null) return;
-    for (int i = 0; i < numTry; i++) {
-      boolean inSync = true;
-      for (SolrCore solrCore : getSolrCore(false)) {
-        String replicateTimeCommit = solrCore.getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
-        if (!leaderTimeCommit.equals(replicateTimeCommit)) {
-          inSync = false;
-          Thread.sleep(500);
-          break;
-        }
-      }
-      if (inSync) return;
-    }
-
-    fail("Some replicas are not in sync with leader");
-
-  }
-
-  private List<SolrCore> getSolrCore(boolean isLeader) {
-    List<SolrCore> rs = new ArrayList<>();
-
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-    DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION);
-
-    for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) {
-      if (solrRunner.getCoreContainer() == null) continue;
-      for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) {
-        CloudDescriptor cloudDescriptor = solrCore.getCoreDescriptor().getCloudDescriptor();
-        Slice slice = docCollection.getSlice(cloudDescriptor.getShardId());
-        Replica replica = docCollection.getReplica(cloudDescriptor.getCoreNodeName());
-        if (slice.getLeader() == replica && isLeader) {
-          rs.add(solrCore);
-        } else if (slice.getLeader() != replica && !isLeader) {
-          rs.add(solrCore);
-        }
-      }
-    }
-    return rs;
-  }
-
-  private List<JettySolrRunner> getSolrRunner(boolean isLeader) {
-    List<JettySolrRunner> rs = new ArrayList<>();
-
-    CloudSolrClient cloudClient = cluster.getSolrClient();
-    DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION);
-
-    for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) {
-      if (solrRunner.getCoreContainer() == null) continue;
-      for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) {
-        CloudDescriptor cloudDescriptor = solrCore.getCoreDescriptor().getCloudDescriptor();
-        Slice slice = docCollection.getSlice(cloudDescriptor.getShardId());
-        Replica replica = docCollection.getReplica(cloudDescriptor.getCoreNodeName());
-        if (slice.getLeader() == replica && isLeader) {
-          rs.add(solrRunner);
-        } else if (slice.getLeader() != replica && !isLeader) {
-          rs.add(solrRunner);
-        }
-      }
-    }
-    return rs;
-  }
-
-}
\ No newline at end of file

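The deleted OnlyLeaderIndexesTest drove the "only leader indexes" mode via setRealtimeReplicas(1) on an otherwise NRT collection; under the replica-type API the same topology is expressed directly as TLOG replicas. A hedged sketch of the equivalent setup (the replacement coverage lives in the new tlog-replica tests, not shown in this part of the patch):

    CollectionAdminRequest
        .createCollection("collection1", "config", 1, 0, 3, 0)  // 1 shard; 0 NRT, 3 TLOG, 0 PULL replicas
        .setMaxShardsPerNode(1)
        .process(cluster.getSolrClient());
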
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index 48ac91f..91da2c1 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -27,6 +27,7 @@ import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.cloud.Overseer.LeaderStatus;
 import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -346,7 +347,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
     assertEquals(numberOfSlices * numberOfReplica, coreNames.size());
     for (int i = 1; i <= numberOfSlices; i++) {
       for (int j = 1; j <= numberOfReplica; j++) {
-        String coreName = COLLECTION_NAME + "_shard" + i + "_replica" + j;
+        String coreName = Assign.buildCoreName(COLLECTION_NAME, "shard" + i, Replica.Type.NRT, j);
         assertTrue("Shard " + coreName + " was not created",
             coreNames.contains(coreName));
         

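Core names are no longer hand-assembled; Assign.buildCoreName derives them from collection, shard, replica type and counter, so the type shows up in the name. A sketch of the contrast (the exact suffix format, e.g. a per-type letter, is an assumption based on this test's usage):

    // old, hand-built: collection1_shard1_replica1
    String oldName = COLLECTION_NAME + "_shard" + 1 + "_replica" + 1;
    // new, type-aware (something like collection1_shard1_replica_n1 for an NRT replica)
    String newName = Assign.buildCoreName(COLLECTION_NAME, "shard1", Replica.Type.NRT, 1);
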
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
index a8e14bf..8290e12 100644
--- a/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
@@ -40,8 +40,8 @@ public class RecoveryAfterSoftCommitTest extends AbstractFullDistribZkTestBase {
   }
 
   @Override
-  protected int getRealtimeReplicas() {
-    return onlyLeaderIndexes? 1: -1;
+  protected boolean useTlogReplicas() {
+    return onlyLeaderIndexes;
   }
 
   @BeforeClass
@@ -64,10 +64,10 @@ public class RecoveryAfterSoftCommitTest extends AbstractFullDistribZkTestBase {
    */
   @Override
   public JettySolrRunner createJetty(File solrHome, String dataDir,
-                                     String shardList, String solrConfigOverride, String schemaOverride)
+                                     String shardList, String solrConfigOverride, String schemaOverride, Replica.Type replicaType)
       throws Exception
   {
-    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
+    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride, replicaType);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
index 1c7575d..e1af607 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
@@ -21,6 +21,7 @@ package org.apache.solr.cloud;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Set;
 
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -29,6 +30,9 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.client.solrj.response.CoreAdminResponse;
 import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.util.StrUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -60,7 +64,12 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
     Collections.shuffle(l, random());
     String emptyNode = l.remove(0);
     String node2bdecommissioned = l.get(0);
-    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 5, 2);
+    CollectionAdminRequest.Create create;
+    create = pickRandom(CollectionAdminRequest.createCollection(coll, "conf1", 5, 2),
+                        CollectionAdminRequest.createCollection(coll, "conf1", 5, 1, 1, 0),
+                        CollectionAdminRequest.createCollection(coll, "conf1", 5, 0, 1, 1),
+                        CollectionAdminRequest.createCollection(coll, "conf1", 5, 1, 0, 1),
+                        CollectionAdminRequest.createCollection(coll, "conf1", 5, 0, 2, 0));
     create.setCreateNodeSet(StrUtils.join(l, ',')).setMaxShardsPerNode(3);
     cloudClient.request(create);
     log.info("excluded_node : {}  ", emptyNode);
@@ -98,7 +107,15 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
     assertTrue(success);
     try (HttpSolrClient coreclient = getHttpSolrClient(cloudClient.getZkStateReader().getBaseUrlForNodeName(emptyNode))) {
       CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreclient);
-      assertTrue(status.getCoreStatus().size() == 0);
+      assertEquals("Expecting no cores but found some: " + status.getCoreStatus(), 0, status.getCoreStatus().size());
+    }
+    
+    DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll);
+    assertEquals(create.getNumShards().intValue(), collection.getSlices().size());
+    for (Slice s:collection.getSlices()) {
+      assertEquals(create.getNumNrtReplicas().intValue(), s.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
+      assertEquals(create.getNumTlogReplicas().intValue(), s.getReplicas(EnumSet.of(Replica.Type.TLOG)).size());
+      assertEquals(create.getNumPullReplicas().intValue(), s.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java
index 9100eee..abd394a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReplicationFactorTest.java
@@ -64,10 +64,10 @@ public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {
    */
   @Override
   public JettySolrRunner createJetty(File solrHome, String dataDir,
-      String shardList, String solrConfigOverride, String schemaOverride)
+      String shardList, String solrConfigOverride, String schemaOverride, Replica.Type replicaType)
       throws Exception {
 
-    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
+    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride, replicaType);
   }
   
   @Test

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
index bf9b5e0..72f0694 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
@@ -86,12 +86,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     useFactory(null);
   }
 
-  //TODO for now, onlyLeaderIndexes do not work with ShardSplitTest
-  @Override
-  protected int getRealtimeReplicas() {
-    return -1;
-  }
-
   @Test
   public void test() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
index b592861..c7fc0e8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java
@@ -66,8 +66,7 @@ public class TestCloudRecovery extends SolrCloudTestCase {
 
     onlyLeaderIndexes = random().nextBoolean();
     CollectionAdminRequest
-        .createCollection(COLLECTION, "config", 2, 2)
-        .setRealtimeReplicas(onlyLeaderIndexes? 1: -1)
+        .createCollection(COLLECTION, "config", 2, onlyLeaderIndexes ? 0 : 2, onlyLeaderIndexes ? 2 : 0, 0)
         .setMaxShardsPerNode(2)
         .process(cluster.getSolrClient());
     AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
@@ -111,7 +110,7 @@ public class TestCloudRecovery extends SolrCloudTestCase {
     assertEquals(4, resp.getResults().getNumFound());
     // Make sure all nodes recover from the tlog
     if (onlyLeaderIndexes) {
-      // Leader election can be kicked off, so 2 append replicas will replay its tlog before becoming new leader
+      // Leader election can be kicked off, so the 2 tlog replicas will replay their tlogs before becoming the new leader
       assertTrue( countReplayLog.get() >=2);
     } else {
       assertEquals(4, countReplayLog.get());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
index 8fbfee3..74ad7bd 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java
@@ -60,8 +60,12 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
   @ShardsFixed(num = 2)
   public void test() throws Exception {
     try (CloudSolrClient client = createCloudClient(null)) {
-      CollectionAdminRequest.Create req = CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf1",2,2);
-      req.setRealtimeReplicas(1);
+      CollectionAdminRequest.Create req;
+      if (useTlogReplicas()) {
+        req = CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf1",2, 0, 1, 1);
+      } else {
+        req = CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf1",2, 1, 0, 1);
+      }
       req.setMaxShardsPerNode(2);
       client.request(req);
       createCollection(null, COLLECTION_NAME1, 1, 1, 1, client, null, "conf1");
@@ -173,7 +177,7 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
       Map<String, Object> collection = (Map<String, Object>) collections.get(COLLECTION_NAME);
       assertNotNull(collection);
       assertEquals("conf1", collection.get("configName"));
-      assertEquals("1", collection.get("realtimeReplicas"));
+//      assertEquals("1", collection.get("nrtReplicas"));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
new file mode 100644
index 0000000..cb732ff
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.http.client.HttpClient;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+
+@Slow
+public class TestPullReplica extends SolrCloudTestCase {
+  
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  
+  private String collectionName = null;
+  private final static int REPLICATION_TIMEOUT_SECS = 10;
+  
+  private String suggestedCollectionName() {
+    return (getTestClass().getSimpleName().replace("Test", "") + "_" + getTestName().split(" ")[0]).replaceAll("(.)(\\p{Upper})", "$1_$2").toLowerCase(Locale.ROOT);
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    TestInjection.waitForReplicasInSync = null; // We'll be explicit about this in this test
+    configureCluster(2) // 2 + random().nextInt(3) 
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+    Boolean useLegacyCloud = rarely();
+    LOG.info("Using legacyCloud?: {}", useLegacyCloud);
+    CollectionAdminRequest.ClusterProp clusterPropRequest = CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, String.valueOf(useLegacyCloud));
+    CollectionAdminResponse response = clusterPropRequest.process(cluster.getSolrClient());
+    assertEquals(0, response.getStatus());
+  }
+  
+  @AfterClass
+  public static void tearDownCluster() {
+    TestInjection.reset();
+  }
+  
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    collectionName = suggestedCollectionName();
+    expectThrows(SolrException.class, () -> getCollectionState(collectionName));
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    for (JettySolrRunner jetty:cluster.getJettySolrRunners()) {
+      if (!jetty.isRunning()) {
+        LOG.warn("Jetty {} not running, probably some bad test. Starting it", jetty.getLocalPort());
+        ChaosMonkey.start(jetty);
+      }
+    }
+    if (cluster.getSolrClient().getZkStateReader().getClusterState().getCollectionOrNull(collectionName) != null) {
+      LOG.info("tearDown deleting collection");
+      CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
+      LOG.info("Collection deleted");
+      waitForDeletion(collectionName);
+    }
+    super.tearDown();
+  }
+  
+  @Repeat(iterations=2) // 2 times to make sure cleanup is complete and we can create the same collection
+  public void testCreateDelete() throws Exception {
+    try {
+      CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1, 0, 3)
+      .setMaxShardsPerNode(100)
+      .process(cluster.getSolrClient());
+      boolean reloaded = false;
+      while (true) {
+        DocCollection docCollection = getCollectionState(collectionName);
+        assertNotNull(docCollection);
+        assertEquals("Expecting 4 relpicas per shard",
+            8, docCollection.getReplicas().size());
+        assertEquals("Expecting 6 pull replicas, 3 per shard",
+            6, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
+        assertEquals("Expecting 2 writer replicas, one per shard",
+            2, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
+        for (Slice s:docCollection.getSlices()) {
+          // read-only replicas can never become leaders
+          assertFalse(s.getLeader().getType() == Replica.Type.PULL);
+          List<String> shardElectionNodes = cluster.getZkClient().getChildren(ZkStateReader.getShardLeadersElectPath(collectionName, s.getName()), null, true);
+          assertEquals("Unexpected election nodes for Shard: " + s.getName() + ": " + Arrays.toString(shardElectionNodes.toArray()), 
+              1, shardElectionNodes.size());
+        }
+        assertUlogPresence(docCollection);
+        if (reloaded) {
+          break;
+        } else {
+          // reload
+          CollectionAdminResponse response = CollectionAdminRequest.reloadCollection(collectionName)
+          .process(cluster.getSolrClient());
+          assertEquals(0, response.getStatus());
+          reloaded = true;
+        }
+      }
+    } finally {
+      zkClient().printLayoutToStdOut();
+    }
+  }
+  
+  /**
+   * Asserts that Update logs don't exist for replicas of type {@link org.apache.solr.common.cloud.Replica.Type#PULL}
+   */
+  private void assertUlogPresence(DocCollection collection) {
+    for (Slice s:collection.getSlices()) {
+      for (Replica r:s.getReplicas()) {
+        if (r.getType() == Replica.Type.NRT) {
+          continue;
+        }
+        SolrCore core = null;
+        try {
+          core = cluster.getReplicaJetty(r).getCoreContainer().getCore(r.getCoreName());
+          assertNotNull(core);
+          assertFalse("Update log should not exist for replicas of type Passive but file is present: " + core.getUlogDir(),
+              new java.io.File(core.getUlogDir()).exists());
+        } finally {
+          if (core != null) core.close();
+        }
+      }
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  public void testAddDocs() throws Exception {
+    int numReadOnlyReplicas = 1 + random().nextInt(3);
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, numReadOnlyReplicas)
+    .setMaxShardsPerNode(100)
+    .process(cluster.getSolrClient());
+    waitForState("Expected collection to be created with 1 shard and " + (numReadOnlyReplicas + 1) + " replicas", collectionName, clusterShape(1, numReadOnlyReplicas + 1));
+    DocCollection docCollection = assertNumberOfReplicas(1, 0, numReadOnlyReplicas, false, true);
+    assertEquals(1, docCollection.getSlices().size());
+    
+    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar"));
+    cluster.getSolrClient().commit(collectionName);
+    
+    Slice s = docCollection.getSlices().iterator().next();
+    try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) {
+      assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+    }
+    
+    TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS);
+    for (Replica r:s.getReplicas(EnumSet.of(Replica.Type.PULL))) {
+      //TODO: assert replication < REPLICATION_TIMEOUT_SECS
+      try (HttpSolrClient readOnlyReplicaClient = getHttpSolrClient(r.getCoreUrl())) {
+        while (true) {
+          try {
+            assertEquals("Replica " + r.getName() + " not up to date after 10 seconds",
+                1, readOnlyReplicaClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+            break;
+          } catch (AssertionError e) {
+            if (t.hasTimedOut()) {
+              throw e;
+            } else {
+              Thread.sleep(100);
+            }
+          }
+        }
+        SolrQuery req = new SolrQuery(
+            "qt", "/admin/plugins",
+            "stats", "true");
+        QueryResponse statsResponse = readOnlyReplicaClient.query(req);
+        assertEquals("Replicas shouldn't process the add document request: " + statsResponse, 
+            0L, ((Map<String, Object>)((NamedList<Object>)statsResponse.getResponse()).findRecursive("plugins", "UPDATE", "updateHandler", "stats")).get("UPDATE.updateHandler.adds"));
+      }
+    }
+    assertUlogPresence(docCollection);
+  }
+  
+  public void testAddRemovePullReplica() throws Exception {
+    CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1, 0, 0)
+      .setMaxShardsPerNode(100)
+      .process(cluster.getSolrClient());
+    cluster.getSolrClient().getZkStateReader().registerCore(collectionName); //TODO: Why is this needed? see SOLR-9440
+    waitForState("Expected collection to be created with 2 shards and 1 replica each", collectionName, clusterShape(2, 1));
+    DocCollection docCollection = assertNumberOfReplicas(2, 0, 0, false, true);
+    assertEquals(2, docCollection.getSlices().size());
+    
+    CollectionAdminRequest.addReplicaToShard(collectionName, "shard1", Replica.Type.PULL).process(cluster.getSolrClient());
+    docCollection = assertNumberOfReplicas(2, 0, 1, true, false);
+    CollectionAdminRequest.addReplicaToShard(collectionName, "shard2", Replica.Type.PULL).process(cluster.getSolrClient());    
+    docCollection = assertNumberOfReplicas(2, 0, 2, true, false);
+    
+    waitForState("Expecting collection to have 2 shards and 2 replica each", collectionName, clusterShape(2, 2));
+    
+    //Delete pull replica from shard1
+    CollectionAdminRequest.deleteReplica(
+        collectionName, 
+        "shard1", 
+        docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getName())
+    .process(cluster.getSolrClient());
+    assertNumberOfReplicas(2, 0, 1, true, true);
+  }
+  
+  public void testRemoveAllWriterReplicas() throws Exception {
+    doTestNoLeader(true);
+  }
+  
+  public void testKillLeader() throws Exception {
+    doTestNoLeader(false);
+  }
+  
+  @Ignore("Ignore until I figure out a way to reliably record state transitions")
+  public void testPullReplicaStates() throws Exception {
+    // Validate that pull replicas go through the correct states when starting, stopping, reconnecting
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, 0)
+      .setMaxShardsPerNode(100)
+      .process(cluster.getSolrClient());
+//    cluster.getSolrClient().getZkStateReader().registerCore(collectionName); //TODO: Is this needed? 
+    waitForState("Replica not added", collectionName, activeReplicaCount(1, 0, 0));
+    addDocs(500);
+    List<Replica.State> statesSeen = new ArrayList<>(3);
+    cluster.getSolrClient().registerCollectionStateWatcher(collectionName, (liveNodes, collectionState) -> {
+      Replica r = collectionState.getSlice("shard1").getReplica("core_node2");
+      LOG.info("CollectionStateWatcher state change: {}", r);
+      if (r == null) {
+        return false;
+      }
+      statesSeen.add(r.getState());
+      LOG.info("CollectionStateWatcher saw state: {}", r.getState());
+      return r.getState() == Replica.State.ACTIVE;
+    });
+    CollectionAdminRequest.addReplicaToShard(collectionName, "shard1", Replica.Type.PULL).process(cluster.getSolrClient());
+    waitForState("Replica not added", collectionName, activeReplicaCount(1, 0, 1));
+    zkClient().printLayoutToStdOut();
+    LOG.info("Saw states: " + Arrays.toString(statesSeen.toArray()));
+    assertEquals("Expecting DOWN->RECOVERING->ACTIVE but saw: " + Arrays.toString(statesSeen.toArray()), 3, statesSeen.size());
+    assertEquals("Expecting DOWN->RECOVERING->ACTIVE but saw: " + Arrays.toString(statesSeen.toArray()), Replica.State.DOWN, statesSeen.get(0));
+    assertEquals("Expecting DOWN->RECOVERING->ACTIVE but saw: " + Arrays.toString(statesSeen.toArray()), Replica.State.RECOVERING, statesSeen.get(0));
+    assertEquals("Expecting DOWN->RECOVERING->ACTIVE but saw: " + Arrays.toString(statesSeen.toArray()), Replica.State.ACTIVE, statesSeen.get(0));
+  }
+  
+  public void testRealTimeGet() throws SolrServerException, IOException, KeeperException, InterruptedException {
+    // should be redirected to Replica.Type.NRT
+    int numReplicas = random().nextBoolean() ? 1 : 2;
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, numReplicas, 0, numReplicas)
+      .setMaxShardsPerNode(100)
+      .process(cluster.getSolrClient());
+    waitForState("Unexpected replica count", collectionName, activeReplicaCount(numReplicas, 0, numReplicas));
+    DocCollection docCollection = assertNumberOfReplicas(numReplicas, 0, numReplicas, false, true);
+    HttpClient httpClient = cluster.getSolrClient().getHttpClient();
+    int id = 0;
+    Slice slice = docCollection.getSlice("shard1");
+    List<String> ids = new ArrayList<>(slice.getReplicas().size());
+    for (Replica rAdd:slice.getReplicas()) {
+      try (HttpSolrClient client = getHttpSolrClient(rAdd.getCoreUrl(), httpClient)) {
+        client.add(new SolrInputDocument("id", String.valueOf(id), "foo_s", "bar"));
+      }
+      SolrDocument docCloudClient = cluster.getSolrClient().getById(collectionName, String.valueOf(id));
+      assertEquals("bar", docCloudClient.getFieldValue("foo_s"));
+      for (Replica rGet:slice.getReplicas()) {
+        try (HttpSolrClient client = getHttpSolrClient(rGet.getCoreUrl(), httpClient)) {
+          SolrDocument doc = client.getById(String.valueOf(id));
+          assertEquals("bar", doc.getFieldValue("foo_s"));
+        }
+      }
+      ids.add(String.valueOf(id));
+      id++;
+    }
+    SolrDocumentList previousAllIdsResult = null;
+    for (Replica rAdd:slice.getReplicas()) {
+      try (HttpSolrClient client = getHttpSolrClient(rAdd.getCoreUrl(), httpClient)) {
+        SolrDocumentList allIdsResult = client.getById(ids);
+        if (previousAllIdsResult != null) {
+          assertTrue(compareSolrDocumentList(previousAllIdsResult, allIdsResult));
+        } else {
+          // set the first response here
+          previousAllIdsResult = allIdsResult;
+          assertEquals("Unexpected number of documents", ids.size(), allIdsResult.getNumFound());
+        }
+      }
+    }
+  }
+  
+  /*
+   * Validate that replication still happens after a new leader is elected
+   */
+  private void doTestNoLeader(boolean removeReplica) throws Exception {
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, 1)
+      .setMaxShardsPerNode(100)
+      .process(cluster.getSolrClient());
+    cluster.getSolrClient().getZkStateReader().registerCore(collectionName); //TODO: Why is this needed? see SOLR-9440 
+    waitForState("Expected collection to be created with 1 shard and 2 replicas", collectionName, clusterShape(1, 2));
+    DocCollection docCollection = assertNumberOfReplicas(1, 0, 1, false, true);
+    
+    // Add a document and commit
+    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar"));
+    cluster.getSolrClient().commit(collectionName);
+    Slice s = docCollection.getSlices().iterator().next();
+    try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) {
+      assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+    }
+    
+    waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)));
+    
+    // Delete leader replica from shard1
+    ignoreException("No registered leader was found"); //These are expected
+    JettySolrRunner leaderJetty = null;
+    if (removeReplica) {
+      CollectionAdminRequest.deleteReplica(
+          collectionName, 
+          "shard1", 
+          s.getLeader().getName())
+      .process(cluster.getSolrClient());
+    } else {
+      leaderJetty = cluster.getReplicaJetty(s.getLeader());
+      ChaosMonkey.kill(leaderJetty);
+      waitForState("Leader replica not removed", collectionName, clusterShape(1, 1));
+      // Wait for cluster state to be updated
+      waitForState("Replica state not updated in cluster state", 
+          collectionName, clusterStateReflectsActiveAndDownReplicas());
+    }
+    docCollection = assertNumberOfReplicas(0, 0, 1, true, true);
+    
+    // Check that there is no leader for the shard
+    Replica leader = docCollection.getSlice("shard1").getLeader();
+    assertTrue(leader == null || !leader.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes()));
+    
+    // Pull replica on the other hand should be active
+    Replica pullReplica = docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
+    assertTrue(pullReplica.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes()));
+
+    // add document, this should fail since there is no leader. Pull replica should not accept the update
+    expectThrows(SolrException.class, () -> 
+      cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo"))
+    );
+    
+    // Sending the update explicitly to the pull replica should also fail
+    try (HttpSolrClient pullReplicaClient = getHttpSolrClient(docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
+      expectThrows(SolrException.class, () -> 
+        pullReplicaClient.add(new SolrInputDocument("id", "2", "foo", "zoo"))
+      );
+    }
+    
+    // Queries should still work
+    waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)));
+    // Add an nrt replica back. Since there are no nrt replicas left, the new one will have no docs. There will be data loss, since it will become the leader
+    // and pull replicas will replicate from it. Maybe we want to change this. Replicating from pull replicas is not a good idea, since they
+    // are by definition out of date.
+    if (removeReplica) {
+      CollectionAdminRequest.addReplicaToShard(collectionName, "shard1", Replica.Type.NRT).process(cluster.getSolrClient());
+    } else {
+      ChaosMonkey.start(leaderJetty);
+    }
+    waitForState("Expected collection to be 1x2", collectionName, clusterShape(1, 2));
+    unIgnoreException("No registered leader was found"); // Should have a leader from now on
+
+    // Validate that the new nrt replica is the leader now
+    cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName);
+    docCollection = getCollectionState(collectionName);
+    leader = docCollection.getSlice("shard1").getLeader();
+    assertTrue(leader != null && leader.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes()));
+
+    // If the jetty is restarted, replication is not forced and the replica doesn't replicate from the leader until new docs are added.
+    // Is this the correct behavior? Why should these two cases be different?
+    if (removeReplica) {
+      // Pull replicas will replicate the empty index if a new replica was added and becomes leader
+      waitForNumDocsInAllReplicas(0, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)));
+    }
+    
+    // Add docs again
+    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo"));
+    s = docCollection.getSlices().iterator().next();
+    try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) {
+      leaderClient.commit();
+      assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+    }
+    waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)), "id:2");
+    waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)));
+  }
+  
+  public void testKillPullReplica() throws Exception {
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1, 0, 1)
+      .setMaxShardsPerNode(100)
+      .process(cluster.getSolrClient());
+//    cluster.getSolrClient().getZkStateReader().registerCore(collectionName); //TODO: Is this needed? 
+    waitForState("Expected collection to be created with 1 shard and 2 replicas", collectionName, clusterShape(1, 2));
+    DocCollection docCollection = assertNumberOfReplicas(1, 0, 1, false, true);
+    assertEquals(1, docCollection.getSlices().size());
+    
+    waitForNumDocsInAllActiveReplicas(0);
+    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar"));
+    cluster.getSolrClient().commit(collectionName);
+    waitForNumDocsInAllActiveReplicas(1);
+    
+    JettySolrRunner pullReplicaJetty = cluster.getReplicaJetty(docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.PULL)).get(0));
+    ChaosMonkey.kill(pullReplicaJetty);
+    waitForState("Replica not removed", collectionName, activeReplicaCount(1, 0, 0));
+    // Also wait for the replica to be placed in state="down"
+    waitForState("Didn't update state", collectionName, clusterStateReflectsActiveAndDownReplicas());
+    
+    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "bar"));
+    cluster.getSolrClient().commit(collectionName);
+    waitForNumDocsInAllActiveReplicas(2);
+    
+    ChaosMonkey.start(pullReplicaJetty);
+    waitForState("Replica not added", collectionName, activeReplicaCount(1, 0, 1));
+    waitForNumDocsInAllActiveReplicas(2);
+  }
+  
+  public void testSearchWhileReplicationHappens() {
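+    // TODO: not implemented yet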
+      
+  }
+  
+  private void waitForNumDocsInAllActiveReplicas(int numDocs) throws IOException, SolrServerException, InterruptedException {
+    DocCollection docCollection = getCollectionState(collectionName);
+    waitForNumDocsInAllReplicas(numDocs, docCollection.getReplicas().stream().filter(r -> r.getState() == Replica.State.ACTIVE).collect(Collectors.toList()));
+  }
+    
+  private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas) throws IOException, SolrServerException, InterruptedException {
+    waitForNumDocsInAllReplicas(numDocs, replicas, "*:*");
+  }
+  
+  private void waitForNumDocsInAllReplicas(int numDocs, Collection<Replica> replicas, String query) throws IOException, SolrServerException, InterruptedException {
+    TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS);
+    for (Replica r:replicas) {
+      try (HttpSolrClient replicaClient = getHttpSolrClient(r.getCoreUrl())) {
+        while (true) {
+          try {
+            assertEquals("Replica " + r.getName() + " not up to date after " + REPLICATION_TIMEOUT_SECS + " seconds",
+                numDocs, replicaClient.query(new SolrQuery(query)).getResults().getNumFound());
+            break;
+          } catch (AssertionError e) {
+            if (t.hasTimedOut()) {
+              throw e;
+            } else {
+              Thread.sleep(100);
+            }
+          }
+        }
+      }
+    }
+  }
+  
+  private void waitForDeletion(String collection) throws InterruptedException, KeeperException {
+    TimeOut t = new TimeOut(10, TimeUnit.SECONDS);
+    while (cluster.getSolrClient().getZkStateReader().getClusterState().hasCollection(collection)) {
+      LOG.info("Collection not yet deleted");
+      try {
+        Thread.sleep(100);
+        if (t.hasTimedOut()) {
+          fail("Timed out waiting for collection " + collection + " to be deleted.");
+        }
+        cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collection);
+      } catch (SolrException e) {
+        // The collection is no longer in the cluster state; deletion completed
+        return;
+      }
+    }
+  }
+  
+  private DocCollection assertNumberOfReplicas(int numNrtReplicas, int numTlogReplicas, int numPullReplicas, boolean updateCollection, boolean activeOnly) throws KeeperException, InterruptedException {
+    if (updateCollection) {
+      cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName);
+    }
+    DocCollection docCollection = getCollectionState(collectionName);
+    assertNotNull(docCollection);
+    assertEquals("Unexpected number of writer replicas: " + docCollection, numNrtReplicas, 
+        docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+    assertEquals("Unexpected number of pull replicas: " + docCollection, numPullReplicas, 
+        docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+    assertEquals("Unexpected number of active replicas: " + docCollection, numTlogReplicas, 
+        docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+    return docCollection;
+  }
+  
+  /*
+   * Passes only if all replicas are active or down, and liveNodes reflects the same status
+   */
+  private CollectionStatePredicate clusterStateReflectsActiveAndDownReplicas() {
+    return (liveNodes, collectionState) -> {
+      for (Replica r:collectionState.getReplicas()) {
+        if (r.getState() != Replica.State.DOWN && r.getState() != Replica.State.ACTIVE) {
+          return false;
+        }
+        if (r.getState() == Replica.State.DOWN && liveNodes.contains(r.getNodeName())) {
+          return false;
+        }
+        if (r.getState() == Replica.State.ACTIVE && !liveNodes.contains(r.getNodeName())) {
+          return false;
+        }
+      }
+      return true;
+    };
+  }
+  
+  private CollectionStatePredicate activeReplicaCount(int numNrtReplicas, int numTlogReplicas, int numPullReplicas) {
+    return (liveNodes, collectionState) -> {
+      int nrtFound = 0, tlogFound = 0, pullFound = 0;
+      if (collectionState == null)
+        return false;
+      for (Slice slice : collectionState) {
+        for (Replica replica : slice) {
+          if (replica.isActive(liveNodes))
+            switch (replica.getType()) {
+              case TLOG:
+                tlogFound++;
+                break;
+              case PULL:
+                pullFound++;
+                break;
+              case NRT:
+                nrtFound++;
+                break;
+              default:
+                throw new AssertionError("Unexpected replica type");
+            }
+        }
+      }
+      return numNrtReplicas == nrtFound && numTlogReplicas == tlogFound && numPullReplicas == pullFound;
+    };
+  }
+  
+  private void addDocs(int numDocs) throws SolrServerException, IOException {
+    List<SolrInputDocument> docs = new ArrayList<>(numDocs);
+    for (int i = 0; i < numDocs; i++) {
+      docs.add(new SolrInputDocument("id", String.valueOf(i), "fieldName_s", String.valueOf(i)));
+    }
+    cluster.getSolrClient().add(collectionName, docs);
+    cluster.getSolrClient().commit(collectionName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
new file mode 100644
index 0000000..6a22d99
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
+public class TestPullReplicaErrorHandling extends SolrCloudTestCase {
+  
+  private final static int REPLICATION_TIMEOUT_SECS = 10;
+  
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static Map<URI, SocketProxy> proxies;
+  private static Map<URI, JettySolrRunner> jettys;
+
+  private String collectionName = null;
+  
+  private String suggestedCollectionName() {
+    return (getTestClass().getSimpleName().replace("Test", "") + "_" + getTestName().split(" ")[0]).replaceAll("(.)(\\p{Upper})", "$1_$2").toLowerCase(Locale.ROOT);
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    TestInjection.waitForReplicasInSync = null; // We'll be explicit about this in this test
+    configureCluster(4) 
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+    // Add proxies
+    proxies = new HashMap<>(cluster.getJettySolrRunners().size());
+    jettys = new HashMap<>(cluster.getJettySolrRunners().size());
+    for (JettySolrRunner jetty:cluster.getJettySolrRunners()) {
+      SocketProxy proxy = new SocketProxy();
+      jetty.setProxyPort(proxy.getListenPort());
+      cluster.stopJettySolrRunner(jetty); // TODO: Can we avoid this restart?
+      cluster.startJettySolrRunner(jetty);
+      proxy.open(jetty.getBaseUrl().toURI());
+      LOG.info("Adding proxy for URL: " + jetty.getBaseUrl() + ". Proxy: " + proxy.getUrl());
+      proxies.put(proxy.getUrl(), proxy);
+      jettys.put(proxy.getUrl(), jetty);
+    }
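+    // Set legacyCloud=false (presumably so the ZK cluster state, not core discovery, is
+    // authoritative); retry briefly since the cluster may still be settling after the restarts above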
+    TimeOut t = new TimeOut(10, TimeUnit.SECONDS);
+    while (true) {
+      try {
+        CollectionAdminRequest.ClusterProp clusterPropRequest = CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false");
+        CollectionAdminResponse response = clusterPropRequest.process(cluster.getSolrClient());
+        assertEquals(0, response.getStatus());
+        break;
+      } catch (SolrServerException e) {
+        Thread.sleep(50);
+        if (t.hasTimedOut()) {
+          throw e;
+        }
+      }
+    }
+  }
+  
+  @AfterClass
+  public static void tearDownCluster() throws Exception {
+    for (SocketProxy proxy:proxies.values()) {
+      proxy.close();
+    }
+    proxies = null;
+    jettys = null;
+    TestInjection.reset();
+  }
+  
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    collectionName = suggestedCollectionName();
+    expectThrows(SolrException.class, () -> getCollectionState(collectionName));
+    cluster.getSolrClient().setDefaultCollection(collectionName);
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    if (cluster.getSolrClient().getZkStateReader().getClusterState().getCollectionOrNull(collectionName) != null) {
+      LOG.info("tearDown deleting collection");
+      CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
+      LOG.info("Collection deleted");
+      waitForDeletion(collectionName);
+    }
+    collectionName = null;
+    super.tearDown();
+  }
+  
+//  @Repeat(iterations=10)
+  public void testCantConnectToPullReplica() throws Exception {
+    int numShards = 2;
+    CollectionAdminRequest.createCollection(collectionName, "conf", numShards, 1, 0, 1)
+      .setMaxShardsPerNode(1)
+      .process(cluster.getSolrClient());
+    addDocs(10);
+    DocCollection docCollection = assertNumberOfReplicas(numShards, 0, numShards, false, true);
+    Slice s = docCollection.getSlices().iterator().next();
+    SocketProxy proxy = getProxyForReplica(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0));
+    try {
+      proxy.close();
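+      // With the pull replica cut off from the network, updates through the leader should keep succeeding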
+      for (int i = 1; i <= 10; i ++) {
+        addDocs(10 + i);
+        try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) {
+          assertNumDocs(10 + i, leaderClient);
+        }
+      }
+      try (HttpSolrClient pullReplicaClient = getHttpSolrClient(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
+        expectThrows(SolrServerException.class, () ->
+            pullReplicaClient.query(new SolrQuery("*:*")).getResults().getNumFound());
+      }
+      assertNumberOfReplicas(numShards, 0, numShards, true, true); // Replica should still be active, since it doesn't disconnect from ZooKeeper
+      {
+        // Wait for the cluster client to see all 20 docs; queries should succeed via the reachable replicas
+        long numFound = 0;
+        TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS);
+        while (numFound < 20 && !t.hasTimedOut()) {
+          Thread.sleep(200);
+          numFound = cluster.getSolrClient().query(collectionName, new SolrQuery("*:*")).getResults().getNumFound();
+        }
+        assertEquals("Expected the cluster client to see all docs", 20, numFound);
+      }
+    } finally {
+      proxy.reopen();
+    }
+    
+    try (HttpSolrClient pullReplicaClient = getHttpSolrClient(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
+      assertNumDocs(20, pullReplicaClient);
+    }
+  }
+  
+  public void testCantConnectToLeader() throws Exception {
+    int numShards = 1;
+    CollectionAdminRequest.createCollection(collectionName, "conf", numShards, 1, 0, 1)
+      .setMaxShardsPerNode(1)
+      .process(cluster.getSolrClient());
+    addDocs(10);
+    DocCollection docCollection = assertNumberOfReplicas(numShards, 0, numShards, false, true);
+    Slice s = docCollection.getSlices().iterator().next();
+    SocketProxy proxy = getProxyForReplica(s.getLeader());
+    try {
+      // wait for replication
+      try (HttpSolrClient pullReplicaClient = getHttpSolrClient(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
+        assertNumDocs(10, pullReplicaClient);
+      }
+      proxy.close();
+      expectThrows(SolrException.class, ()->addDocs(1));
+      try (HttpSolrClient pullReplicaClient = getHttpSolrClient(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
+        assertNumDocs(10, pullReplicaClient);
+      }
+      assertNumDocs(10, cluster.getSolrClient());
+    } finally {
+      LOG.info("Opening leader node");
+      proxy.reopen();
+    }
+//     Back to normal
+//    Even if the leader is back to normal, the replica can get a broken pipe for some time when trying to connect to it. The commit
+//    can fail if it's sent to the replica and forwarded to the leader, and since it uses CUSC the error is hidden! That breaks
+//    the last part of this test.
+//    addDocs(20);
+//    assertNumDocs(20, cluster.getSolrClient(), 300);
+//    try (HttpSolrClient pullReplicaClient = getHttpSolrClient(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
+//      assertNumDocs(20, pullReplicaClient);
+//    }
+  }
+  
+  public void testPullReplicaDisconnectsFromZooKeeper() throws Exception {
+    int numShards = 1;
+    CollectionAdminRequest.createCollection(collectionName, "conf", numShards, 1, 0, 1)
+      .setMaxShardsPerNode(1)
+      .process(cluster.getSolrClient());
+    addDocs(10);
+    DocCollection docCollection = assertNumberOfReplicas(numShards, 0, numShards, false, true);
+    Slice s = docCollection.getSlices().iterator().next();
+    try (HttpSolrClient pullReplicaClient = getHttpSolrClient(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
+      assertNumDocs(10, pullReplicaClient);
+    }
+    addDocs(20);
+    JettySolrRunner jetty = getJettyForReplica(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0));
+    cluster.expireZkSession(jetty);
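+    // Expiring the ZK session should make the pull replica go down and then recover once it reconnects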
+    addDocs(30);
+    waitForState("Expecting node to be disconnected", collectionName, activeReplicaCount(1, 0, 0));
+    addDocs(40);
+    waitForState("Expecting node to be disconnected", collectionName, activeReplicaCount(1, 0, 1));
+    try (HttpSolrClient pullReplicaClient = getHttpSolrClient(s.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
+      assertNumDocs(40, pullReplicaClient);
+    }
+  }
+  
+  private void assertNumDocs(int numDocs, SolrClient client, int timeoutSecs) throws InterruptedException, SolrServerException, IOException {
+    TimeOut t = new TimeOut(timeoutSecs, TimeUnit.SECONDS);
+    long numFound = -1;
+    while (!t.hasTimedOut()) {
+      Thread.sleep(200);
+      numFound = client.query(new SolrQuery("*:*")).getResults().getNumFound();
+      if (numFound == numDocs) {
+        return;
+      }
+    }
+    fail("Didn't get expected doc count. Expected: " + numDocs + ", Found: " + numFound);
+  }
+  
+  private void assertNumDocs(int numDocs, SolrClient client) throws InterruptedException, SolrServerException, IOException {
+    assertNumDocs(numDocs, client, REPLICATION_TIMEOUT_SECS);
+  }
+
+  private void addDocs(int numDocs) throws SolrServerException, IOException {
+    List<SolrInputDocument> docs = new ArrayList<>(numDocs);
+    for (int i = 0; i < numDocs; i++) {
+      docs.add(new SolrInputDocument("id", String.valueOf(i), "fieldName_s", String.valueOf(i)));
+    }
+    cluster.getSolrClient().add(collectionName, docs);
+    cluster.getSolrClient().commit(collectionName);
+  }
+  
+  private DocCollection assertNumberOfReplicas(int numNrtReplicas, int numTlogReplicas, int numPullReplicas, boolean updateCollection, boolean activeOnly) throws KeeperException, InterruptedException {
+    if (updateCollection) {
+      cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName);
+    }
+    DocCollection docCollection = getCollectionState(collectionName);
+    assertNotNull(docCollection);
+    assertEquals("Unexpected number of nrt replicas: " + docCollection, numNrtReplicas, 
+        docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+    assertEquals("Unexpected number of pull replicas: " + docCollection, numPullReplicas, 
+        docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+    assertEquals("Unexpected number of tlog replicas: " + docCollection, numTlogReplicas, 
+        docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count());
+    return docCollection;
+  }
+  
+  protected JettySolrRunner getJettyForReplica(Replica replica) throws Exception {
+    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+    assertNotNull(replicaBaseUrl);
+    URL baseUrl = new URL(replicaBaseUrl);
+
+    JettySolrRunner jetty = jettys.get(baseUrl.toURI());
+    assertNotNull("No jetty found for " + baseUrl + "!", jetty);
+    return jetty;
+  }  
+  
+  protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
+    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+    assertNotNull(replicaBaseUrl);
+    URL baseUrl = new URL(replicaBaseUrl);
+
+    SocketProxy proxy = proxies.get(baseUrl.toURI());
+    if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
+      baseUrl = new URL(baseUrl.toExternalForm() + "/");
+      proxy = proxies.get(baseUrl.toURI());
+    }
+    assertNotNull("No proxy found for " + baseUrl + "!", proxy);
+    return proxy;
+  }
+  
+  private void waitForDeletion(String collection) throws InterruptedException, KeeperException {
+    TimeOut t = new TimeOut(10, TimeUnit.SECONDS);
+    while (cluster.getSolrClient().getZkStateReader().getClusterState().hasCollection(collection)) {
+      LOG.info("Collection not yet deleted");
+      try {
+        Thread.sleep(100);
+        if (t.hasTimedOut()) {
+          fail("Timed out waiting for collection " + collection + " to be deleted.");
+        }
+        cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collection);
+      } catch (SolrException e) {
+        // The collection is no longer in the cluster state; deletion completed
+        return;
+      }
+    }
+  }
+  
+  private CollectionStatePredicate activeReplicaCount(int numNrtReplicas, int numTlogReplicas, int numPullReplicas) {
+    return (liveNodes, collectionState) -> {
+      int nrtFound = 0, tlogFound = 0, pullFound = 0;
+      if (collectionState == null)
+        return false;
+      for (Slice slice : collectionState) {
+        for (Replica replica : slice) {
+          if (replica.isActive(liveNodes))
+            switch (replica.getType()) {
+              case TLOG:
+                tlogFound++;
+                break;
+              case PULL:
+                pullFound++;
+                break;
+              case NRT:
+                nrtFound++;
+                break;
+              default:
+                throw new AssertionError("Unexpected replica type");
+            }
+        }
+      }
+      return numNrtReplicas == nrtFound && numTlogReplicas == tlogFound && numPullReplicas == pullFound;
+    };
+  }
+
+}