You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by cp...@apache.org on 2017/05/25 17:39:41 UTC

[5/6] lucene-solr:jira/solr-8668: SOLR-10233: Add support for replica types

SOLR-10233: Add support for replica types


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/2fc41d56
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/2fc41d56
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/2fc41d56

Branch: refs/heads/jira/solr-8668
Commit: 2fc41d565a4a0408a09856a37d3be7d87414ba3f
Parents: 1802d24
Author: Tomas Fernandez Lobbe <tf...@apache.org>
Authored: Mon May 22 19:44:01 2017 -0700
Committer: Tomas Fernandez Lobbe <tf...@apache.org>
Committed: Mon May 22 19:58:51 2017 -0700

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  14 +
 .../org/apache/solr/cloud/AddReplicaCmd.java    |   9 +-
 .../src/java/org/apache/solr/cloud/Assign.java  |  12 +-
 .../org/apache/solr/cloud/CloudDescriptor.java  |  25 +-
 .../apache/solr/cloud/CreateCollectionCmd.java  |  40 +-
 .../org/apache/solr/cloud/CreateShardCmd.java   |  48 +-
 .../org/apache/solr/cloud/ElectionContext.java  |  13 +-
 .../java/org/apache/solr/cloud/MigrateCmd.java  |   8 +-
 .../org/apache/solr/cloud/MoveReplicaCmd.java   |   4 +-
 .../cloud/OverseerCollectionMessageHandler.java |  91 +-
 .../org/apache/solr/cloud/RecoveryStrategy.java | 166 +++-
 .../org/apache/solr/cloud/ReplaceNodeCmd.java   |   4 +-
 .../apache/solr/cloud/ReplicateFromLeader.java  |  42 +-
 .../java/org/apache/solr/cloud/RestoreCmd.java  |  71 +-
 .../org/apache/solr/cloud/SplitShardCmd.java    |   4 +-
 .../org/apache/solr/cloud/ZkController.java     |  67 +-
 .../solr/cloud/overseer/ReplicaMutator.java     |   7 +-
 .../solr/cloud/overseer/SliceMutator.java       |  24 +-
 .../solr/cloud/overseer/ZkStateWriter.java      |   1 +
 .../apache/solr/cloud/rule/ReplicaAssigner.java |   6 +-
 .../org/apache/solr/core/CoreContainer.java     |  38 +-
 .../org/apache/solr/handler/IndexFetcher.java   |  23 +-
 .../apache/solr/handler/RealTimeGetHandler.java |  20 +-
 .../apache/solr/handler/ReplicationHandler.java |   4 +
 .../solr/handler/admin/CollectionsHandler.java  |  17 +-
 .../solr/handler/admin/CoreAdminHandler.java    |   1 +
 .../solr/handler/admin/PrepRecoveryOp.java      |  30 +-
 .../handler/component/HttpShardHandler.java     |  37 +-
 .../handler/component/RealTimeGetComponent.java |  27 +-
 .../solr/update/DirectUpdateHandler2.java       |  20 +-
 .../org/apache/solr/update/UpdateHandler.java   |   6 +-
 .../java/org/apache/solr/update/UpdateLog.java  |   7 +-
 .../processor/DistributedUpdateProcessor.java   |  51 +-
 .../org/apache/solr/util/TestInjection.java     |   6 +-
 .../solr/collection1/conf/solrconfig.xml        |   8 +-
 .../cloud-minimal/conf/solrconfig.xml           |   2 +-
 .../AbstractCloudBackupRestoreTestCase.java     |  29 +-
 .../test/org/apache/solr/cloud/AssignTest.java  |   6 +
 .../solr/cloud/BasicDistributedZk2Test.java     |   4 +-
 .../solr/cloud/BasicDistributedZkTest.java      |  10 +-
 .../cloud/ChaosMonkeyNothingIsSafeTest.java     | 136 +--
 ...MonkeyNothingIsSafeWithPullReplicasTest.java | 327 +++++++
 ...aosMonkeySafeLeaderWithPullReplicasTest.java | 254 ++++++
 .../cloud/CollectionsAPIDistributedZkTest.java  |   4 +-
 .../solr/cloud/CollectionsAPISolrJTest.java     |  18 +-
 .../org/apache/solr/cloud/DeleteNodeTest.java   |  11 +-
 .../apache/solr/cloud/DeleteReplicaTest.java    |  22 +-
 .../org/apache/solr/cloud/ForceLeaderTest.java  |   4 +-
 .../FullThrottleStoppableIndexingThread.java    | 156 ++++
 .../apache/solr/cloud/HttpPartitionTest.java    |   8 +-
 .../LeaderInitiatedRecoveryOnCommitTest.java    |   8 +-
 .../solr/cloud/OnlyLeaderIndexesTest.java       | 435 ----------
 ...verseerCollectionConfigSetProcessorTest.java |   3 +-
 .../solr/cloud/RecoveryAfterSoftCommitTest.java |   8 +-
 .../org/apache/solr/cloud/ReplaceNodeTest.java  |  21 +-
 .../solr/cloud/ReplicationFactorTest.java       |   4 +-
 .../org/apache/solr/cloud/ShardSplitTest.java   |   6 -
 .../apache/solr/cloud/TestCloudRecovery.java    |   5 +-
 .../apache/solr/cloud/TestCollectionAPI.java    |  10 +-
 .../org/apache/solr/cloud/TestPullReplica.java  | 576 +++++++++++++
 .../cloud/TestPullReplicaErrorHandling.java     | 344 ++++++++
 .../org/apache/solr/cloud/TestTlogReplica.java  | 845 +++++++++++++++++++
 .../cloud/hdfs/HdfsBasicDistributedZkTest.java  |   4 +-
 .../reporters/solr/SolrCloudReportersTest.java  |   2 +-
 .../solr/update/TestInPlaceUpdatesDistrib.java  |  10 +-
 .../solrj/impl/ConcurrentUpdateSolrClient.java  |  23 +-
 .../solrj/request/CollectionAdminRequest.java   | 121 ++-
 .../apache/solr/common/cloud/DocCollection.java |  56 +-
 .../org/apache/solr/common/cloud/Replica.java   |  32 +
 .../org/apache/solr/common/cloud/Slice.java     |  21 +-
 .../apache/solr/common/cloud/ZkStateReader.java |  16 +-
 .../solr/common/params/CoreAdminParams.java     |   5 +
 ...ollectionAdminRequestRequiredParamsTest.java |   9 +
 .../solr/BaseDistributedSearchTestCase.java     |   3 +-
 .../java/org/apache/solr/SolrTestCaseJ4.java    |  31 +-
 .../cloud/AbstractFullDistribZkTestBase.java    | 264 +++++-
 .../java/org/apache/solr/cloud/ChaosMonkey.java | 159 +++-
 .../solr/cloud/StoppableCommitThread.java       |  69 ++
 .../solr/cloud/StoppableIndexingThread.java     |   2 +-
 .../solr/cloud/StoppableSearchThread.java       |   2 +-
 80 files changed, 4026 insertions(+), 1010 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7adac97..799df74 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -95,6 +95,20 @@ New Features
 
 * SOLR-10431: Make it possible to invoke v2 api calls using SolrJ (Cao Manh Dat, Noble Paul, shalin)
 
+* SOLR-10233: Add support for different replica types that handle updates differently:
+  - NRT: Writes updates to transaction log and indexes locally. Replicas of type “NRT” support NRT 
+         (soft commits) and RTG. Any NRT replica can become a leader. This was the only type supported 
+         in SolrCloud until now and it’s the default type.
+  - TLOG: Writes to the transaction log but not to the index; uses replication to copy segment files from the 
+          shard leader. Any TLOG replica can become leader (by first applying all local transaction log 
+          elements). If a replica is of type TLOG but is also the leader, it will behave as an NRT replica.
+          This is exactly what was added in SOLR-9835 (non-realtime replicas); only the API and naming changed.
+  - PULL: Doesn’t index or write to the transaction log, just replicates from the shard leader. PULL replicas 
+          can’t become shard leaders (i.e., if there are only PULL replicas in the collection at some point, 
+          updates will fail the same as if there were no leader, while queries continue to work), so they don’t even 
+          participate in elections.
+  (Tomás Fernández Löbbe)
+
 Bug Fixes
 ----------------------
 * SOLR-9262: Connection and read timeouts are being ignored by UpdateShardHandler after SOLR-4509.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
index 6bb3350..7338d9e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
@@ -72,6 +72,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     String node = message.getStr(CoreAdminParams.NODE);
     String shard = message.getStr(SHARD_ID_PROP);
     String coreName = message.getStr(CoreAdminParams.NAME);
+    Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()));
     boolean parallel = message.getBool("parallel", false);
     if (StringUtils.isBlank(coreName)) {
       coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
@@ -93,7 +94,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     // Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
     if (!skipCreateReplicaInClusterState) {
       node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
-          ocmh.overseer.getZkController().getCoreContainer()).get(0).nodeName;
+          ocmh.overseer.getZkController().getCoreContainer()).get(0).nodeName;// TODO: use replica type in this logic too
     }
     log.info("Node Identified {} for creating new replica", node);
 
@@ -101,7 +102,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
     }
     if (coreName == null) {
-      coreName = Assign.buildCoreName(coll, shard);
+      coreName = Assign.buildCoreName(coll, shard, replicaType);
     } else if (!skipCreateReplicaInClusterState) {
       //Validate that the core name is unique in that collection
       for (Slice slice : coll.getSlices()) {
@@ -126,7 +127,8 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
             ZkStateReader.CORE_NAME_PROP, coreName,
             ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
             ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node),
-            ZkStateReader.NODE_NAME_PROP, node);
+            ZkStateReader.NODE_NAME_PROP, node,
+            ZkStateReader.REPLICA_TYPE, replicaType.name());
         Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
       }
       params.set(CoreAdminParams.CORE_NODE_NAME,
@@ -142,6 +144,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     params.set(CoreAdminParams.NAME, coreName);
     params.set(COLL_CONF, configName);
     params.set(CoreAdminParams.COLLECTION, collection);
+    params.set(CoreAdminParams.REPLICA_TYPE, replicaType.name());
     if (shard != null) {
       params.set(CoreAdminParams.SHARD, shard);
     } else if (routeKey != null) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java
index ba03ccd..265e453 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java
@@ -107,12 +107,17 @@ public class Assign {
     returnShardId = shardIdNames.get(0);
     return returnShardId;
   }
+  
+  public static String buildCoreName(String collectionName, String shard, Replica.Type type, int replicaNum) {
+    // TODO: Adding the suffix is great for debugging, but may be an issue if at some point we want to support a way to change replica type
+    return String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, shard, type.name().substring(0,1).toLowerCase(Locale.ROOT), replicaNum);
+  }
 
-  static String buildCoreName(DocCollection collection, String shard) {
+  public static String buildCoreName(DocCollection collection, String shard, Replica.Type type) {
     Slice slice = collection.getSlice(shard);
     int replicaNum = slice.getReplicas().size();
     for (; ; ) {
-      String replicaName = collection.getName() + "_" + shard + "_replica" + replicaNum;
+      String replicaName = buildCoreName(collection.getName(), shard, type, replicaNum);
       boolean exists = false;
       for (Replica replica : slice.getReplicas()) {
         if (replicaName.equals(replica.getStr(CORE_NAME_PROP))) {
@@ -121,9 +126,8 @@ public class Assign {
         }
       }
       if (exists) replicaNum++;
-      else break;
+      else return replicaName;
     }
-    return collection.getName() + "_" + shard + "_replica" + replicaNum;
   }
 
   static class ReplicaCount {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
index 719b1d1..32cb65b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
@@ -20,12 +20,13 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
-import com.google.common.base.Strings;
 import org.apache.solr.common.StringUtils;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.util.PropertiesUtil;
 
+import com.google.common.base.Strings;
+
 public class CloudDescriptor {
 
   private final CoreDescriptor cd;
@@ -44,6 +45,13 @@ public class CloudDescriptor {
   volatile Replica.State lastPublished = Replica.State.ACTIVE;
 
   public static final String NUM_SHARDS = "numShards";
+  
+  public static final String REPLICA_TYPE = "replicaType";
+  
+  /**
+   * The type of replica this core hosts
+   */
+  private final Replica.Type replicaType;
 
   public CloudDescriptor(String coreName, Properties props, CoreDescriptor cd) {
     this.cd = cd;
@@ -57,7 +65,12 @@ public class CloudDescriptor {
     if (Strings.isNullOrEmpty(nodeName))
       this.nodeName = null;
     this.numShards = PropertiesUtil.toInteger(props.getProperty(CloudDescriptor.NUM_SHARDS), null);
-
+    String replicaTypeStr = props.getProperty(CloudDescriptor.REPLICA_TYPE);
+    if (Strings.isNullOrEmpty(replicaTypeStr)) {
+      this.replicaType = Replica.Type.NRT;
+    } else {
+      this.replicaType = Replica.Type.valueOf(replicaTypeStr);
+    }
     for (String propName : props.stringPropertyNames()) {
       if (propName.startsWith(ZkController.COLLECTION_PARAM_PREFIX)) {
         collectionParams.put(propName.substring(ZkController.COLLECTION_PARAM_PREFIX.length()), props.getProperty(propName));
@@ -65,6 +78,10 @@ public class CloudDescriptor {
     }
   }
   
+  public boolean requiresTransactionLog() {
+    return this.replicaType != Replica.Type.PULL;
+  }
+  
   public Replica.State getLastPublished() {
     return lastPublished;
   }
@@ -155,4 +172,8 @@ public class CloudDescriptor {
       collectionParams.put(ent.getKey(), ent.getValue());
     }
   }
+
+  public Replica.Type getReplicaType() {
+    return replicaType;
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
index a1bb70e..3d1a54e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -60,8 +60,7 @@ import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
 import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
 import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
 import static org.apache.solr.cloud.OverseerCollectionMessageHandler.RANDOM;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.*;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonParams.NAME;
@@ -96,7 +95,9 @@ public class CreateCollectionCmd implements Cmd {
       // look at the replication factor and see if it matches reality
       // if it does not, find best nodes to create more cores
 
-      int repFactor = message.getInt(REPLICATION_FACTOR, 1);
+      int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, 1));
+      int numPullReplicas = message.getInt(PULL_REPLICAS, 0);
+      int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0);
 
       ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
       final String async = message.getStr(ASYNC);
@@ -116,8 +117,8 @@ public class CreateCollectionCmd implements Cmd {
 
       int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
 
-      if (repFactor <= 0) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, REPLICATION_FACTOR + " must be greater than 0");
+      if (numNrtReplicas + numTlogReplicas <= 0) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
       }
 
       if (numSlices <= 0) {
@@ -135,32 +136,33 @@ public class CreateCollectionCmd implements Cmd {
 
         positionVsNodes = new HashMap<>();
       } else {
-        if (repFactor > nodeList.size()) {
-          log.warn("Specified "
-              + REPLICATION_FACTOR
-              + " of "
-              + repFactor
+        int totalNumReplicas = numNrtReplicas + numTlogReplicas + numPullReplicas;
+        if (totalNumReplicas > nodeList.size()) {
+          log.warn("Specified number of replicas of "
+              + totalNumReplicas
               + " on collection "
               + collectionName
-              + " is higher than or equal to the number of Solr instances currently live or live and part of your " + CREATE_NODE_SET + "("
+              + " is higher than the number of Solr instances currently live or live and part of your " + CREATE_NODE_SET + "("
               + nodeList.size()
               + "). It's unusual to run two replica of the same slice on the same Solr-instance.");
         }
 
         int maxShardsAllowedToCreate = maxShardsPerNode * nodeList.size();
-        int requestedShardsToCreate = numSlices * repFactor;
+        int requestedShardsToCreate = numSlices * totalNumReplicas;
         if (maxShardsAllowedToCreate < requestedShardsToCreate) {
           throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + ". Value of "
               + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
               + ", and the number of nodes currently live or live and part of your "+CREATE_NODE_SET+" is " + nodeList.size()
               + ". This allows a maximum of " + maxShardsAllowedToCreate
               + " to be created. Value of " + NUM_SLICES + " is " + numSlices
-              + " and value of " + REPLICATION_FACTOR + " is " + repFactor
+              + ", value of " + NRT_REPLICAS + " is " + numNrtReplicas
+              + ", value of " + TLOG_REPLICAS + " is " + numTlogReplicas
+              + " and value of " + PULL_REPLICAS + " is " + numPullReplicas
               + ". This requires " + requestedShardsToCreate
               + " shards to be created (higher than the allowed number)");
         }
 
-        positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, message, shardNames, repFactor);
+        positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
       }
 
       ZkStateReader zkStateReader = ocmh.zkStateReader;
@@ -200,13 +202,13 @@ public class CreateCollectionCmd implements Cmd {
       Map<String, String> requestMap = new HashMap<>();
 
 
-      log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , replicationFactor : {2}",
-          collectionName, shardNames, repFactor));
+      log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , nrtReplicas : {2}, tlogReplicas: {3}, pullReplicas: {4}",
+          collectionName, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas));
       Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
       for (Map.Entry<ReplicaAssigner.Position, String> e : positionVsNodes.entrySet()) {
         ReplicaAssigner.Position position = e.getKey();
         String nodeName = e.getValue();
-        String coreName = collectionName + "_" + position.shard + "_replica" + (position.index + 1);
+        String coreName = Assign.buildCoreName(collectionName, position.shard, position.type, position.index + 1);
         log.debug(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
             , coreName, position.shard, collectionName, nodeName));
 
@@ -221,7 +223,8 @@ public class CreateCollectionCmd implements Cmd {
               ZkStateReader.SHARD_ID_PROP, position.shard,
               ZkStateReader.CORE_NAME_PROP, coreName,
               ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
-              ZkStateReader.BASE_URL_PROP, baseUrl);
+              ZkStateReader.BASE_URL_PROP, baseUrl, 
+              ZkStateReader.REPLICA_TYPE, position.type.name());
           Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
         }
 
@@ -235,6 +238,7 @@ public class CreateCollectionCmd implements Cmd {
         params.set(CoreAdminParams.SHARD, position.shard);
         params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
         params.set(CoreAdminParams.NEW_COLLECTION, "true");
+        params.set(CoreAdminParams.REPLICA_TYPE, position.type.name());
 
         if (async != null) {
           String coreAdminAsyncId = async + Math.abs(System.nanoTime());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
index 52df32b..d3eb828 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
@@ -28,6 +28,7 @@ import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
@@ -41,7 +42,10 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.cloud.Assign.getNodesForNewReplicas;
 import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
@@ -67,9 +71,18 @@ public class CreateShardCmd implements Cmd {
 
     ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
     DocCollection collection = clusterState.getCollection(collectionName);
-    int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
+//    int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
+    int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1))));
+    int numPullReplicas = message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0));
+    int numTlogReplicas = message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0));
+    int totalReplicas = numNrtReplicas + numPullReplicas + numTlogReplicas;
+    
+    if (numNrtReplicas + numTlogReplicas <= 0) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
+    }
+    
     Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
-    List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, repFactor,
+    List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, totalReplicas,
         createNodeSetStr, ocmh.overseer.getZkController().getCoreContainer());
 
     ZkStateReader zkStateReader = ocmh.zkStateReader;
@@ -90,19 +103,38 @@ public class CreateShardCmd implements Cmd {
     String async = message.getStr(ASYNC);
     Map<String, String> requestMap = null;
     if (async != null) {
-      requestMap = new HashMap<>(repFactor, 1.0f);
+      requestMap = new HashMap<>(totalReplicas, 1.0f);
     }
-
-    for (int j = 1; j <= repFactor; j++) {
+    
+    int createdNrtReplicas = 0, createdTlogReplicas = 0, createdPullReplicas = 0;
+
+    for (int j = 1; j <= totalReplicas; j++) {
+      int coreNameNumber;
+      Replica.Type typeToCreate;
+      if (createdNrtReplicas < numNrtReplicas) {
+        createdNrtReplicas++;
+        coreNameNumber = createdNrtReplicas;
+        typeToCreate = Replica.Type.NRT;
+      } else if (createdTlogReplicas < numTlogReplicas) {
+        createdTlogReplicas++;
+        coreNameNumber = createdTlogReplicas;
+        typeToCreate = Replica.Type.TLOG;
+      } else {
+        createdPullReplicas++;
+        coreNameNumber = createdPullReplicas;
+        typeToCreate = Replica.Type.PULL;
+      }
       String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName;
-      String shardName = collectionName + "_" + sliceName + "_replica" + j;
-      log.info("Creating shard " + shardName + " as part of slice " + sliceName + " of collection " + collectionName
+      String coreName = Assign.buildCoreName(collectionName, sliceName, typeToCreate, coreNameNumber);
+//      String coreName = collectionName + "_" + sliceName + "_replica" + j;
+      log.info("Creating replica " + coreName + " as part of slice " + sliceName + " of collection " + collectionName
           + " on " + nodeName);
 
       // Need to create new params for each request
       ModifiableSolrParams params = new ModifiableSolrParams();
       params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
-      params.set(CoreAdminParams.NAME, shardName);
+      params.set(CoreAdminParams.NAME, coreName);
+      params.set(CoreAdminParams.REPLICA_TYPE, typeToCreate.name());
       params.set(COLL_CONF, configName);
       params.set(CoreAdminParams.COLLECTION, collectionName);
       params.set(CoreAdminParams.SHARD, sliceName);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index bdbeca9..588262d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -326,6 +327,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         return;
       }
       
+      Replica.Type replicaType;
+      
       try (SolrCore core = cc.getCore(coreName)) {
         
         if (core == null) {
@@ -338,6 +341,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
           }
         }
         
+        replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
+        
         // should I be leader?
         if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) {
           rejoinLeaderElection(core);
@@ -423,9 +428,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         try {
           // we must check LIR before registering as leader
           checkLIR(coreName, allReplicasInLine);
-
-          boolean onlyLeaderIndexes = zkController.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
-          if (onlyLeaderIndexes) {
+          if (replicaType == Replica.Type.TLOG) {
             // stop replicate from old leader
             zkController.stopReplicationFromLeader(coreName);
             if (weAreReplacement) {
@@ -621,7 +624,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         }
         
         // on startup and after connection timeout, wait for all known shards
-        if (found >= slices.getReplicasMap().size()) {
+        if (found >= slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size()) {
           log.info("Enough replicas found to continue.");
           return true;
         } else {
@@ -629,7 +632,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
             log.info("Waiting until we see more replicas up for shard {}: total={}"
               + " found={}"
               + " timeoutin={}ms",
-                shardId, slices.getReplicasMap().size(), found,
+                shardId, slices.getReplicas(EnumSet.of(Replica.Type.TLOG, Replica.Type.NRT)).size(), found,
                 TimeUnit.MILLISECONDS.convert(timeoutAt - System.nanoTime(), TimeUnit.NANOSECONDS));
           }
         }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
index 7b1ad2c..0ea5d6e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
@@ -51,7 +51,7 @@ import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_P
 import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
 import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
@@ -208,7 +208,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
     Map<String, Object> props = makeMap(
         Overseer.QUEUE_OPERATION, CREATE.toLower(),
         NAME, tempSourceCollectionName,
-        REPLICATION_FACTOR, 1,
+        NRT_REPLICAS, 1,
         NUM_SLICES, 1,
         COLL_CONF, configName,
         CREATE_NODE_SET, sourceLeader.getNodeName());
@@ -224,7 +224,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
     Slice tempSourceSlice = clusterState.getCollection(tempSourceCollectionName).getSlices().iterator().next();
     Replica tempSourceLeader = zkStateReader.getLeaderRetry(tempSourceCollectionName, tempSourceSlice.getName(), 120000);
 
-    String tempCollectionReplica1 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica1";
+    String tempCollectionReplica1 = Assign.buildCoreName(tempSourceCollectionName, tempSourceSlice.getName(), Replica.Type.NRT, 1);
     String coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
         sourceLeader.getNodeName(), tempCollectionReplica1);
     // wait for the replicas to be seen as active on temp source leader
@@ -257,7 +257,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
 
     log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
         tempSourceCollectionName, targetLeader.getNodeName());
-    String tempCollectionReplica2 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica2";
+    String tempCollectionReplica2 = Assign.buildCoreName(tempSourceCollectionName, tempSourceSlice.getName(), Replica.Type.NRT, 2); 
     props = new HashMap<>();
     props.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
     props.put(COLLECTION_PROP, tempSourceCollectionName);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
index 545989e..fed1398 100644
--- a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
@@ -111,7 +111,7 @@ public class MoveReplicaCmd implements Cmd{
 
   private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async,
                                  DocCollection coll, Replica replica, Slice slice) throws Exception {
-    String newCoreName = Assign.buildCoreName(coll, slice.getName());
+    String newCoreName = Assign.buildCoreName(coll, slice.getName(), replica.getType());
 
     ZkNodeProps removeReplicasProps = new ZkNodeProps(
         COLLECTION_PROP, coll.getName(),
@@ -155,7 +155,7 @@ public class MoveReplicaCmd implements Cmd{
 
   private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async,
                                  DocCollection coll, Replica replica, Slice slice) throws Exception {
-    String newCoreName = Assign.buildCoreName(coll, slice.getName());
+    String newCoreName = Assign.buildCoreName(coll, slice.getName(), replica.getType());
     ZkNodeProps addReplicasProps = new ZkNodeProps(
         COLLECTION_PROP, coll.getName(),
         SHARD_ID_PROP, slice.getName(),

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index 9b83140..1d51df7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -16,6 +16,51 @@
  */
 package org.apache.solr.cloud;
 
+import static org.apache.solr.common.cloud.DocCollection.SNITCH;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.BACKUP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESNAPSHOT;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEALIAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETENODE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESNAPSHOT;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_REPLICA_TASK;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_SHARD_TASK;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REPLACENODE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.RESTORE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.Utils.makeMap;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
@@ -33,7 +78,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang.StringUtils;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.SolrServerException;
@@ -80,21 +124,7 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.cloud.DocCollection.SNITCH;
-import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-import static org.apache.solr.common.util.Utils.makeMap;
+import com.google.common.collect.ImmutableMap;
 
 /**
  * A {@link OverseerMessageHandler} that handles Collections API related
@@ -130,9 +160,11 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
   public static final Map<String, Object> COLL_PROPS = Collections.unmodifiableMap(makeMap(
       ROUTER, DocRouter.DEFAULT_NAME,
       ZkStateReader.REPLICATION_FACTOR, "1",
+      ZkStateReader.NRT_REPLICAS, "1",
+      ZkStateReader.TLOG_REPLICAS, "0",
+      ZkStateReader.PULL_REPLICAS, "0",
       ZkStateReader.MAX_SHARDS_PER_NODE, "1",
       ZkStateReader.AUTO_ADD_REPLICAS, "false",
-      ZkStateReader.REALTIME_REPLICAS, "-1",
       DocCollection.RULE, null,
       SNITCH, null));
 
@@ -702,18 +734,33 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
                                       List<String> nodeList,
                                       ZkNodeProps message,
                                       List<String> shardNames,
-                                      int repFactor) throws IOException {
+                                      int numNrtReplicas, 
+                                      int numTlogReplicas,
+                                      int numPullReplicas) throws IOException {
     List<Map> rulesMap = (List) message.get("rule");
     if (rulesMap == null) {
       int i = 0;
       Map<Position, String> result = new HashMap<>();
       for (String aShard : shardNames) {
-        for (int j = 0; j < repFactor; j++){
-          result.put(new Position(aShard, j), nodeList.get(i % nodeList.size()));
+        for (int j = 0; j < numNrtReplicas; j++){
+          result.put(new Position(aShard, j, Replica.Type.NRT), nodeList.get(i % nodeList.size()));
+          i++;
+        }
+        for (int j = 0; j < numTlogReplicas; j++){
+          result.put(new Position(aShard, j, Replica.Type.TLOG), nodeList.get(i % nodeList.size()));
+          i++;
+        }
+        for (int j = 0; j < numPullReplicas; j++){
+          result.put(new Position(aShard, j, Replica.Type.PULL), nodeList.get(i % nodeList.size()));
           i++;
         }
       }
       return result;
+    } else {
+      if (numTlogReplicas + numPullReplicas != 0) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, 
+            Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules");
+      }
     }
 
     List<Rule> rules = new ArrayList<>();
@@ -721,7 +768,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
 
     Map<String, Integer> sharVsReplicaCount = new HashMap<>();
 
-    for (String shard : shardNames) sharVsReplicaCount.put(shard, repFactor);
+    for (String shard : shardNames) sharVsReplicaCount.put(shard, numNrtReplicas);
     ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
         sharVsReplicaCount,
         (List<Map>) message.get(SNITCH),
@@ -752,6 +799,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
       
       if (result.size() == coreNames.size()) {
         return result;
+      } else {
+        log.debug("Expecting {} cores but found {}", coreNames.size(), result.size());
       }
       if (timeout.hasTimedOut()) {
         throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state.");

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 7599b05..3449935 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -118,7 +118,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
   private boolean recoveringAfterStartup;
   private CoreContainer cc;
   private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
-  private boolean onlyLeaderIndexes;
+  private final Replica.Type replicaType;
 
   protected RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
     this.cc = cc;
@@ -128,8 +128,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
     zkStateReader = zkController.getZkStateReader();
     baseUrl = zkController.getBaseUrl();
     coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
-    String collection = cd.getCloudDescriptor().getCollectionName();
-    onlyLeaderIndexes = zkStateReader.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
+    replicaType = cd.getCloudDescriptor().getReplicaType();
   }
 
   final public int getWaitForUpdatesWithStaleStatePauseMilliSeconds() {
@@ -262,7 +261,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
       UpdateRequest ureq = new UpdateRequest();
       ureq.setParams(new ModifiableSolrParams());
       ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
-      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);
+//      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);// Why do we need to open searcher if "onlyLeaderIndexes"?
+      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
       ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
           client);
     }
@@ -296,9 +296,151 @@ public class RecoveryStrategy implements Runnable, Closeable {
       MDCLoggingContext.clear();
     }
   }
+  
+  final public void doRecovery(SolrCore core) throws KeeperException, InterruptedException {
+    if (core.getCoreDescriptor().getCloudDescriptor().requiresTransactionLog()) {
+      doSyncOrReplicateRecovery(core);
+    } else {
+      doReplicateOnlyRecovery(core);
+    }
+  }
+
+  final private void doReplicateOnlyRecovery(SolrCore core) throws InterruptedException {
+    boolean successfulRecovery = false;
+
+//  if (core.getUpdateHandler().getUpdateLog() != null) {
+//    SolrException.log(LOG, "'replicate-only' recovery strategy should only be used if no update logs are present, but this core has one: "
+//        + core.getUpdateHandler().getUpdateLog());
+//    return;
+//  }
+  while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
+    try {
+      CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
+      ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
+          cloudDesc.getCollectionName(), cloudDesc.getShardId());
+      final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
+      final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
+
+      String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
+
+      String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+
+      boolean isLeader = leaderUrl.equals(ourUrl); //TODO: We can probably delete most of this code if we say this strategy can only be used for pull replicas
+      if (isLeader && !cloudDesc.isLeader()) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
+      }
+      if (cloudDesc.isLeader()) {
+        assert cloudDesc.getReplicaType() != Replica.Type.PULL;
+        // we are now the leader - no one else must have been suitable
+        LOG.warn("We have not yet recovered - but we are now the leader!");
+        LOG.info("Finished recovery process.");
+        zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+        return;
+      }
+      
+      
+      LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
+          ourUrl);
+      zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
+      
+      if (isClosed()) {
+        LOG.info("Recovery for core {} has been closed", core.getName());
+        break;
+      }
+      LOG.info("Starting Replication Recovery.");
+
+      try {
+        LOG.info("Stopping background replicate from leader process");
+        zkController.stopReplicationFromLeader(coreName);
+        replicate(zkController.getNodeName(), core, leaderprops);
+
+        if (isClosed()) {
+          LOG.info("Recovery for core {} has been closed", core.getName());
+          break;
+        }
+
+        LOG.info("Replication Recovery was successful.");
+        successfulRecovery = true;
+      } catch (Exception e) {
+        SolrException.log(LOG, "Error while trying to recover", e);
+      }
+
+    } catch (Exception e) {
+      SolrException.log(LOG, "Error while trying to recover. core=" + coreName, e);
+    } finally {
+      if (successfulRecovery) {
+        LOG.info("Restarting background replicate from leader process");
+        zkController.startReplicationFromLeader(coreName, false);
+        LOG.info("Registering as Active after recovery.");
+        try {
+          zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+        } catch (Exception e) {
+          LOG.error("Could not publish as ACTIVE after successful recovery", e);
+          successfulRecovery = false;
+        }
+        
+        if (successfulRecovery) {
+          close = true;
+          recoveryListener.recovered();
+        }
+      }
+    }
+
+    if (!successfulRecovery) {
+      // lets pause for a moment and we need to try again...
+      // TODO: we don't want to retry for some problems?
+      // Or do a fall off retry...
+      try {
+
+        if (isClosed()) {
+          LOG.info("Recovery for core {} has been closed", core.getName());
+          break;
+        }
+        
+        LOG.error("Recovery failed - trying again... (" + retries + ")");
+        
+        retries++;
+        if (retries >= maxRetries) {
+          SolrException.log(LOG, "Recovery failed - max retries exceeded (" + retries + ").");
+          try {
+            recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
+          } catch (Exception e) {
+            SolrException.log(LOG, "Could not publish that recovery failed", e);
+          }
+          break;
+        }
+      } catch (Exception e) {
+        SolrException.log(LOG, "An error has occurred during recovery", e);
+      }
+
+      try {
+        // Wait an exponential interval between retries, start at 5 seconds and work up to a minute.
+        // If we're at attempt >= 4, there's no point computing pow(2, retries) because the result 
+        // will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in
+        // order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m).
+        double loopCount = retries < 4 ? Math.min(Math.pow(2, retries), 12) : 12;
+        LOG.info("Wait [{}] seconds before trying to recover again (attempt={})", loopCount, retries);
+        for (int i = 0; i < loopCount; i++) {
+          if (isClosed()) {
+            LOG.info("Recovery for core {} has been closed", core.getName());
+            break; // check if someone closed us
+          }
+          Thread.sleep(startingRecoveryDelayMilliSeconds);
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.warn("Recovery was interrupted.", e);
+        close = true;
+      }
+    }
+
+  }
+  // We skip core.seedVersionBuckets(); We don't have a transaction log
+  LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
+}
 
   // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
-  final public void doRecovery(SolrCore core) throws KeeperException, InterruptedException {
+  final public void doSyncOrReplicateRecovery(SolrCore core) throws KeeperException, InterruptedException {
     boolean replayed = false;
     boolean successfulRecovery = false;
 
@@ -310,9 +452,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
           core.getCoreDescriptor());
       return;
     }
-
-    // we temporary ignore peersync for realtimeReplicas mode
-    boolean firstTime = !onlyLeaderIndexes;
+    
+    // we temporarily ignore peersync for tlog replicas
+    boolean firstTime = replicaType != Replica.Type.TLOG;
 
     List<Long> recentVersions;
     try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
@@ -364,7 +506,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
       }
     }
 
-    if (onlyLeaderIndexes) {
+    if (replicaType == Replica.Type.TLOG) {
       zkController.stopReplicationFromLeader(coreName);
     }
 
@@ -521,8 +663,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
         if (successfulRecovery) {
           LOG.info("Registering as Active after recovery.");
           try {
-            if (onlyLeaderIndexes) {
-              zkController.startReplicationFromLeader(coreName);
+            if (replicaType == Replica.Type.TLOG) {
+              zkController.startReplicationFromLeader(coreName, true);
             }
             zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
           } catch (Exception e) {
@@ -604,7 +746,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
     if (testing_beforeReplayBufferingUpdates != null) {
       testing_beforeReplayBufferingUpdates.run();
     }
-    if (onlyLeaderIndexes) {
+    if (replicaType == Replica.Type.TLOG) {
       // roll over all updates during buffering to new tlog, make RTG available
       SolrQueryRequest req = new LocalSolrQueryRequest(core,
           new ModifiableSolrParams());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
index 92c9afe..e4240be 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
@@ -154,9 +154,9 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
                 SHARD_ID_PROP, slice.getName(),
                 ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
                 ZkStateReader.REPLICA_PROP, replica.getName(),
+                ZkStateReader.REPLICA_TYPE, replica.getType().name(),
                 CoreAdminParams.NODE, source);
-            sourceReplicas.add(props
-            );
+            sourceReplicas.add(props);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
index 817b371..0800e0f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -49,7 +49,12 @@ public class ReplicateFromLeader {
     this.coreName = coreName;
   }
 
-  public void startReplication() throws InterruptedException {
+  /**
+   * Start a replication handler thread that will periodically pull indices from the shard leader
+   * @param switchTransactionLog if true, ReplicationHandler will rotate the transaction log once
+   * the replication is done
+   */
+  public void startReplication(boolean switchTransactionLog) throws InterruptedException {
     try (SolrCore core = cc.getCore(coreName)) {
       if (core == null) {
         if (cc.isShutDown()) {
@@ -65,6 +70,7 @@ public class ReplicateFromLeader {
       } else if (uinfo.autoSoftCommmitMaxTime != -1) {
         pollIntervalStr = toPollIntervalStr(uinfo.autoSoftCommmitMaxTime/2);
       }
+      LOG.info("Will start replication from leader with poll interval: {}", pollIntervalStr );
 
       NamedList slaveConfig = new NamedList();
       slaveConfig.add("fetchFromLeader", true);
@@ -78,20 +84,22 @@ public class ReplicateFromLeader {
       }
 
       replicationProcess = new ReplicationHandler();
-      replicationProcess.setPollListener((solrCore, pollSuccess) -> {
-        if (pollSuccess) {
-          String commitVersion = getCommitVersion(core);
-          if (commitVersion == null) return;
-          if (Long.parseLong(commitVersion) == lastVersion) return;
-          UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
-          SolrQueryRequest req = new LocalSolrQueryRequest(core,
-              new ModifiableSolrParams());
-          CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
-          cuc.setVersion(Long.parseLong(commitVersion));
-          updateLog.copyOverOldUpdates(cuc);
-          lastVersion = Long.parseLong(commitVersion);
-        }
-      });
+      if (switchTransactionLog) {
+        replicationProcess.setPollListener((solrCore, pollSuccess) -> {
+          if (pollSuccess) {
+            String commitVersion = getCommitVersion(core);
+            if (commitVersion == null) return;
+            if (Long.parseLong(commitVersion) == lastVersion) return;
+            UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
+            SolrQueryRequest req = new LocalSolrQueryRequest(core,
+                new ModifiableSolrParams());
+            CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
+            cuc.setVersion(Long.parseLong(commitVersion));
+            updateLog.copyOverOldUpdates(cuc);
+            lastVersion = Long.parseLong(commitVersion);
+          }
+        });
+      }
       replicationProcess.init(replicationConfig);
       replicationProcess.inform(core);
     }
@@ -119,6 +127,8 @@ public class ReplicateFromLeader {
   }
 
   public void stopReplication() {
-    replicationProcess.close();
+    if (replicationProcess != null) {
+      replicationProcess.close();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
index 23b2fb5..76c12b8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
@@ -39,6 +39,7 @@ import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -63,11 +64,15 @@ import static org.apache.solr.cloud.OverseerCollectionMessageHandler.RANDOM;
 import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_TYPE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonParams.NAME;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 
 public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
@@ -107,15 +112,23 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
         zkStateReader.getClusterState().getLiveNodes(), message, RANDOM);
 
     int numShards = backupCollectionState.getActiveSlices().size();
-    int repFactor = message.getInt(REPLICATION_FACTOR, backupCollectionState.getReplicationFactor());
+    
+    int numNrtReplicas = getInt(message, NRT_REPLICAS, backupCollectionState.getNumNrtReplicas(), 0);
+    if (numNrtReplicas == 0) {
+      numNrtReplicas = getInt(message, REPLICATION_FACTOR, backupCollectionState.getReplicationFactor(), 0);
+    }
+    int numTlogReplicas = getInt(message, TLOG_REPLICAS, backupCollectionState.getNumTlogReplicas(), 0);
+    int numPullReplicas = getInt(message, PULL_REPLICAS, backupCollectionState.getNumPullReplicas(), 0);
+    int totalReplicasPerShard = numNrtReplicas + numTlogReplicas + numPullReplicas;
+    
     int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, backupCollectionState.getMaxShardsPerNode());
     int availableNodeCount = nodeList.size();
-    if ((numShards * repFactor) > (availableNodeCount * maxShardsPerNode)) {
+    if ((numShards * totalReplicasPerShard) > (availableNodeCount * maxShardsPerNode)) {
       throw new SolrException(ErrorCode.BAD_REQUEST,
           String.format(Locale.ROOT, "Solr cloud with available number of nodes:%d is insufficient for"
-              + " restoring a collection with %d shards, replication factor %d and maxShardsPerNode %d."
+              + " restoring a collection with %d shards, total replicas per shard %d and maxShardsPerNode %d."
               + " Consider increasing maxShardsPerNode value OR number of available nodes.",
-              availableNodeCount, numShards, repFactor, maxShardsPerNode));
+              availableNodeCount, numShards, totalReplicasPerShard, maxShardsPerNode));
     }
 
     //Upload the configs
@@ -201,7 +214,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
     restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
 
     Map<ReplicaAssigner.Position, String> positionVsNodes = ocmh.identifyNodes(clusterState, nodeList,
-        message, sliceNames, repFactor);
+        message, sliceNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
 
     //Create one replica per shard and copy backed up data to it
     for (Slice slice : restoreCollection.getSlices()) {
@@ -210,6 +223,15 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
       propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD);
       propMap.put(COLLECTION_PROP, restoreCollectionName);
       propMap.put(SHARD_ID_PROP, slice.getName());
+      
+      if (numNrtReplicas >= 1) {
+        propMap.put(REPLICA_TYPE, Replica.Type.NRT.name());
+      } else if (numTlogReplicas >= 1) {
+        propMap.put(REPLICA_TYPE, Replica.Type.TLOG.name());
+      } else {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unexpected number of replicas, replicationFactor, " + 
+            Replica.Type.NRT + " or " + Replica.Type.TLOG + " must be greater than 0");
+      }
 
       // Get the first node matching the shard to restore in
       String node;
@@ -261,17 +283,39 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
     //refresh the location copy of collection state
     restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
 
-    //Add the remaining replicas for each shard
-    Integer numReplicas = restoreCollection.getReplicationFactor();
-    if (numReplicas != null && numReplicas > 1) {
+    if (totalReplicasPerShard > 1) {
       log.info("Adding replicas to restored collection={}", restoreCollection);
-
       for (Slice slice : restoreCollection.getSlices()) {
-        for (int i = 1; i < numReplicas; i++) {
-          log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
+
+        // Add the remaining replicas for each shard, honoring the requested count of each replica type
+        int createdNrtReplicas = 0, createdTlogReplicas = 0, createdPullReplicas = 0;
+
+        // The leader created above is NRT when numNrtReplicas > 0, otherwise TLOG; count it so it is not created twice
+        if (numNrtReplicas > 0) {
+          createdNrtReplicas++;
+        } else if (numTlogReplicas > 0) {
+          createdTlogReplicas++;
+        }
+        
+        for (int i = 1; i < totalReplicasPerShard; i++) {
+          Replica.Type typeToCreate;
+          if (createdNrtReplicas < numNrtReplicas) {
+            createdNrtReplicas++;
+            typeToCreate = Replica.Type.NRT;
+          } else if (createdTlogReplicas < numTlogReplicas) {
+            createdTlogReplicas++;
+            typeToCreate = Replica.Type.TLOG;
+          } else {
+            createdPullReplicas++;
+            typeToCreate = Replica.Type.PULL;
+            assert createdPullReplicas <= numPullReplicas: "Unexpected number of replicas";
+          }
+
+          log.debug("Adding replica for shard={} collection={} of type {} ", slice.getName(), restoreCollection, typeToCreate);
           HashMap<String, Object> propMap = new HashMap<>();
           propMap.put(COLLECTION_PROP, restoreCollectionName);
           propMap.put(SHARD_ID_PROP, slice.getName());
+          propMap.put(REPLICA_TYPE, typeToCreate.name());
 
           // Get the first node matching the shard to restore in
           String node;
@@ -298,4 +342,9 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
 
     log.info("Completed restoring collection={} backupName={}", restoreCollection, backupName);
   }
+
+  private int getInt(ZkNodeProps message, String propertyName, Integer default1, int default2) {
+    Integer value = message.getInt(propertyName, default1); // read the requested key, not always REPLICATION_FACTOR
+    return value!=null?value:default2; // default1 may itself be null, so default2 is the last resort
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
index 5a099e1..fe95458 100644
--- a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
@@ -205,7 +205,7 @@ public class SplitShardCmd implements Cmd {
       for (int i = 0; i < subRanges.size(); i++) {
         String subSlice = slice + "_" + i;
         subSlices.add(subSlice);
-        String subShardName = collectionName + "_" + subSlice + "_replica1";
+        String subShardName = Assign.buildCoreName(collectionName, subSlice, Replica.Type.NRT, 1);
         subShardNames.add(subShardName);
       }
 
@@ -385,7 +385,7 @@ public class SplitShardCmd implements Cmd {
       Map<ReplicaAssigner.Position, String> nodeMap = ocmh.identifyNodes(clusterState,
           new ArrayList<>(clusterState.getLiveNodes()),
           new ZkNodeProps(collection.getProperties()),
-          subSlices, repFactor - 1);
+          subSlices, repFactor - 1, 0, 0);
 
       List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 5e00c32..cb8175e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -62,6 +62,7 @@ import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.OnReconnect;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Replica.Type;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkACLProvider;
@@ -883,12 +884,20 @@ public class ZkController {
       try {
         // If we're a preferred leader, insert ourselves at the head of the queue
         boolean joinAtHead = false;
-        Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(),
-            coreZkNodeName);
+        Replica replica = zkStateReader.getClusterState().getReplica(collection, coreZkNodeName);
         if (replica != null) {
           joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
         }
-        joinElection(desc, afterExpiration, joinAtHead);
+        //TODO Why would replica be null?
+        if (replica == null || replica.getType() != Type.PULL) {
+          joinElection(desc, afterExpiration, joinAtHead);
+        } else if (replica.getType() == Type.PULL) {
+          if (joinAtHead) {
+            log.warn("Replica {} was designated as preferred leader but it's type is {}, It won't join election", coreZkNodeName, Type.PULL);
+          }
+          log.debug("Replica {} skipping election because it's type is {}", coreZkNodeName, Type.PULL);
+          startReplicationFromLeader(coreName, false);
+        }
       } catch (InterruptedException e) {
         // Restore the interrupted status
         Thread.currentThread().interrupt();
@@ -905,6 +914,8 @@ public class ZkController {
       String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
       log.debug("We are " + ourUrl + " and leader is " + leaderUrl);
       boolean isLeader = leaderUrl.equals(ourUrl);
+      Replica.Type replicaType =  zkStateReader.getClusterState().getCollection(collection).getReplica(coreZkNodeName).getType();
+      assert !(isLeader && replicaType == Type.PULL): "Pull replica became leader!";
       
       try (SolrCore core = cc.getCore(desc.getName())) {
         
@@ -915,16 +926,15 @@ public class ZkController {
         // leader election perhaps?
         
         UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-        boolean onlyLeaderIndexes = zkStateReader.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
-        boolean isReplicaInOnlyLeaderIndexes = onlyLeaderIndexes && !isLeader;
-        if (isReplicaInOnlyLeaderIndexes) {
+        boolean isTlogReplicaAndNotLeader = replicaType == Replica.Type.TLOG && !isLeader;
+        if (isTlogReplicaAndNotLeader) {
           String commitVersion = ReplicateFromLeader.getCommitVersion(core);
           if (commitVersion != null) {
             ulog.copyOverOldUpdates(Long.parseLong(commitVersion));
           }
         }
         // we will call register again after zk expiration and on reload
-        if (!afterExpiration && !core.isReloaded() && ulog != null && !isReplicaInOnlyLeaderIndexes) {
+        if (!afterExpiration && !core.isReloaded() && ulog != null && !isTlogReplicaAndNotLeader) {
           // disable recovery in case shard is in construction state (for shard splits)
           Slice slice = getClusterState().getSlice(collection, shardId);
           if (slice.getState() != Slice.State.CONSTRUCTION || !isLeader) {
@@ -943,12 +953,13 @@ public class ZkController {
         boolean didRecovery
             = checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, core, cc, afterExpiration);
         if (!didRecovery) {
-          if (isReplicaInOnlyLeaderIndexes) {
-            startReplicationFromLeader(coreName);
+          if (isTlogReplicaAndNotLeader) {
+            startReplicationFromLeader(coreName, true);
           }
           publish(desc, Replica.State.ACTIVE);
         }
         
+        
         core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
       }
       
@@ -960,17 +971,25 @@ public class ZkController {
     }
   }
 
-  public void startReplicationFromLeader(String coreName) throws InterruptedException {
+  public void startReplicationFromLeader(String coreName, boolean switchTransactionLog) throws InterruptedException {
+    log.info("{} starting background replication from leader", coreName);
     ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName);
-    if (replicateFromLeaders.putIfAbsent(coreName, replicateFromLeader) == null) {
-      replicateFromLeader.startReplication();
+    synchronized (replicateFromLeader) { // synchronize to prevent any stop before we finish the start
+      if (replicateFromLeaders.putIfAbsent(coreName, replicateFromLeader) == null) { // register first; only the instance that wins the slot is started
+        replicateFromLeader.startReplication(switchTransactionLog); // flag is passed through unchanged
+      } else { // an instance is already registered for this core; the new one is dropped without being started
+        log.warn("A replicate from leader instance already exists for core {}", coreName);
+      }
     }
   }
 
   public void stopReplicationFromLeader(String coreName) {
+    log.info("{} stopping background replication from leader", coreName);
     ReplicateFromLeader replicateFromLeader = replicateFromLeaders.remove(coreName);
     if (replicateFromLeader != null) {
-      replicateFromLeader.stopReplication();
+      synchronized (replicateFromLeader) { // same monitor startReplicationFromLeader holds while it registers and starts
+        replicateFromLeader.stopReplication();
+      }
     }
   }
 
@@ -1191,6 +1210,7 @@ public class ZkController {
       if (state != Replica.State.DOWN) {
         final Replica.State lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName);
         if (lirState != null) {
+          assert cd.getCloudDescriptor().getReplicaType() != Replica.Type.PULL: "LIR should not happen for pull replicas!";
           if (state == Replica.State.ACTIVE) {
             // trying to become active, so leader-initiated state must be recovering
             if (lirState == Replica.State.RECOVERING) {
@@ -1217,6 +1237,7 @@ public class ZkController {
       props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
       props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
       props.put(ZkStateReader.COLLECTION_PROP, collection);
+      props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString());
       if (numShards != null) {
         props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString());
       }
@@ -1272,13 +1293,15 @@ public class ZkController {
       assert false : "No collection was specified [" + collection + "]";
       return;
     }
+    Replica replica = zkStateReader.getClusterState().getReplica(collection, coreNodeName);
+    
+    if (replica == null || replica.getType() != Type.PULL) {
+      ElectionContext context = electionContexts.remove(new ContextKey(collection, coreNodeName));
 
-    ElectionContext context = electionContexts.remove(new ContextKey(collection, coreNodeName));
-
-    if (context != null) {
-      context.cancelElection();
+      if (context != null) {
+        context.cancelElection();
+      }
     }
-
     CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
     zkStateReader.unregisterCore(cloudDescriptor.getCollectionName());
 
@@ -2408,11 +2431,9 @@ public class ZkController {
 
       for (Slice slice : slices) {
         Collection<Replica> replicas = slice.getReplicas();
-        for (Replica replica : replicas) {
-          if (replica.getName().equals(
-              dcore.getCloudDescriptor().getCoreNodeName())) {
-            return true;
-          }
+        Replica r = slice.getReplica(dcore.getCloudDescriptor().getCoreNodeName());
+        if (r != null) {
+          return true;
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index f03eeeb..9758c8f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -271,11 +271,12 @@ public class ReplicaMutator {
 
     replicaProps.putAll(message.getProperties());
     if (slice != null) {
-      Replica oldReplica = slice.getReplicasMap().get(coreNodeName);
+      Replica oldReplica = slice.getReplica(coreNodeName);
       if (oldReplica != null) {
         if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
           replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
         }
+        replicaProps.put(ZkStateReader.REPLICA_TYPE, oldReplica.getType().toString());
         // Move custom props over.
         for (Map.Entry<String, Object> ent : oldReplica.getProperties().entrySet()) {
           if (ent.getKey().startsWith(COLL_PROP_PREFIX)) {
@@ -311,6 +312,8 @@ public class ReplicaMutator {
 
 
     Replica replica = new Replica(coreNodeName, replicaProps);
+    
+    log.debug("Will update state for replica: {}", replica);
 
     Map<String, Object> sliceProps = null;
     Map<String, Replica> replicas;
@@ -328,11 +331,11 @@ public class ReplicaMutator {
       sliceProps.put(ZkStateReader.STATE_PROP, shardState);
       sliceProps.put(Slice.PARENT, shardParent);
     }
-
     replicas.put(replica.getName(), replica);
     slice = new Slice(sliceName, replicas, sliceProps);
 
     DocCollection newCollection = CollectionMutator.updateSlice(collectionName, collection, slice);
+    log.debug("Collection is now: {}", newCollection);
     return new ZkWriteCommand(collectionName, newCollection);
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 4d767ed..5724f17 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -16,14 +16,16 @@
  */
 package org.apache.solr.cloud.overseer;
 
-import java.lang.invoke.MethodHandles;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
+import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
+import static org.apache.solr.common.util.Utils.makeMap;
 
+import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.collect.ImmutableSet;
 import org.apache.solr.cloud.Assign;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.common.cloud.ClusterState;
@@ -37,9 +39,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
-import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
-import static org.apache.solr.common.util.Utils.makeMap;
+import com.google.common.collect.ImmutableSet;
 
 public class SliceMutator {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -66,14 +66,14 @@ public class SliceMutator {
       log.error("Invalid Collection/Slice {}/{} ", coll, slice);
       return ZkStateWriter.NO_OP;
     }
-
     String coreNodeName = Assign.assignNode(collection);
     Replica replica = new Replica(coreNodeName,
         makeMap(
             ZkStateReader.CORE_NAME_PROP, message.getStr(ZkStateReader.CORE_NAME_PROP),
             ZkStateReader.BASE_URL_PROP, message.getStr(ZkStateReader.BASE_URL_PROP),
             ZkStateReader.STATE_PROP, message.getStr(ZkStateReader.STATE_PROP),
-            ZkStateReader.NODE_NAME_PROP, message.getStr(ZkStateReader.NODE_NAME_PROP)));
+            ZkStateReader.NODE_NAME_PROP, message.getStr(ZkStateReader.NODE_NAME_PROP), 
+            ZkStateReader.REPLICA_TYPE, message.get(ZkStateReader.REPLICA_TYPE)));
     return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
   }
 
@@ -249,13 +249,15 @@ public class SliceMutator {
   }
 
   public static DocCollection updateReplica(DocCollection collection, final Slice slice, String coreNodeName, final Replica replica) {
-    Map<String, Replica> copy = slice.getReplicasCopy();
+    Map<String, Replica> replicasCopy = slice.getReplicasCopy(); // presumably a mutable copy, so the input slice is left untouched - TODO confirm
     if (replica == null) {
-      copy.remove(coreNodeName);
+      replicasCopy.remove(coreNodeName); // a null replica means: drop coreNodeName from the slice
     } else {
-      copy.put(replica.getName(), replica);
+      replicasCopy.put(replica.getName(), replica); // add or replace, keyed by the replica's own name rather than coreNodeName
     }
-    Slice newSlice = new Slice(slice.getName(), copy, slice.getProperties());
+    Slice newSlice = new Slice(slice.getName(), replicasCopy, slice.getProperties()); // same name and properties, updated replica map
+    log.debug("Old Slice: {}", slice);
+    log.debug("New Slice: {}", newSlice);
     return CollectionMutator.updateSlice(collection.getName(), collection, newSlice);
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index da0f57c..911a9e3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -261,6 +261,7 @@ public class ZkStateWriter {
       }
     }
 
+    log.trace("New Cluster State is: {}", clusterState);
     return clusterState;
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
index 3eab8b4..506e158 100644
--- a/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
+++ b/solr/core/src/java/org/apache/solr/cloud/rule/ReplicaAssigner.java
@@ -62,10 +62,12 @@ public class ReplicaAssigner {
   public static class Position implements Comparable<Position> {
     public final String shard;
     public final int index;
+    public final Replica.Type type;
 
-    public Position(String shard, int replicaIdx) {
+    public Position(String shard, int replicaIdx, Replica.Type type) {
       this.shard = shard;
       this.index = replicaIdx;
+      this.type = type; // replica type for this position; the only call visible in this diff passes Replica.Type.NRT
     }
 
     @Override
@@ -188,7 +190,7 @@ public class ReplicaAssigner {
       List<Position> positions = new ArrayList<>();
       for (int pos : p) {
         for (int j = 0; j < shardVsReplicaCount.get(shardNames.get(pos)); j++) {
-          positions.add(new Position(shardNames.get(pos), j));
+          positions.add(new Position(shardNames.get(pos), j, Replica.Type.NRT));
         }
       }
       Collections.sort(positions);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/2fc41d56/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 3ff5135..7471c08 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -16,6 +16,17 @@
  */
 package org.apache.solr.core;
 
+import static java.util.Objects.requireNonNull;
+import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
+import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
+import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
+import static org.apache.solr.common.params.CommonParams.METRICS_PATH;
+import static org.apache.solr.common.params.CommonParams.ZK_PATH;
+import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
+
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
@@ -56,6 +67,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.Utils;
@@ -95,17 +107,7 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static java.util.Objects.requireNonNull;
-import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
-import static org.apache.solr.common.params.CommonParams.AUTHZ_PATH;
-import static org.apache.solr.common.params.CommonParams.COLLECTIONS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CONFIGSETS_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.CORES_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.INFO_HANDLER_PATH;
-import static org.apache.solr.common.params.CommonParams.METRICS_PATH;
-import static org.apache.solr.common.params.CommonParams.ZK_PATH;
 import static org.apache.solr.core.CorePropertiesLocator.PROPERTIES_FILENAME;
-import static org.apache.solr.security.AuthenticationPlugin.AUTHENTICATION_PLUGIN_PROP;
 
 /**
  *
@@ -1185,10 +1187,15 @@ public class CoreContainer {
         SolrCore newCore = core.reload(coreConfig);
         registerCore(cd, newCore, false, false);
         if (getZkController() != null) {
-          boolean onlyLeaderIndexes = getZkController().getClusterState().getCollection(cd.getCollectionName()).getRealtimeReplicas() == 1;
-          if (onlyLeaderIndexes && !cd.getCloudDescriptor().isLeader()) {
+          DocCollection docCollection = getZkController().getClusterState().getCollection(cd.getCollectionName());
+          Replica replica = docCollection.getReplica(cd.getCloudDescriptor().getCoreNodeName());
+          assert replica != null;
+          if (replica.getType() == Replica.Type.TLOG) { //TODO: needed here?
             getZkController().stopReplicationFromLeader(core.getName());
-            getZkController().startReplicationFromLeader(newCore.getName());
+            if (!cd.getCloudDescriptor().isLeader()) {
+              getZkController().startReplicationFromLeader(newCore.getName(), true);
+            }
+            
           }
         }
       } catch (SolrCoreState.CoreIsClosedException e) {
@@ -1274,6 +1281,11 @@ public class CoreContainer {
     if (zkSys.getZkController() != null) {
       // cancel recovery in cloud mode
       core.getSolrCoreState().cancelRecovery();
+      if (core.getCoreDescriptor().getCloudDescriptor().getReplicaType() == Replica.Type.PULL
+          || core.getCoreDescriptor().getCloudDescriptor().getReplicaType() == Replica.Type.TLOG) { 
+        // Stop replication if this is part of a pull/tlog replica before closing the core
+        zkSys.getZkController().stopReplicationFromLeader(name);
+      }
     }
     
     core.unloadOnClose(cd, deleteIndexDir, deleteDataDir, deleteInstanceDir);