Posted to commits@lucene.apache.org by tf...@apache.org on 2017/05/23 03:00:55 UTC

[4/5] lucene-solr:master: SOLR-10233: Add support for replica types

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 96e505a..7d15701 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -178,11 +178,13 @@ public class IndexFetcher {
     public static final IndexFetchResult INDEX_FETCH_FAILURE = new IndexFetchResult("Fetching latest index failed", false, null);
     public static final IndexFetchResult INDEX_FETCH_SUCCESS = new IndexFetchResult("Fetching latest index is successful", true, null);
     public static final IndexFetchResult LOCK_OBTAIN_FAILED = new IndexFetchResult("Obtaining SnapPuller lock failed", false, null);
+    public static final IndexFetchResult CONTAINER_IS_SHUTTING_DOWN = new IndexFetchResult("I was asked to replicate but CoreContainer is shutting down", false, null);
     public static final IndexFetchResult MASTER_VERSION_ZERO = new IndexFetchResult("Index in peer is empty and never committed yet", true, null);
     public static final IndexFetchResult NO_INDEX_COMMIT_EXIST = new IndexFetchResult("No IndexCommit in local index", false, null);
     public static final IndexFetchResult PEER_INDEX_COMMIT_DELETED = new IndexFetchResult("No files to download because IndexCommit in peer was deleted", false, null);
     public static final IndexFetchResult LOCAL_ACTIVITY_DURING_REPLICATION = new IndexFetchResult("Local index modification during replication", false, null);
     public static final IndexFetchResult EXPECTING_NON_LEADER = new IndexFetchResult("Replicating from leader but I'm the shard leader", false, null);
+    public static final IndexFetchResult LEADER_IS_NOT_ACTIVE = new IndexFetchResult("Replicating from leader but leader is not active", false, null);
 
     IndexFetchResult(String message, boolean successful, Throwable exception) {
       this.message = message;
@@ -352,17 +354,32 @@ public class IndexFetcher {
       // when we are a bit more confident we may want to try a partial replication
       // if the error is connection related or something, but we have to be careful
       forceReplication = true;
+      LOG.info("Last replication failed, so I'll force replication");
     }
 
     try {
       if (fetchFromLeader) {
+        assert !solrCore.isClosed(): "Replication should be stopped before closing the core";
         Replica replica = getLeaderReplica();
         CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
         if (cd.getCoreNodeName().equals(replica.getName())) {
           return IndexFetchResult.EXPECTING_NON_LEADER;
         }
-        masterUrl = replica.getCoreUrl();
-        LOG.info("Updated masterUrl to " + masterUrl);
+        if (replica.getState() != Replica.State.ACTIVE) {
+          LOG.info("Replica {} is leader but it's state is {}, skipping replication", replica.getName(), replica.getState());
+          return IndexFetchResult.LEADER_IS_NOT_ACTIVE;
+        }
+        if (!solrCore.getCoreContainer().getZkController().getClusterState().liveNodesContain(replica.getNodeName())) {
+          LOG.info("Replica {} is leader but it's not hosted on a live node, skipping replication", replica.getName());
+          return IndexFetchResult.LEADER_IS_NOT_ACTIVE;
+        }
+        if (!replica.getCoreUrl().equals(masterUrl)) {
+          masterUrl = replica.getCoreUrl();
+          LOG.info("Updated masterUrl to {}", masterUrl);
+          // TODO: Do we need to set forceReplication = true?
+        } else {
+          LOG.debug("masterUrl didn't change");
+        }
       }
       //get the current 'replicateable' index version in the master
       NamedList response;
@@ -410,6 +427,7 @@ public class IndexFetcher {
         if (forceReplication && commit.getGeneration() != 0) {
           // since we won't get the files for an empty index,
           // we just clear ours and commit
+          LOG.info("New index in Master. Deleting mine...");
           RefCounted<IndexWriter> iw = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(solrCore);
           try {
             iw.get().deleteAll();
@@ -422,6 +440,7 @@ public class IndexFetcher {
 
         //there is nothing to be replicated
         successfulInstall = true;
+        LOG.debug("Nothing to replicate, master's version is 0");
         return IndexFetchResult.MASTER_VERSION_ZERO;
       }
 

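The two new IndexFetchResult constants distinguish a terminal condition from a transient one for whatever drives the fetch. A minimal sketch of how a polling caller might branch on them (the doFetch entry point and scheduleNextPoll helper are assumptions for illustration, not code from this commit):

    IndexFetcher.IndexFetchResult result = replicationHandler.doFetch(params, false); // assumed entry point
    if (result == IndexFetcher.IndexFetchResult.CONTAINER_IS_SHUTTING_DOWN) {
      return;             // terminal: the node is going down, stop polling entirely
    } else if (result == IndexFetcher.IndexFetchResult.LEADER_IS_NOT_ACTIVE) {
      scheduleNextPoll(); // transient: leader not ACTIVE or not on a live node yet, retry later
    }
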
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java b/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java
index bce374f..9f2b693 100644
--- a/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/RealTimeGetHandler.java
@@ -16,14 +16,18 @@
  */
 package org.apache.solr.handler;
 
-import org.apache.solr.api.Api;
-import org.apache.solr.api.ApiBag;
-import org.apache.solr.handler.component.*;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.solr.api.Api;
+import org.apache.solr.api.ApiBag;
+import org.apache.solr.handler.component.HttpShardHandler;
+import org.apache.solr.handler.component.RealTimeGetComponent;
+import org.apache.solr.handler.component.SearchHandler;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+
 
 public class RealTimeGetHandler extends SearchHandler {
   @Override
@@ -33,6 +37,14 @@ public class RealTimeGetHandler extends SearchHandler {
     names.add(RealTimeGetComponent.COMPONENT_NAME);
     return names;
   }
+  
+  
+  @Override
+  public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
+    // Tell HttpShardHandler that this request should only be distributed to NRT replicas
+    req.getContext().put(HttpShardHandler.ONLY_NRT_REPLICAS, Boolean.TRUE);
+    super.handleRequestBody(req, rsp);
+  }
 
   //////////////////////// SolrInfoMBeans methods //////////////////////
 

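The override above is the whole opt-in contract: a handler that must be answered by NRT replicas (or a TLOG leader) sets the context flag before SearchHandler distributes the request. A sketch of reusing the same pattern from a hypothetical custom handler:

    public class NrtOnlyHandler extends SearchHandler { // hypothetical subclass
      @Override
      public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
        // HttpShardHandler.prepDistributed reads this flag when picking shard replicas
        req.getContext().put(HttpShardHandler.ONLY_NRT_REPLICAS, Boolean.TRUE);
        super.handleRequestBody(req, rsp);
      }
    }
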
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index 94ff189..f3dcdeb 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -403,6 +403,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
     String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
     if (!indexFetchLock.tryLock())
       return IndexFetchResult.LOCK_OBTAIN_FAILED;
+    if (core.getCoreContainer().isShutDown()) {
+      LOG.warn("I was asked to replicate but CoreContainer is shutting down");
+      return IndexFetchResult.CONTAINER_IS_SHUTTING_DOWN; 
+    }
     try {
       if (masterUrl != null) {
         if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 02d8e8f..b05dd84 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -114,15 +114,7 @@ import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
 import static org.apache.solr.common.cloud.DocCollection.RULE;
 import static org.apache.solr.common.cloud.DocCollection.SNITCH;
 import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
-import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.common.cloud.ZkStateReader.REALTIME_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.*;
 import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
@@ -408,7 +400,9 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           AUTO_ADD_REPLICAS,
           RULE,
           SNITCH,
-          REALTIME_REPLICAS);
+          PULL_REPLICAS,
+          TLOG_REPLICAS,
+          NRT_REPLICAS);
 
       if (props.get(STATE_FORMAT) == null) {
         props.put(STATE_FORMAT, "2");
@@ -635,7 +629,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           _ROUTE_,
           CoreAdminParams.NAME,
           INSTANCE_DIR,
-          DATA_DIR);
+          DATA_DIR,
+          REPLICA_TYPE);
       return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX);
     }),
     OVERSEERSTATUS_OP(OVERSEERSTATUS, (req, rsp, h) -> (Map) new LinkedHashMap<>()),

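With NRT_REPLICAS, TLOG_REPLICAS and PULL_REPLICAS accepted by the CREATE operation, each replica type can be sized independently. A sketch using the SolrJ overload exercised later in this commit's tests (collection, configset and client names are placeholders):

    // createCollection(name, configSet, numShards, nrtReplicas, tlogReplicas, pullReplicas)
    CollectionAdminRequest.Create create =
        CollectionAdminRequest.createCollection("myCollection", "conf1", 2, 1, 1, 1);
    create.process(cloudSolrClient); // 2 shards x (1 NRT + 1 TLOG + 1 PULL) = 6 cores
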
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
index 6746332..1710da9 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
@@ -227,6 +227,7 @@ public class CoreAdminHandler extends RequestHandlerBase implements PermissionNa
       .put(CoreAdminParams.ROLES, CoreDescriptor.CORE_ROLES)
       .put(CoreAdminParams.CORE_NODE_NAME, CoreDescriptor.CORE_NODE_NAME)
       .put(ZkStateReader.NUM_SHARDS_PROP, CloudDescriptor.NUM_SHARDS)
+      .put(CoreAdminParams.REPLICA_TYPE, CloudDescriptor.REPLICA_TYPE)
       .build();
 
   protected static Map<String, String> buildCoreParams(SolrParams params) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index 0c2c903..748982d 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -18,6 +18,7 @@
 package org.apache.solr.handler.admin;
 
 import java.lang.invoke.MethodHandles;
+import java.util.Objects;
 
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.solr.cloud.CloudDescriptor;
@@ -63,18 +64,20 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
     Boolean onlyIfLeader = params.getBool("onlyIfLeader");
     Boolean onlyIfLeaderActive = params.getBool("onlyIfLeaderActive");
 
-    log.info("Going to wait for coreNodeName: " + coreNodeName + ", state: " + waitForState
-        + ", checkLive: " + checkLive + ", onlyIfLeader: " + onlyIfLeader
-        + ", onlyIfLeaderActive: " + onlyIfLeaderActive);
 
-    int maxTries = 0;
+    CoreContainer coreContainer = it.handler.coreContainer;
+    // wait long enough for the leader conflict to work itself out plus a little extra
+    int conflictWaitMs = coreContainer.getZkController().getLeaderConflictResolveWait();
+    int maxTries = (int) Math.round(conflictWaitMs / 1000) + 3;
+    log.info("Going to wait for coreNodeName: {}, state: {}, checkLive: {}, onlyIfLeader: {}, onlyIfLeaderActive: {}, maxTime: {} s",
+        coreNodeName, waitForState, checkLive, onlyIfLeader, onlyIfLeaderActive, maxTries);
+    
     Replica.State state = null;
     boolean live = false;
     int retry = 0;
     while (true) {
-      CoreContainer coreContainer = it.handler.coreContainer;
       try (SolrCore core = coreContainer.getCore(cname)) {
-        if (core == null && retry == 30) {
+        if (core == null && retry == Math.min(30, maxTries)) {
           throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "core not found:"
               + cname);
         }
@@ -102,15 +105,6 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
             coreContainer.getZkController().getZkStateReader().forceUpdateCollection(collectionName);
           }
 
-          if (maxTries == 0) {
-            // wait long enough for the leader conflict to work itself out plus a little extra
-            int conflictWaitMs = coreContainer.getZkController().getLeaderConflictResolveWait();
-            maxTries = (int) Math.round(conflictWaitMs / 1000) + 3;
-            log.info("Will wait a max of " + maxTries + " seconds to see " + cname + " (" +
-                cloudDescriptor.getShardId() + " of " +
-                cloudDescriptor.getCollectionName() + ") have state: " + waitForState);
-          }
-
           ClusterState clusterState = coreContainer.getZkController().getClusterState();
           DocCollection collection = clusterState.getCollection(collectionName);
           Slice slice = collection.getSlice(cloudDescriptor.getShardId());
@@ -160,6 +154,7 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
           String collection = null;
           String leaderInfo = null;
           String shardId = null;
+          
           try {
             CloudDescriptor cloudDescriptor =
                 core.getCoreDescriptor().getCloudDescriptor();
@@ -175,8 +170,7 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
               "I was asked to wait on state " + waitForState + " for "
                   + shardId + " in " + collection + " on " + nodeName
                   + " but I still do not see the requested state. I see state: "
-                  + state.toString() + " live:" + live + " leader from ZK: " + leaderInfo
-          );
+                  + Objects.toString(state) + " live:" + live + " leader from ZK: " + leaderInfo);
         }
 
         if (coreContainer.isShutDown()) {
@@ -185,7 +179,7 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
         }
 
         // solrcloud_debug
-        if (log.isDebugEnabled()) {
+        if (log.isDebugEnabled() && core != null) {
           try {
             LocalSolrQueryRequest r = new LocalSolrQueryRequest(core,
                 new ModifiableSolrParams());

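maxTries is now derived once, before the loop, instead of lazily inside it: the leader-conflict resolve wait in milliseconds, divided down to seconds, plus a three-second cushion. Since conflictWaitMs / 1000 is integer division, the Math.round call is effectively a no-op. Worked through with an assumed wait of 180000 ms:

    int conflictWaitMs = 180000; // assumed configured value, for illustration only
    int maxTries = (int) Math.round(conflictWaitMs / 1000) + 3; // 180000 / 1000 = 180, so maxTries == 183
    // the "core not found" bail-out above fires at retry == Math.min(30, maxTries)
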
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index 8c0a9cb..4ec3b79 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -60,6 +60,14 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
 public class HttpShardHandler extends ShardHandler {
+  
+  /**
+   * If the request context map has an entry with this key and Boolean.TRUE as value,
+   * {@link #prepDistributed(ResponseBuilder)} will only include {@link org.apache.solr.common.cloud.Replica.Type#NRT} replicas as possible
+   * destinations of the distributed request (or a leader replica of type {@link org.apache.solr.common.cloud.Replica.Type#TLOG}). This is used
+   * by the RealTimeGet handler, since other types of replicas shouldn't respond to RTG requests.
+   */
+  public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime";
 
   private HttpShardHandlerFactory httpShardHandlerFactory;
   private CompletionService<ShardResponse> completionService;
@@ -349,9 +357,12 @@ public class HttpShardHandler extends ShardHandler {
       // and make it a non-distributed request.
       String ourSlice = cloudDescriptor.getShardId();
       String ourCollection = cloudDescriptor.getCollectionName();
+      // Some requests may only be fulfilled by replicas of type Replica.Type.NRT
+      boolean onlyNrtReplicas = Boolean.TRUE == req.getContext().get(ONLY_NRT_REPLICAS);
       if (rb.slices.length == 1 && rb.slices[0] != null
           && ( rb.slices[0].equals(ourSlice) || rb.slices[0].equals(ourCollection + "_" + ourSlice) )  // handle the <collection>_<slice> format
-          && cloudDescriptor.getLastPublished() == Replica.State.ACTIVE) {
+          && cloudDescriptor.getLastPublished() == Replica.State.ACTIVE
+          && (!onlyNrtReplicas || cloudDescriptor.getReplicaType() == Replica.Type.NRT)) {
         boolean shortCircuit = params.getBool("shortCircuit", true);       // currently just a debugging parameter to check distrib search on a single node
 
         String targetHandler = params.get(ShardParams.SHARDS_QT);
@@ -387,14 +398,36 @@ public class HttpShardHandler extends ShardHandler {
             continue;
             // throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "no such shard: " + sliceName);
           }
+          Replica shardLeader = null;
 
           final Collection<Replica> allSliceReplicas = slice.getReplicasMap().values();
           final List<Replica> eligibleSliceReplicas = new ArrayList<>(allSliceReplicas.size());
           for (Replica replica : allSliceReplicas) {
             if (!clusterState.liveNodesContain(replica.getNodeName())
-                || replica.getState() != Replica.State.ACTIVE) {
+                || replica.getState() != Replica.State.ACTIVE
+                || (onlyNrtReplicas && replica.getType() == Replica.Type.PULL)) {
               continue;
             }
+            
+            if (onlyNrtReplicas && replica.getType() == Replica.Type.TLOG) {
+              if (shardLeader == null) {
+                try {
+                  shardLeader = zkController.getZkStateReader().getLeaderRetry(cloudDescriptor.getCollectionName(), slice.getName());
+                } catch (InterruptedException e) {
+                  throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + slice.getName() + " in collection " 
+                      + cloudDescriptor.getCollectionName(), e);
+                } catch (SolrException e) {
+                  if (log.isDebugEnabled()) {
+                    log.debug("Exception finding leader for shard {} in collection {}. Collection State: {}", 
+                        slice.getName(), cloudDescriptor.getCollectionName(), zkController.getZkStateReader().getClusterState().getCollectionOrNull(cloudDescriptor.getCollectionName()));
+                  }
+                  throw e;
+                }
+              }
+              if (!replica.getName().equals(shardLeader.getName())) {
+                continue;
+              }
+            }
             eligibleSliceReplicas.add(replica);
           }
 

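The loop above encodes the eligibility rule for requests flagged ONLY_NRT_REPLICAS: PULL replicas are always skipped, TLOG replicas count only when they are the shard leader, and NRT replicas pass through unconditionally. The same rule as a standalone predicate (a sketch, not a method this commit adds):

    // true if the replica may serve a request flagged ONLY_NRT_REPLICAS
    boolean eligibleForNrtOnlyRequest(Replica replica, Replica shardLeader, boolean liveAndActive) {
      if (!liveAndActive) return false;                         // dead or inactive replicas never serve
      if (replica.getType() == Replica.Type.PULL) return false; // PULL never serves these requests
      if (replica.getType() == Replica.Type.TLOG)               // TLOG only when it is the leader
        return replica.getName().equals(shardLeader.getName());
      return true;                                              // NRT is always eligible
    }
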
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index c0ceddb..6d70435 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -16,6 +16,10 @@
  */
 package org.apache.solr.handler.component;
 
+import static org.apache.solr.common.params.CommonParams.DISTRIB;
+import static org.apache.solr.common.params.CommonParams.ID;
+import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
+
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
@@ -24,6 +28,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -68,9 +73,9 @@ import org.apache.solr.schema.FieldType;
 import org.apache.solr.schema.IndexSchema;
 import org.apache.solr.schema.SchemaField;
 import org.apache.solr.search.DocList;
-import org.apache.solr.search.SolrDocumentFetcher;
 import org.apache.solr.search.QParser;
 import org.apache.solr.search.ReturnFields;
+import org.apache.solr.search.SolrDocumentFetcher;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.search.SolrReturnFields;
 import org.apache.solr.search.SyntaxError;
@@ -82,10 +87,6 @@ import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.params.CommonParams.DISTRIB;
-import static org.apache.solr.common.params.CommonParams.ID;
-import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
-
 public class RealTimeGetComponent extends SearchComponent
 {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -105,7 +106,21 @@ public class RealTimeGetComponent extends SearchComponent
     SolrQueryRequest req = rb.req;
     SolrQueryResponse rsp = rb.rsp;
     SolrParams params = req.getParams();
-
+    CloudDescriptor cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor();
+
+    if (cloudDesc != null) {
+      Replica.Type replicaType = cloudDesc.getReplicaType();
+      if (replicaType != null) {
+        if (replicaType == Replica.Type.PULL) {
+          throw new SolrException(ErrorCode.BAD_REQUEST, 
+              String.format(Locale.ROOT, "%s can't handle realtime get requests. Replicas of type %s do not support these type of requests", 
+                  cloudDesc.getCoreNodeName(),
+                  Replica.Type.PULL));
+        } 
+        // non-leader TLOG replicas should not respond to distrib /get requests, but internal requests are OK
+      }
+    }
+    
     if (!params.getBool(COMPONENT_NAME, true)) {
       return;
     }

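The new guard maps replica type to realtime get capability: a PULL replica fails fast with a 400 because it has no transaction log to read from, while a non-leader TLOG replica still answers internal (non-distributed) sub-requests. Reduced to its essentials:

    // Reduced form of the guard above: only PULL replicas are rejected outright.
    if (cloudDesc != null && cloudDesc.getReplicaType() == Replica.Type.PULL) {
      throw new SolrException(ErrorCode.BAD_REQUEST,
          cloudDesc.getCoreNodeName() + " can't handle realtime get requests");
    }
    // TLOG replicas fall through: internal requests are fine, and distributed /get
    // is steered to NRT replicas / the TLOG leader via HttpShardHandler.ONLY_NRT_REPLICAS.
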
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index e7f6a7b..e481109 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -45,7 +45,7 @@ import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.util.BytesRefHash;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.core.SolrConfig.UpdateHandlerInfo;
 import org.apache.solr.core.SolrCore;
@@ -122,12 +122,9 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     indexWriterCloseWaitsForMerges = updateHandlerInfo.indexWriterCloseWaitsForMerges;
 
     ZkController zkController = core.getCoreContainer().getZkController();
-    if (zkController != null) {
-      DocCollection dc = zkController.getClusterState().getCollection(core.getCoreDescriptor().getCollectionName());
-      if (dc.getRealtimeReplicas() == 1) {
-        commitWithinSoftCommit = false;
-        commitTracker.setOpenSearcher(true);
-      }
+    if (zkController != null && core.getCoreDescriptor().getCloudDescriptor().getReplicaType() == Replica.Type.TLOG) {
+      commitWithinSoftCommit = false;
+      commitTracker.setOpenSearcher(true);
     }
 
   }
@@ -249,7 +246,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
       cmd.overwrite = false;
     }
     try {
-      if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
+      if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
         if (ulog != null) ulog.add(cmd);
         return 1;
       }
@@ -425,7 +422,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     deleteByIdCommands.increment();
     deleteByIdCommandsCumulative.mark();
 
-    if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0 ) {
+    if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0 ) {
       if (ulog != null) ulog.delete(cmd);
       return;
     }
@@ -489,7 +486,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     deleteByQueryCommandsCumulative.mark();
     boolean madeIt=false;
     try {
-      if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
+      if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
         if (ulog != null) ulog.deleteByQuery(cmd);
         madeIt = true;
         return;
@@ -548,7 +545,6 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     }
   }
 
-
   @Override
   public int mergeIndexes(MergeIndexesCommand cmd) throws IOException {
     mergeIndexesCommands.mark();
@@ -921,7 +917,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
    * Calls either {@link IndexWriter#updateDocValues} or {@link IndexWriter#updateDocument} as 
    * needed based on {@link AddUpdateCommand#isInPlaceUpdate}.
    * <p>
-   * If the this is an UPDATE_INPLACE cmd, then all fields inclued in 
+   * If this is an UPDATE_INPLACE cmd, then all fields included in 
    * {@link AddUpdateCommand#getLuceneDocument} must either be the uniqueKey field, or be DocValue 
    * only fields.
    * </p>

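Each of the three update paths above shares the IGNORE_INDEXWRITER short-circuit, which is how a non-leader TLOG replica stays durable without indexing: the command is recorded in the update log and the IndexWriter is never consulted, since the replica will later copy segments from the leader. The common shape, extracted:

    if ((cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
      if (ulog != null) ulog.add(cmd); // durable in the transaction log for recovery/replay
      return 1;                        // skip the IndexWriter entirely
    }
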
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
index 49d2664..f0eb8bc 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
@@ -115,7 +115,7 @@ public abstract class UpdateHandler implements SolrInfoBean {
   public UpdateHandler(SolrCore core)  {
     this(core, null);
   }
-
+  
   public UpdateHandler(SolrCore core, UpdateLog updateLog)  {
     this.core=core;
     idField = core.getLatestSchema().getUniqueKeyField();
@@ -124,7 +124,9 @@ public abstract class UpdateHandler implements SolrInfoBean {
     PluginInfo ulogPluginInfo = core.getSolrConfig().getPluginInfo(UpdateLog.class.getName());
 
 
-    if (updateLog == null && ulogPluginInfo != null && ulogPluginInfo.isEnabled()) {
+    // If this is a replica of type PULL, don't create the update log
+    boolean skipUpdateLog = core.getCoreDescriptor().getCloudDescriptor() != null && !core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog();
+    if (updateLog == null && ulogPluginInfo != null && ulogPluginInfo.isEnabled() && !skipUpdateLog) {
       String dataDir = (String)ulogPluginInfo.initArgs.get("dir");
 
       String ulogDir = core.getCoreDescriptor().getUlogDir();

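requiresTransactionLog() is what singles out PULL replicas here: they neither index locally nor buffer updates, so no UpdateLog is created for them. A sketch of the assumed mapping (this commit only spells out the PULL case explicitly):

    CloudDescriptor cd = core.getCoreDescriptor().getCloudDescriptor();
    // cd == null -> standalone core, update log kept as before
    // NRT / TLOG -> requiresTransactionLog() assumed true, update log created
    // PULL       -> requiresTransactionLog() returns false, so skipUpdateLog is true
    boolean skipUpdateLog = cd != null && !cd.requiresTransactionLog();
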
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/update/UpdateLog.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index c50add4..87b93f4 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -1092,7 +1092,7 @@ public static final int VERSION_IDX = 1;
 
   /**
    * Replay current tlog, so all updates will be written to index.
-   * This is must do task for a append replica become a new leader.
+   * This is a must-do task for a tlog replica to become a new leader.
    * @return future of this task
    */
   public Future<RecoveryInfo> recoverFromCurrentLog() {
@@ -1706,7 +1706,7 @@ public static final int VERSION_IDX = 1;
 
     public void doReplay(TransactionLog translog) {
       try {
-        loglog.warn("Starting log replay " + translog + " active=" + activeLog + " starting pos=" + recoveryInfo.positionOfStart);
+        loglog.warn("Starting log replay " + translog + " active=" + activeLog + " starting pos=" + recoveryInfo.positionOfStart + " inSortedOrder=" + inSortedOrder);
         long lastStatusTime = System.nanoTime();
         if (inSortedOrder) {
           tlogReader = translog.getSortedReader(recoveryInfo.positionOfStart);
@@ -1786,7 +1786,7 @@ public static final int VERSION_IDX = 1;
                 recoveryInfo.adds++;
                 AddUpdateCommand cmd = convertTlogEntryToAddUpdateCommand(req, entry, oper, version);
                 cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT);
-                log.debug("{} {}", oper == ADD ? "add" : "update", cmd);
+                if (debug) log.debug("{} {}", oper == ADD ? "add" : "update", cmd);
                 proc.processAdd(cmd);
                 break;
               }
@@ -1854,6 +1854,7 @@ public static final int VERSION_IDX = 1;
             // something wrong with the request?
           }
           assert TestInjection.injectUpdateLogReplayRandomPause();
+          
         }
 
         CommitUpdateCommand cmd = new CommitUpdateCommand(req, false);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index cb1b2fb..5269ecb 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -279,7 +280,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   // this is set to true in the constructor if the next processors in the chain
   // are custom and may modify the SolrInputDocument racing with its serialization for replication
   private final boolean cloneRequiredOnLeader;
-  private final boolean onlyLeaderIndexes;
+  private final Replica.Type replicaType;
 
   public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
     this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
@@ -324,12 +325,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     
     if (cloudDesc != null) {
       collection = cloudDesc.getCollectionName();
-      ClusterState cstate = zkController.getClusterState();
-      DocCollection coll = cstate.getCollection(collection);
-      onlyLeaderIndexes = coll.getRealtimeReplicas() == 1;
+      replicaType = cloudDesc.getReplicaType();
     } else {
       collection = null;
-      onlyLeaderIndexes = false;
+      replicaType = Replica.Type.NRT;
     }
 
     boolean shouldClone = false;
@@ -666,7 +665,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
 
   // used for deleteByQuery to get the list of nodes this leader should forward to
-  private List<Node> setupRequest() {
+  private List<Node> setupRequestForDBQ() {
     List<Node> nodes = null;
     String shardId = cloudDesc.getShardId();
 
@@ -680,7 +679,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
       forwardToLeader = false;
       List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
-          .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN);
+          .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
       if (replicaProps != null) {
         nodes = new ArrayList<>(replicaProps.size());
         for (ZkCoreNodeProps props : replicaProps) {
@@ -1190,7 +1189,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
                 checkDeleteByQueries = true;
               }
             }
-            if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+            if (replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
               cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
             }
           }
@@ -1576,7 +1575,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     if (zkEnabled && DistribPhase.TOLEADER == phase) {
       // This core should be a leader
       isLeader = true;
-      replicas = setupRequest();
+      replicas = setupRequestForDBQ();
     } else if (DistribPhase.FROMLEADER == phase) {
       isLeader = false;
     }
@@ -1610,8 +1609,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           String myShardId = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
           Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
               collection, myShardId);
+          // DBQ forwarded to NRT and TLOG replicas
           List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
-              .getReplicaProps(collection, myShardId, leaderReplica.getName(), null, Replica.State.DOWN);
+              .getReplicaProps(collection, myShardId, leaderReplica.getName(), null, Replica.State.DOWN, EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
           if (replicaProps != null) {
             final List<Node> myReplicas = new ArrayList<>(replicaProps.size());
             for (ZkCoreNodeProps replicaProp : replicaProps) {
@@ -1699,10 +1699,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             return;
           }
 
-          if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+          if (replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+            // TLOG replica not leader, don't write the DBQ to IW
             cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
           }
-
           doLocalDelete(cmd);
         }
       }
@@ -1857,7 +1857,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
               }
             }
 
-            if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
+            if (replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
               cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
             }
           }
@@ -1884,14 +1884,19 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       zkCheck();
       
       nodes = getCollectionUrls(req, req.getCore().getCoreDescriptor()
-          .getCloudDescriptor().getCollectionName());
-      if (isLeader && nodes.size() == 1) {
+          .getCloudDescriptor().getCollectionName(), EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT));
+      if (nodes == null) {
+        // This could happen if there are only pull replicas
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
+            "Unable to distribute commit operation. No replicas available of types " + Replica.Type.TLOG + " or " + Replica.Type.NRT);
+      }
+      if (isLeader && nodes.size() == 1 && replicaType != Replica.Type.PULL) {
         singleLeader = true;
       }
     }
     
     if (!zkEnabled || req.getParams().getBool(COMMIT_END_POINT, false) || singleLeader) {
-      if (onlyLeaderIndexes) {
+      if (replicaType == Replica.Type.TLOG) {
         try {
           Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
               collection, cloudDesc.getShardId());
@@ -1904,12 +1909,19 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             doLocalCommit(cmd);
           } else {
             assert TestInjection.waitForInSyncWithLeader(req.getCore(),
-                zkController, collection, cloudDesc.getShardId());
+                zkController, collection, cloudDesc.getShardId()): "Core " + req.getCore() + " not in sync with leader";
           }
         } catch (InterruptedException e) {
           throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
         }
+      } else if (replicaType == Replica.Type.PULL) {
+        log.warn("Commit not supported on replicas of type " + Replica.Type.PULL);
       } else {
+        // NRT replicas will always commit
+        if (vinfo != null) {
+          long commitVersion = vinfo.getNewClock();
+          cmd.setVersion(commitVersion);
+        }
         doLocalCommit(cmd);
       }
     } else {
@@ -1958,7 +1970,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
  
 
   
-  private List<Node> getCollectionUrls(SolrQueryRequest req, String collection) {
+  private List<Node> getCollectionUrls(SolrQueryRequest req, String collection, EnumSet<Replica.Type> types) {
     ClusterState clusterState = req.getCore()
         .getCoreContainer().getZkController().getClusterState();
     Map<String,Slice> slices = clusterState.getSlicesMap(collection);
@@ -1973,6 +1985,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       Map<String,Replica> shardMap = replicas.getReplicasMap();
       
       for (Entry<String,Replica> entry : shardMap.entrySet()) {
+        if (!types.contains(entry.getValue().getType())) {
+          continue;
+        }
         ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(entry.getValue());
         if (clusterState.liveNodesContain(nodeProps.getNodeName())) {
           urls.add(new StdNode(nodeProps, collection, replicas.getName()));

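getCollectionUrls now takes an EnumSet filter, and commit fan-out includes only NRT and TLOG replicas; a shard consisting solely of PULL replicas yields no targets, which the SERVER_ERROR branch above reports. The filtering core of the loop, reduced to its essentials:

    EnumSet<Replica.Type> types = EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT);
    for (Replica replica : slice.getReplicasMap().values()) {
      if (!types.contains(replica.getType())) continue; // PULL replicas never receive commits
      ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(replica);
      if (clusterState.liveNodesContain(nodeProps.getNodeName())) {
        urls.add(new StdNode(nodeProps, collection, slice.getName()));
      }
    }
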
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/util/TestInjection.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index 6b7b1f8..5b0d047 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -75,7 +75,7 @@ public class TestInjection {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   
   private static final Pattern ENABLED_PERCENT = Pattern.compile("(true|false)(?:\\:(\\d+))?$", Pattern.CASE_INSENSITIVE);
-
+  
   private static final String LUCENE_TEST_CASE_FQN = "org.apache.lucene.util.LuceneTestCase";
 
   /** 
@@ -151,6 +151,7 @@ public class TestInjection {
     splitFailureBeforeReplicaCreation = null;
     prepRecoveryOpPauseForever = null;
     countPrepRecoveryOpPauseForever = new AtomicInteger(0);
+    waitForReplicasInSync = "true:60";
 
     for (Timer timer : timers) {
       timer.cancel();
@@ -387,9 +388,10 @@ public class TestInjection {
             String localVersion = searcher.get().getIndexReader().getIndexCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
             if (localVersion == null && leaderVersion == 0 && !core.getUpdateHandler().getUpdateLog().hasUncommittedChanges()) return true;
             if (localVersion != null && Long.parseLong(localVersion) == leaderVersion && (leaderVersion >= t || i >= 6)) {
-              log.info("Waiting time for replica in sync with leader: {}", System.currentTimeMillis()-currentTime);
+              log.info("Waiting time for tlog replica to be in sync with leader: {}", System.currentTimeMillis()-currentTime);
               return true;
             } else {
+              log.debug("Tlog replica not in sync with leader yet. Attempt: {}. Local Version={}, leader Version={}", i, localVersion, leaderVersion);
               Thread.sleep(500);
             }
           } finally {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test-files/solr/collection1/conf/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig.xml
index 58f9551..a63f6cb 100644
--- a/solr/core/src/test-files/solr/collection1/conf/solrconfig.xml
+++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig.xml
@@ -63,6 +63,10 @@
 
   <updateHandler class="solr.DirectUpdateHandler2">
 
+    <autoCommit>
+      <maxTime>${solr.autoCommit.maxTime:-1}</maxTime>
+    </autoCommit>
+
     <!-- autocommit pending docs if certain criteria are met
     <autoCommit>
       <maxDocs>10000</maxDocs>
@@ -478,7 +482,7 @@
       <str name="facet.query">foo_s:bar</str>
     </lst>
   </requestHandler>
-
+  
   <admin>
     <defaultQuery>solr</defaultQuery>
     <gettableFiles>solrconfig.xml schema.xml admin-extra.html</gettableFiles>
@@ -577,6 +581,8 @@
       <str name="df">text</str>
     </lst>
   </initParams>
+  
+  
 
 </config>
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
----------------------------------------------------------------------
diff --git a/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
index 059e58f..8da7d28 100644
--- a/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
+++ b/solr/core/src/test-files/solr/configsets/cloud-minimal/conf/solrconfig.xml
@@ -33,7 +33,7 @@
     <commitWithin>
       <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
     </commitWithin>
-    <updateLog></updateLog>
+    <updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
   </updateHandler>
 
   <requestHandler name="/select" class="solr.SearchHandler">

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/AbstractCloudBackupRestoreTestCase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/AbstractCloudBackupRestoreTestCase.java b/solr/core/src/test/org/apache/solr/cloud/AbstractCloudBackupRestoreTestCase.java
index 3414759..a6d130e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/AbstractCloudBackupRestoreTestCase.java
+++ b/solr/core/src/test/org/apache/solr/cloud/AbstractCloudBackupRestoreTestCase.java
@@ -32,6 +32,7 @@ import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest.ClusterProp;
 import org.apache.solr.client.solrj.response.RequestStatusState;
@@ -48,8 +49,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.params.ShardParams._ROUTE_;
-
 /**
  * This class implements the logic required to test Solr cloud backup/restore capability.
  */
@@ -84,11 +83,17 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
   @Test
   public void test() throws Exception {
     boolean isImplicit = random().nextBoolean();
+    boolean doSplitShardOperation = !isImplicit && random().nextBoolean();
     int replFactor = TestUtil.nextInt(random(), 1, 2);
+    int numTlogReplicas = TestUtil.nextInt(random(), 0, 1);
+    int numPullReplicas = TestUtil.nextInt(random(), 0, 1);
     CollectionAdminRequest.Create create =
-        CollectionAdminRequest.createCollection(getCollectionName(), "conf1", NUM_SHARDS, replFactor);
-    if (NUM_SHARDS * replFactor > cluster.getJettySolrRunners().size() || random().nextBoolean()) {
-      create.setMaxShardsPerNode(NUM_SHARDS);//just to assert it survives the restoration
+        CollectionAdminRequest.createCollection(getCollectionName(), "conf1", NUM_SHARDS, replFactor, numTlogReplicas, numPullReplicas);
+    if (NUM_SHARDS * (replFactor + numTlogReplicas + numPullReplicas) > cluster.getJettySolrRunners().size() || random().nextBoolean()) {
+      create.setMaxShardsPerNode((int)Math.ceil(NUM_SHARDS * (replFactor + numTlogReplicas + numPullReplicas) / cluster.getJettySolrRunners().size()));//just to assert it survives the restoration
+      if (doSplitShardOperation) {
+        create.setMaxShardsPerNode(create.getMaxShardsPerNode() * 2);
+      }
     }
     if (random().nextBoolean()) {
       create.setAutoAddReplicas(true);//just to assert it survives the restoration
@@ -112,7 +117,7 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
 
     indexDocs(getCollectionName());
 
-    if (!isImplicit && random().nextBoolean()) {
+    if (doSplitShardOperation) {
       // shard split the first shard
       int prevActiveSliceCount = getActiveSliceCount(getCollectionName());
       CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(getCollectionName());
@@ -235,9 +240,9 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
       CollectionAdminRequest.Restore restore = CollectionAdminRequest.restoreCollection(restoreCollectionName, backupName)
           .setLocation(backupLocation).setRepositoryName(getBackupRepoName());
 
-      if (origShardToDocCount.size() > cluster.getJettySolrRunners().size()) {
+      if (backupCollection.getReplicas().size() > cluster.getJettySolrRunners().size()) {
         // may need to increase maxShardsPerNode (e.g. if it was shard split, then now we need more)
-        restore.setMaxShardsPerNode(origShardToDocCount.size());
+        restore.setMaxShardsPerNode((int)Math.ceil(backupCollection.getReplicas().size()/cluster.getJettySolrRunners().size()));
       }
 
       if (rarely()) { // Try with createNodeSet configuration
@@ -304,9 +309,11 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
     Map<String,Integer> shardToDocCount = new TreeMap<>();
     for (Slice slice : docCollection.getActiveSlices()) {
       String shardName = slice.getName();
-      long docsInShard = client.query(docCollection.getName(), new SolrQuery("*:*").setParam(_ROUTE_, shardName))
-          .getResults().getNumFound();
-      shardToDocCount.put(shardName, (int) docsInShard);
+      try (HttpSolrClient leaderClient = new HttpSolrClient.Builder(slice.getLeader().getCoreUrl()).withHttpClient(client.getHttpClient()).build()) {
+        long docsInShard = leaderClient.query(new SolrQuery("*:*").setParam("distrib", "false"))
+            .getResults().getNumFound();
+        shardToDocCount.put(shardName, (int) docsInShard);
+      }
     }
     return shardToDocCount;
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/AssignTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/AssignTest.java b/solr/core/src/test/org/apache/solr/cloud/AssignTest.java
index 7593f3b..8e32510 100644
--- a/solr/core/src/test/org/apache/solr/cloud/AssignTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/AssignTest.java
@@ -87,4 +87,10 @@ public class AssignTest extends SolrTestCaseJ4 {
     assertEquals("core_node2", nodeName);
   }
   
+  @Test
+  public void testBuildCoreName() {
+    assertEquals("Core name pattern changed", "collection1_shard1_replica_n1", Assign.buildCoreName("collection1", "shard1", Replica.Type.NRT, 1));
+    assertEquals("Core name pattern changed", "collection1_shard2_replica_p2", Assign.buildCoreName("collection1", "shard2", Replica.Type.PULL,2));
+  }
+  
 }

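The new assertions pin down the naming scheme for typed replica cores: <collection>_<shard>_replica_<t><n>, where <t> encodes the replica type. From the two cases tested (TLOG is not exercised here, but presumably gets its own letter):

    Assign.buildCoreName("collection1", "shard1", Replica.Type.NRT, 1);  // -> collection1_shard1_replica_n1
    Assign.buildCoreName("collection1", "shard2", Replica.Type.PULL, 2); // -> collection1_shard2_replica_p2
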
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
index 5eb4b3b..c8e92fc 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java
@@ -62,8 +62,8 @@ public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
   }
 
   @Override
-  protected int getRealtimeReplicas() {
-    return onlyLeaderIndexes? 1 : -1;
+  protected boolean useTlogReplicas() {
+    return onlyLeaderIndexes;
   }
   
   @Test

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
index 1c23c9c..18caa58 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java
@@ -119,8 +119,8 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
   }
 
   @Override
-  protected int getRealtimeReplicas() {
-    return onlyLeaderIndexes? 1 : -1;
+  protected boolean useTlogReplicas() {
+    return onlyLeaderIndexes;
   }
 
   @Override
@@ -1075,12 +1075,6 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
     assertEquals(collection3Docs, collection2Docs - 1);
   }
   
-  protected SolrInputDocument getDoc(Object... fields) throws Exception {
-    SolrInputDocument doc = new SolrInputDocument();
-    addFields(doc, fields);
-    return doc;
-  }
-  
   protected void indexDoc(String collection, SolrInputDocument doc) throws IOException, SolrServerException {
     List<SolrClient> clients = otherCollectionClients.get(collection);
     int which = (doc.getField(id).toString().hashCode() & 0x7fffffff) % clients.size();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
index ffc5262..2e31520 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java
@@ -16,33 +16,21 @@
  */
 package org.apache.solr.cloud;
 
-import java.lang.invoke.MethodHandles;
-import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.http.client.HttpClient;
-import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.SolrTestCaseJ4.SuppressObjectReleaseTracker;
 import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
-import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
-import org.apache.solr.client.solrj.impl.HttpClientUtil;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.IOUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Slow
 @SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
@@ -51,8 +39,6 @@ import org.slf4j.LoggerFactory;
 public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase {
   private static final int FAIL_TOLERANCE = 100;
 
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  
   private static final Integer RUN_LENGTH = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.runlength", "-1"));
 
   private final boolean onlyLeaderIndexes = random().nextBoolean();
@@ -112,8 +98,8 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
   }
 
   @Override
-  protected int getRealtimeReplicas() {
-    return onlyLeaderIndexes? 1 : -1;
+  protected boolean useTlogReplicas() {
+    return onlyLeaderIndexes;
   }
 
   @Test
@@ -158,8 +144,8 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
       // it's currently hard to know what requests failed when using ConcurrentSolrUpdateServer
       boolean runFullThrottle = random().nextBoolean();
       if (runFullThrottle) {
-        FullThrottleStoppableIndexingThread ftIndexThread = new FullThrottleStoppableIndexingThread(
-            clients, "ft1", true);
+        FullThrottleStoppableIndexingThread ftIndexThread = 
+            new FullThrottleStoppableIndexingThread(controlClient, cloudClient, clients, "ft1", true, this.clientSoTimeout);
         threads.add(ftIndexThread);
         ftIndexThread.start();
       }
@@ -289,111 +275,6 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
     return deleteFails;
   }
 
-  class FullThrottleStoppableIndexingThread extends StoppableIndexingThread {
-    private CloseableHttpClient httpClient = HttpClientUtil.createClient(null);
-    private volatile boolean stop = false;
-    int clientIndex = 0;
-    private ConcurrentUpdateSolrClient cusc;
-    private List<SolrClient> clients;
-    private AtomicInteger fails = new AtomicInteger();
-    
-    public FullThrottleStoppableIndexingThread(List<SolrClient> clients,
-                                               String id, boolean doDeletes) {
-      super(controlClient, cloudClient, id, doDeletes);
-      setName("FullThrottleStopableIndexingThread");
-      setDaemon(true);
-      this.clients = clients;
-
-      cusc = new ErrorLoggingConcurrentUpdateSolrClient(((HttpSolrClient) clients.get(0)).getBaseURL(), httpClient, 8, 2);
-      cusc.setConnectionTimeout(10000);
-      cusc.setSoTimeout(clientSoTimeout);
-    }
-    
-    @Override
-    public void run() {
-      int i = 0;
-      int numDeletes = 0;
-      int numAdds = 0;
-
-      while (true && !stop) {
-        String id = this.id + "-" + i;
-        ++i;
-        
-        if (doDeletes && random().nextBoolean() && deletes.size() > 0) {
-          String delete = deletes.remove(0);
-          try {
-            numDeletes++;
-            cusc.deleteById(delete);
-          } catch (Exception e) {
-            changeUrlOnError(e);
-            fails.incrementAndGet();
-          }
-        }
-        
-        try {
-          numAdds++;
-          if (numAdds > (TEST_NIGHTLY ? 4002 : 197))
-            continue;
-          SolrInputDocument doc = getDoc(
-              "id",
-              id,
-              i1,
-              50,
-              t1,
-              "Saxon heptarchies that used to rip around so in old times and raise Cain.  My, you ought to seen old Henry the Eight when he was in bloom.  He WAS a blossom.  He used to marry a new wife every day, and chop off her head next morning.  And he would do it just as indifferent as if ");
-          cusc.add(doc);
-        } catch (Exception e) {
-          changeUrlOnError(e);
-          fails.incrementAndGet();
-        }
-        
-        if (doDeletes && random().nextBoolean()) {
-          deletes.add(id);
-        }
-        
-      }
-
-      log.info("FT added docs:" + numAdds + " with " + fails + " fails" + " deletes:" + numDeletes);
-    }
-
-    private void changeUrlOnError(Exception e) {
-      if (e instanceof ConnectException) {
-        clientIndex++;
-        if (clientIndex > clients.size() - 1) {
-          clientIndex = 0;
-        }
-        cusc.shutdownNow();
-        cusc = new ErrorLoggingConcurrentUpdateSolrClient(((HttpSolrClient) clients.get(clientIndex)).getBaseURL(),
-            httpClient, 30, 3);
-      }
-    }
-    
-    @Override
-    public void safeStop() {
-      stop = true;
-      cusc.blockUntilFinished();
-      cusc.shutdownNow();
-      IOUtils.closeQuietly(httpClient);
-    }
-
-    @Override
-    public int getFailCount() {
-      return fails.get();
-    }
-    
-    @Override
-    public Set<String> getAddFails() {
-      throw new UnsupportedOperationException();
-    }
-    
-    @Override
-    public Set<String> getDeleteFails() {
-      throw new UnsupportedOperationException();
-    }
-    
-  };
-  
-  
   // skip the randoms - they can deadlock...
   @Override
   protected void indexr(Object... fields) throws Exception {
@@ -401,13 +282,4 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
     indexDoc(doc);
   }
 
-  static class ErrorLoggingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClient {
-    public ErrorLoggingConcurrentUpdateSolrClient(String serverUrl, HttpClient httpClient, int queueSize, int threadCount) {
-      super(serverUrl, httpClient, queueSize, threadCount, null, false);
-    }
-    @Override
-    public void handleError(Throwable ex) {
-      log.warn("cusc error", ex);
-    }
-  }
 }
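
The hunks above replace the int-valued getRealtimeReplicas() hook with a
boolean useTlogReplicas() toggle, and drop the FullThrottleStoppableIndexingThread
inner class in favor of a shared implementation with an expanded constructor.
As a minimal sketch of the new hook (the subclass name is hypothetical; only
useTlogReplicas() comes from the patch), a chaos test now opts into TLOG
replicas like this:

    // Sketch: randomly exercise TLOG vs. NRT replicas, assuming the base
    // class consults useTlogReplicas() when it creates the test collection.
    public class MyChaosTest extends AbstractFullDistribZkTestBase {
      private final boolean onlyLeaderIndexes = random().nextBoolean();

      @Override
      protected boolean useTlogReplicas() {
        return onlyLeaderIndexes; // true -> TLOG replicas, false -> NRT
      }
    }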

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeWithPullReplicasTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeWithPullReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeWithPullReplicasTest.java
new file mode 100644
index 0000000..11c25d3
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeWithPullReplicasTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.SolrTestCaseJ4.SuppressObjectReleaseTracker;
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+
+@Slow
+@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
+@ThreadLeakLingering(linger = 60000)
+@SuppressObjectReleaseTracker(bugUrl="Testing purposes")
+public class ChaosMonkeyNothingIsSafeWithPullReplicasTest extends AbstractFullDistribZkTestBase {
+  private static final int FAIL_TOLERANCE = 100;
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  
+  private static final Integer RUN_LENGTH = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.runlength", "-1"));
+
+  private final boolean useTlogReplicas = random().nextBoolean();
+  
+  private final int numPullReplicas;
+  private final int numRealtimeOrTlogReplicas;
+  
+  protected int getPullReplicaCount() {
+    return numPullReplicas;
+  }
+
+  @BeforeClass
+  public static void beforeSuperClass() {
+    schemaString = "schema15.xml";      // we need a string id
+    if (usually()) {
+      System.setProperty("solr.autoCommit.maxTime", "15000");
+    }
+    TestInjection.waitForReplicasInSync = null;
+    setErrorHook();
+  }
+  
+  @AfterClass
+  public static void afterSuperClass() {
+    System.clearProperty("solr.autoCommit.maxTime");
+    clearErrorHook();
+    TestInjection.reset();
+  }
+  
+  protected static final String[] fieldNames = new String[]{"f_i", "f_f", "f_d", "f_l", "f_dt"};
+  protected static final RandVal[] randVals = new RandVal[]{rint, rfloat, rdouble, rlong, rdate};
+
+  private int clientSoTimeout;
+  
+  public String[] getFieldNames() {
+    return fieldNames;
+  }
+
+  public RandVal[] getRandValues() {
+    return randVals;
+  }
+  
+  @Override
+  public void distribSetUp() throws Exception {
+    super.distribSetUp();
+    // can help to hide this when testing and looking at logs
+    //ignoreException("shard update error");
+    useFactory("solr.StandardDirectoryFactory");
+  }
+  
+  public ChaosMonkeyNothingIsSafeWithPullReplicasTest() {
+    super();
+    numPullReplicas = random().nextInt(TEST_NIGHTLY ? 2 : 1) + 1;
+    numRealtimeOrTlogReplicas = random().nextInt(TEST_NIGHTLY ? 4 : 3) + 1;
+    sliceCount = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.slicecount", "-1"));
+    if (sliceCount == -1) {
+      sliceCount = random().nextInt(TEST_NIGHTLY ? 3 : 2) + 1;
+    }
+
+    int numNodes = sliceCount * (numRealtimeOrTlogReplicas + numPullReplicas);
+    fixShardCount(numNodes);
+    log.info("Starting ChaosMonkey test with {} shards and {} nodes", sliceCount, numNodes);
+
+    // None of the operations used here are particularly costly, so this should work.
+    // Using this low timeout will also help us catch index stalling.
+    clientSoTimeout = 5000;
+  }
+
+  @Override
+  protected boolean useTlogReplicas() {
+    return useTlogReplicas;
+  }
+
+  @Test
+  public void test() throws Exception {
+    cloudClient.setSoTimeout(clientSoTimeout);
+    DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION);
+    assertEquals(this.sliceCount, docCollection.getSlices().size());
+    Slice s = docCollection.getSlice("shard1");
+    assertNotNull(s);
+    assertEquals("Unexpected number of replicas. Collection: " + docCollection, numRealtimeOrTlogReplicas + numPullReplicas, s.getReplicas().size());
+    assertEquals("Unexpected number of pull replicas. Collection: " + docCollection, numPullReplicas, s.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
+    assertEquals(useTlogReplicas()?0:numRealtimeOrTlogReplicas, s.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
+    assertEquals(useTlogReplicas()?numRealtimeOrTlogReplicas:0, s.getReplicas(EnumSet.of(Replica.Type.TLOG)).size());
+    
+    boolean testSuccessful = false;
+    try {
+      handle.clear();
+      handle.put("timestamp", SKIPVAL);
+      ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+      // make sure we have leaders for each shard
+      for (int j = 1; j <= sliceCount; j++) {
+        zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "shard" + j, 10000);
+      }
+      
+      waitForRecoveriesToFinish(false);
+      
+      // we cannot do delete by query
+      // as it's not supported for recovery
+      del("*:*");
+      
+      List<StoppableThread> threads = new ArrayList<>();
+      List<StoppableIndexingThread> indexThreads = new ArrayList<>();
+      int threadCount = TEST_NIGHTLY ? 3 : 1;
+      int i = 0;
+      for (i = 0; i < threadCount; i++) {
+        StoppableIndexingThread indexThread = new StoppableIndexingThread(controlClient, cloudClient, Integer.toString(i), true);
+        threads.add(indexThread);
+        indexThreads.add(indexThread);
+        indexThread.start();
+      }
+      
+      threadCount = 1;
+      i = 0;
+      for (i = 0; i < threadCount; i++) {
+        StoppableSearchThread searchThread = new StoppableSearchThread(cloudClient);
+        threads.add(searchThread);
+        searchThread.start();
+      }
+      
+      if (usually()) {
+        StoppableCommitThread commitThread = new StoppableCommitThread(cloudClient, 1000, false);
+        threads.add(commitThread);
+        commitThread.start();
+      }
+      
+      // TODO: we only do this sometimes so that we can sometimes compare against control,
+      // it's currently hard to know what requests failed when using ConcurrentSolrUpdateServer
+      boolean runFullThrottle = random().nextBoolean();
+      if (runFullThrottle) {
+        FullThrottleStoppableIndexingThread ftIndexThread = 
+            new FullThrottleStoppableIndexingThread(controlClient, cloudClient, clients, "ft1", true, this.clientSoTimeout);
+        threads.add(ftIndexThread);
+        ftIndexThread.start();
+      }
+      
+      chaosMonkey.startTheMonkey(true, 10000);
+      try {
+        long runLength;
+        if (RUN_LENGTH != -1) {
+          runLength = RUN_LENGTH;
+        } else {
+          int[] runTimes;
+          if (TEST_NIGHTLY) {
+            runTimes = new int[] {5000, 6000, 10000, 15000, 25000, 30000,
+                30000, 45000, 90000, 120000};
+          } else {
+            runTimes = new int[] {5000, 7000, 15000};
+          }
+          runLength = runTimes[random().nextInt(runTimes.length)];
+        }
+        ChaosMonkey.wait(runLength, DEFAULT_COLLECTION, zkStateReader);
+      } finally {
+        chaosMonkey.stopTheMonkey();
+      }
+
+      // ideally this should go into chaosMonkey
+      restartZk(1000 * (5 + random().nextInt(4)));
+
+      for (StoppableThread indexThread : threads) {
+        indexThread.safeStop();
+      }
+      
+      // start any downed jetties to be sure we still will end up with a leader per shard...
+      
+      // wait for stop...
+      for (StoppableThread indexThread : threads) {
+        indexThread.join();
+      }
+      
+      // try and wait for any replications and what not to finish...
+      
+      ChaosMonkey.wait(2000, DEFAULT_COLLECTION, zkStateReader);
+      
+      // wait until there are no recoveries...
+      waitForThingsToLevelOut(Integer.MAX_VALUE); // was: Math.round(runLength / 1000.0f / 3.0f)
+      
+      // make sure we again have leaders for each shard
+      for (int j = 1; j <= sliceCount; j++) {
+        zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "shard" + j, 30000);
+      }
+      
+      commit();
+      
+      // TODO: assert we didn't kill everyone
+      
+      zkStateReader.updateLiveNodes();
+      assertTrue(zkStateReader.getClusterState().getLiveNodes().size() > 0);
+      
+      
+      // we expect full throttle fails, but cloud client should not easily fail
+      for (StoppableThread indexThread : threads) {
+        if (indexThread instanceof StoppableIndexingThread && !(indexThread instanceof FullThrottleStoppableIndexingThread)) {
+          int failCount = ((StoppableIndexingThread) indexThread).getFailCount();
+          assertFalse("There were too many update fails (" + failCount + " > " + FAIL_TOLERANCE
+              + ") - we expect it can happen, but shouldn't easily", failCount > FAIL_TOLERANCE);
+        }
+      }
+      
+      waitForReplicationFromReplicas(DEFAULT_COLLECTION, zkStateReader, new TimeOut(30, TimeUnit.SECONDS));
+//      waitForAllWarmingSearchers();
+      
+      Set<String> addFails = getAddFails(indexThreads);
+      Set<String> deleteFails = getDeleteFails(indexThreads);
+      // the full throttle thread can have request fails
+      checkShardConsistency(!runFullThrottle, true, addFails, deleteFails);
+      
+      long ctrlDocs = controlClient.query(new SolrQuery("*:*")).getResults()
+          .getNumFound();
+      
+      // ensure we have added more than 0 docs
+      long cloudClientDocs = cloudClient.query(new SolrQuery("*:*"))
+          .getResults().getNumFound();
+      
+      assertTrue("Expected cloud docs > 0, found " + cloudClientDocs + " (control docs: " + ctrlDocs + ")", cloudClientDocs > 0);
+      
+      if (VERBOSE) System.out.println("control docs:"
+          + controlClient.query(new SolrQuery("*:*")).getResults()
+              .getNumFound() + "\n\n");
+      
+      // try and make a collection to make sure the overseer has survived the expiration and session loss
+
+      // sometimes we restart zookeeper as well
+      if (random().nextBoolean()) {
+        restartZk(1000 * (5 + random().nextInt(4)));
+      }
+
+      try (CloudSolrClient client = createCloudClient("collection1")) {
+        // We don't really know how many live nodes we have at this point, so "maxShardsPerNode" needs to be > 1
+        createCollection(null, "testcollection",
+              1, 1, 10, client, null, "conf1"); 
+      }
+      List<Integer> numShardsNumReplicas = new ArrayList<>(2);
+      numShardsNumReplicas.add(1);
+      numShardsNumReplicas.add(1 + getPullReplicaCount());
+      checkForCollection("testcollection", numShardsNumReplicas, null);
+      
+      testSuccessful = true;
+    } finally {
+      if (!testSuccessful) {
+        logReplicaTypesReplicationInfo(DEFAULT_COLLECTION, cloudClient.getZkStateReader());
+        printLayout();
+      }
+    }
+  }
+
+  private Set<String> getAddFails(List<StoppableIndexingThread> threads) {
+    Set<String> addFails = new HashSet<>();
+    for (StoppableIndexingThread thread : threads)   {
+      addFails.addAll(thread.getAddFails());
+//      addFails.addAll(thread.getAddFailsMinRf());
+    }
+    return addFails;
+  }
+  
+  private Set<String> getDeleteFails(List<StoppableIndexingThread> threads) {
+    Set<String> deleteFails = new HashSet<>();
+    for (StoppableIndexingThread thread : threads)   {
+      deleteFails.addAll(thread.getDeleteFails());
+//      deleteFails.addAll(thread.getDeleteFailsMinRf());
+    }
+    return deleteFails;
+  }
+
+  // skip the randoms - they can deadlock...
+  @Override
+  protected void indexr(Object... fields) throws Exception {
+    SolrInputDocument doc = getDoc(fields);
+    indexDoc(doc);
+  }
+
+}
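
The assertions at the top of test() above lean on the type-aware
Slice.getReplicas(EnumSet) accessor. A minimal sketch of that bookkeeping
(the collection and shard names are placeholders):

    // Count the replicas of each type on one shard.
    DocCollection coll = cloudClient.getZkStateReader().getClusterState()
        .getCollection("collection1");
    Slice shard1 = coll.getSlice("shard1");
    int nrt  = shard1.getReplicas(EnumSet.of(Replica.Type.NRT)).size();
    int tlog = shard1.getReplicas(EnumSet.of(Replica.Type.TLOG)).size();
    int pull = shard1.getReplicas(EnumSet.of(Replica.Type.PULL)).size();
    // With useTlogReplicas() == true the test expects nrt == 0 and
    // tlog == numRealtimeOrTlogReplicas, otherwise the reverse;
    // pull should always equal numPullReplicas.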

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderWithPullReplicasTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderWithPullReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderWithPullReplicasTest.java
new file mode 100644
index 0000000..f2e8845
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderWithPullReplicasTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.SolrTestCaseJ4.SuppressObjectReleaseTracker;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Slow
+@SuppressObjectReleaseTracker(bugUrl="Testing purposes")
+public class ChaosMonkeySafeLeaderWithPullReplicasTest extends AbstractFullDistribZkTestBase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  
+  private static final Integer RUN_LENGTH = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.runlength", "-1"));
+  
+  private final boolean useTlogReplicas = random().nextBoolean();
+  
+  private final int numPullReplicas;
+  private final int numRealtimeOrTlogReplicas;
+  
+  @Override
+  protected int getPullReplicaCount() {
+    return numPullReplicas;
+  }
+  
+  @Override
+  protected boolean useTlogReplicas() {
+    return useTlogReplicas;
+  }
+
+  @BeforeClass
+  public static void beforeSuperClass() {
+    schemaString = "schema15.xml";      // we need a string id
+    if (usually()) {
+      System.setProperty("solr.autoCommit.maxTime", "15000");
+    }
+    TestInjection.waitForReplicasInSync = null;
+    setErrorHook();
+  }
+  
+  @AfterClass
+  public static void afterSuperClass() {
+    System.clearProperty("solr.autoCommit.maxTime");
+    clearErrorHook();
+    TestInjection.reset();
+  }
+
+  protected static final String[] fieldNames = new String[]{"f_i", "f_f", "f_d", "f_l", "f_dt"};
+  protected static final RandVal[] randVals = new RandVal[]{rint, rfloat, rdouble, rlong, rdate};
+  
+  public String[] getFieldNames() {
+    return fieldNames;
+  }
+
+  public RandVal[] getRandValues() {
+    return randVals;
+  }
+  
+  @Override
+  public void distribSetUp() throws Exception {
+    useFactory("solr.StandardDirectoryFactory");
+    super.distribSetUp();
+  }
+  
+  public ChaosMonkeySafeLeaderWithPullReplicasTest() {
+    super();
+    numPullReplicas = random().nextInt(TEST_NIGHTLY ? 3 : 2) + 1;
+    numRealtimeOrTlogReplicas = random().nextInt(TEST_NIGHTLY ? 3 : 2) + 1;
+    sliceCount = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.slicecount", "-1"));
+    if (sliceCount == -1) {
+      sliceCount = random().nextInt(TEST_NIGHTLY ? 3 : 2) + 1;
+    }
+
+    int numNodes = sliceCount * (numRealtimeOrTlogReplicas + numPullReplicas);
+    fixShardCount(numNodes);
+    log.info("Starting ChaosMonkey test with {} shards and {} nodes", sliceCount, numNodes);
+  }
+  
+  @Test
+  public void test() throws Exception {
+    DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION);
+    assertEquals(this.sliceCount, docCollection.getSlices().size());
+    Slice s = docCollection.getSlice("shard1");
+    assertNotNull(s);
+    assertEquals("Unexpected number of replicas. Collection: " + docCollection, numRealtimeOrTlogReplicas + numPullReplicas, s.getReplicas().size());
+    assertEquals("Unexpected number of pull replicas. Collection: " + docCollection, numPullReplicas, s.getReplicas(EnumSet.of(Replica.Type.PULL)).size());
+    assertEquals(useTlogReplicas()?0:numRealtimeOrTlogReplicas, s.getReplicas(EnumSet.of(Replica.Type.NRT)).size());
+    assertEquals(useTlogReplicas()?numRealtimeOrTlogReplicas:0, s.getReplicas(EnumSet.of(Replica.Type.TLOG)).size());
+    handle.clear();
+    handle.put("timestamp", SKIPVAL);
+    
+    // randomly turn on 1 seconds 'soft' commit
+    randomlyEnableAutoSoftCommit();
+
+    tryDelete();
+    
+    List<StoppableThread> threads = new ArrayList<>();
+    int threadCount = 2;
+    int batchSize = 1;
+    if (random().nextBoolean()) {
+      batchSize = random().nextInt(98) + 2;
+    }
+    
+    boolean pauseBetweenUpdates = TEST_NIGHTLY ? random().nextBoolean() : true;
+    int maxUpdates = -1;
+    if (!pauseBetweenUpdates) {
+      maxUpdates = 1000 + random().nextInt(1000);
+    } else {
+      maxUpdates = 15000;
+    }
+    
+    for (int i = 0; i < threadCount; i++) {
+      StoppableIndexingThread indexThread = new StoppableIndexingThread(controlClient, cloudClient, Integer.toString(i), true, maxUpdates, batchSize, pauseBetweenUpdates); // random().nextInt(999) + 1
+      threads.add(indexThread);
+      indexThread.start();
+    }
+    
+    StoppableCommitThread commitThread = new StoppableCommitThread(cloudClient, 1000, false);
+    threads.add(commitThread);
+    commitThread.start();
+    
+    chaosMonkey.startTheMonkey(false, 500);
+    try {
+      long runLength;
+      if (RUN_LENGTH != -1) {
+        runLength = RUN_LENGTH;
+      } else {
+        int[] runTimes;
+        if (TEST_NIGHTLY) {
+          runTimes = new int[] {5000, 6000, 10000, 15000, 25000, 30000,
+              30000, 45000, 90000, 120000};
+        } else {
+          runTimes = new int[] {5000, 7000, 15000};
+        }
+        runLength = runTimes[random().nextInt(runTimes.length)];
+      }
+      
+      ChaosMonkey.wait(runLength, DEFAULT_COLLECTION, cloudClient.getZkStateReader());
+    } finally {
+      chaosMonkey.stopTheMonkey();
+    }
+    
+    for (StoppableThread thread : threads) {
+      thread.safeStop();
+    }
+    
+    // wait for stop...
+    for (StoppableThread thread : threads) {
+      thread.join();
+    }
+    
+    for (StoppableThread thread : threads) {
+      if (thread instanceof StoppableIndexingThread) {
+        assertEquals(0, ((StoppableIndexingThread)thread).getFailCount());
+      }
+    }
+    
+    // try and wait for any replications and what not to finish...
+
+    Thread.sleep(2000);
+
+    waitForThingsToLevelOut(180000);
+    
+    // even if things were leveled out, a jetty may have just been stopped or something
+    // we wait again and wait to level out again to make sure the system is not still in flux
+    
+    Thread.sleep(3000);
+
+    waitForThingsToLevelOut(180000);
+    
+    log.info("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n");
+    
+    waitForReplicationFromReplicas(DEFAULT_COLLECTION, cloudClient.getZkStateReader(), new TimeOut(30, TimeUnit.SECONDS));
+//    waitForAllWarmingSearchers();
+
+    checkShardConsistency(batchSize == 1, true);
+    
+    // try and make a collection to make sure the overseer has survived the expiration and session loss
+
+    // sometimes we restart zookeeper as well
+    if (random().nextBoolean()) {
+      zkServer.shutdown();
+      zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort());
+      zkServer.run();
+    }
+
+    try (CloudSolrClient client = createCloudClient("collection1")) {
+        createCollection(null, "testcollection", 1, 1, 100, client, null, "conf1");
+
+    }
+    List<Integer> numShardsNumReplicas = new ArrayList<>(2);
+    numShardsNumReplicas.add(1);
+    numShardsNumReplicas.add(1 + getPullReplicaCount());
+    checkForCollection("testcollection",numShardsNumReplicas, null);
+  }
+
+  private void tryDelete() throws Exception {
+    long start = System.nanoTime();
+    long timeout = start + TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS);
+    while (System.nanoTime() < timeout) {
+      try {
+        del("*:*");
+        break;
+      } catch (SolrServerException e) {
+        // cluster may not be up yet
+        e.printStackTrace();
+      }
+      Thread.sleep(100);
+    }
+  }
+  
+  // skip the randoms - they can deadlock...
+  @Override
+  protected void indexr(Object... fields) throws Exception {
+    SolrInputDocument doc = new SolrInputDocument();
+    addFields(doc, fields);
+    addFields(doc, "rnd_b", true);
+    indexDoc(doc);
+  }
+
+}
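
tryDelete() above spins on System.nanoTime() until the cluster accepts a
delete-by-query. The same wait could be phrased with the TimeOut helper the
file already imports; this is a sketch, not a drop-in replacement:

    // Retry del("*:*") for up to 10 seconds while the cluster comes up.
    TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS);
    while (!timeout.hasTimedOut()) {
      try {
        del("*:*");
        break;
      } catch (SolrServerException e) {
        // cluster may not be up yet; sleep briefly and retry
        Thread.sleep(100);
      }
    }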

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
index ed9ed41..ea8598b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPIDistributedZkTest.java
@@ -286,7 +286,7 @@ public class CollectionsAPIDistributedZkTest extends SolrCloudTestCase {
     // first we make a core with the core name the collections api
     // will try and use - this will cause our mock fail
     Create createCmd = new Create();
-    createCmd.setCoreName("halfcollection_shard1_replica1");
+    createCmd.setCoreName(Assign.buildCoreName("halfcollection", "shard1", Replica.Type.NRT, 1));
     createCmd.setCollection("halfcollectionblocker");
     String dataDir = createTempDir().toFile().getAbsolutePath();
     createCmd.setDataDir(dataDir);
@@ -298,7 +298,7 @@ public class CollectionsAPIDistributedZkTest extends SolrCloudTestCase {
     }
 
     createCmd = new Create();
-    createCmd.setCoreName("halfcollection_shard1_replica1");
+    createCmd.setCoreName(Assign.buildCoreName("halfcollection", "shard1", Replica.Type.NRT, 1));
     createCmd.setCollection("halfcollectionblocker2");
     dataDir = createTempDir().toFile().getAbsolutePath();
     createCmd.setDataDir(dataDir);
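
These hunks swap hard-coded core names like "halfcollection_shard1_replica1"
for Assign.buildCoreName(collection, shard, type, n), so the tests track
whatever type-aware naming scheme Assign produces. Purely as an illustration
of the idea (the exact format is owned by Assign and may differ), the
generated names encode the replica type roughly like this:

    // Hypothetical mirror of the convention, for illustration only;
    // real code should always call Assign.buildCoreName(...).
    static String exampleCoreName(String collection, String shard,
                                  Replica.Type type, int n) {
      // e.g. NRT replica 1 of halfcollection/shard1 ->
      // "halfcollection_shard1_replica_n1" (assumed shape)
      return collection + "_" + shard + "_replica_"
          + Character.toLowerCase(type.name().charAt(0)) + n;
    }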

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index 0142c7a..643660b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -67,7 +67,7 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
     Map<String, NamedList<Integer>> coresStatus = response.getCollectionCoresStatus();
     assertEquals(4, coresStatus.size());
     for (int i=0; i<4; i++) {
-      NamedList<Integer> status = coresStatus.get(collectionName + "_shard" + (i/2+1) + "_replica" + (i%2+1));
+      NamedList<Integer> status = coresStatus.get(Assign.buildCoreName(collectionName, "shard" + (i/2+1), Replica.Type.NRT, (i%2+1)));
       assertEquals(0, (int)status.get("status"));
       assertTrue(status.get("QTime") > 0);
     }
@@ -117,17 +117,17 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
 
   @Test
   public void testCreateAndDeleteShard() throws IOException, SolrServerException {
-
     // Create an implicit collection
     String collectionName = "solrj_implicit";
     CollectionAdminResponse response
-        = CollectionAdminRequest.createCollectionWithImplicitRouter(collectionName, "conf", "shardA,shardB", 1)
+        = CollectionAdminRequest.createCollectionWithImplicitRouter(collectionName, "conf", "shardA,shardB", 1, 1, 1)
+        .setMaxShardsPerNode(3)
         .process(cluster.getSolrClient());
 
     assertEquals(0, response.getStatus());
     assertTrue(response.isSuccess());
     Map<String, NamedList<Integer>> coresStatus = response.getCollectionCoresStatus();
-    assertEquals(2, coresStatus.size());
+    assertEquals(6, coresStatus.size());
 
     // Add a shard to the implicit collection
     response = CollectionAdminRequest.createShard(collectionName, "shardC").process(cluster.getSolrClient());
@@ -135,8 +135,10 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
     assertEquals(0, response.getStatus());
     assertTrue(response.isSuccess());
     coresStatus = response.getCollectionCoresStatus();
-    assertEquals(1, coresStatus.size());
-    assertEquals(0, (int) coresStatus.get(collectionName + "_shardC_replica1").get("status"));
+    assertEquals(3, coresStatus.size());
+    assertEquals(0, (int) coresStatus.get(Assign.buildCoreName(collectionName, "shardC", Replica.Type.NRT, 1)).get("status"));
+    assertEquals(0, (int) coresStatus.get(Assign.buildCoreName(collectionName, "shardC", Replica.Type.TLOG, 1)).get("status"));
+    assertEquals(0, (int) coresStatus.get(Assign.buildCoreName(collectionName, "shardC", Replica.Type.PULL, 1)).get("status"));
 
     response = CollectionAdminRequest.deleteShard(collectionName, "shardC").process(cluster.getSolrClient());
 
@@ -174,8 +176,8 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
     assertEquals(0, response.getStatus());
     assertTrue(response.isSuccess());
     Map<String, NamedList<Integer>> coresStatus = response.getCollectionCoresStatus();
-    assertEquals(0, (int) coresStatus.get(collectionName + "_shard1_0_replica1").get("status"));
-    assertEquals(0, (int) coresStatus.get(collectionName + "_shard1_1_replica1").get("status"));
+    assertEquals(0, (int) coresStatus.get(Assign.buildCoreName(collectionName, "shard1_0" , Replica.Type.NRT, 1)).get("status"));
+    assertEquals(0, (int) coresStatus.get(Assign.buildCoreName(collectionName, "shard1_1" , Replica.Type.NRT, 1)).get("status"));
 
     waitForState("Expected all shards to be active and parent shard to be removed", collectionName, (n, c) -> {
       if (c.getSlice("shard1").getState() == Slice.State.ACTIVE)