Posted to commits@lucene.apache.org by da...@apache.org on 2017/07/28 11:00:14 UTC

[2/2] lucene-solr:feature/autoscaling: SOLR-10397: Remove old implementation of autoAddReplicas feature

SOLR-10397: Remove old implementation of autoAddReplicas feature


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/0f7e3be5
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/0f7e3be5
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/0f7e3be5

Branch: refs/heads/feature/autoscaling
Commit: 0f7e3be589d7c9c663beed57cb4c6230a2a5b50e
Parents: b537361
Author: Cao Manh Dat <da...@apache.org>
Authored: Fri Jul 28 18:00:03 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Fri Jul 28 18:00:03 2017 +0700

----------------------------------------------------------------------
 .../org/apache/solr/cloud/AddReplicaCmd.java    |   3 +
 .../java/org/apache/solr/cloud/CloudUtil.java   |   7 +-
 .../apache/solr/cloud/CreateCollectionCmd.java  |   5 -
 .../java/org/apache/solr/cloud/Overseer.java    |  66 ++-
 .../OverseerAutoReplicaFailoverThread.java      | 531 -------------------
 .../cloud/OverseerCollectionMessageHandler.java |  24 -
 .../org/apache/solr/cloud/ZkController.java     |   1 +
 .../autoscaling/AutoAddReplicasPlanAction.java  |  30 +-
 .../solr/cloud/autoscaling/AutoScaling.java     |   2 +-
 .../solr/cloud/overseer/ReplicaMutator.java     |   1 +
 .../java/org/apache/solr/core/CloudConfig.java  |  37 +-
 .../org/apache/solr/core/CoreContainer.java     |  18 +-
 .../org/apache/solr/core/SolrXmlConfig.java     |   8 +-
 .../apache/solr/cloud/ClusterStateMockUtil.java |  48 +-
 .../solr/cloud/MoveReplicaHDFSFailoverTest.java | 207 ++++++++
 .../solr/cloud/MoveReplicaHDFSUlogDirTest.java  | 142 -----
 .../org/apache/solr/cloud/NodeMutatorTest.java  |  15 +-
 .../cloud/SharedFSAutoReplicaFailoverTest.java  | 196 ++++---
 .../SharedFSAutoReplicaFailoverUtilsTest.java   | 191 -------
 .../AutoAddReplicasIntegrationTest.java         | 180 +++++++
 .../AutoAddReplicasPlanActionTest.java          |  21 +
 .../HdfsAutoAddReplicasIntegrationTest.java     |  58 ++
 .../solrj/cloud/autoscaling/NoneSuggester.java  |  32 ++
 .../solr/common/cloud/ClusterStateUtil.java     |  13 +-
 .../solrj/cloud/autoscaling/TestPolicy.java     | 129 +++++
 25 files changed, 832 insertions(+), 1133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
index c325f07..c7889c9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
@@ -191,6 +191,9 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     if (instanceDir != null) {
       params.set(CoreAdminParams.INSTANCE_DIR, instanceDir);
     }
+    if (coreNodeName != null) {
+      params.set(CoreAdminParams.CORE_NODE_NAME, coreNodeName);
+    }
     ocmh.addPropertyParams(message, params);
 
     // For tracking async calls.

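The hunk above forwards the replica's existing coreNodeName to the CoreAdmin CREATE call, presumably so that a replica recreated on another node keeps its identity in the cluster state. A minimal sketch of the resulting params (the core name and node name below are hypothetical, not taken from this commit):

    import org.apache.solr.common.params.CoreAdminParams;
    import org.apache.solr.common.params.ModifiableSolrParams;

    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(CoreAdminParams.NAME, "mycoll_shard1_replica1");  // hypothetical core name
    params.set(CoreAdminParams.CORE_NODE_NAME, "core_node3");    // identity carried over from the lost replica
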
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
index c05072d..9167b81 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
@@ -31,7 +31,6 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrResourceLoader;
@@ -49,7 +48,8 @@ public class CloudUtil {
    * + throw exception if it has been.
    */
   public static void checkSharedFSFailoverReplaced(CoreContainer cc, CoreDescriptor desc) {
-    
+    if (!cc.isSharedFs(desc)) return;
+
     ZkController zkController = cc.getZkController();
     String thisCnn = zkController.getCoreNodeName(desc);
     String thisBaseUrl = zkController.getBaseUrl();
@@ -65,11 +65,10 @@ public class CloudUtil {
           
           String cnn = replica.getName();
           String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
-          boolean isSharedFs = replica.getStr(CoreAdminParams.DATA_DIR) != null;
           log.debug("compare against coreNodeName={} baseUrl={}", cnn, baseUrl);
           
           if (thisCnn != null && thisCnn.equals(cnn)
-              && !thisBaseUrl.equals(baseUrl) && isSharedFs) {
+              && !thisBaseUrl.equals(baseUrl)) {
             if (cc.getLoadedCoreNames().contains(desc.getName())) {
               cc.unload(desc.getName());
             }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
index 8b15144..e35369b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
-import org.apache.solr.cloud.autoscaling.AutoScaling;
 import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
 import org.apache.solr.cloud.overseer.ClusterStateMutator;
 import org.apache.solr.common.cloud.ReplicaPosition;
@@ -110,7 +109,6 @@ public class CreateCollectionCmd implements Cmd {
       int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0);
       int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, numTlogReplicas>0?0:1));
       int numPullReplicas = message.getInt(PULL_REPLICAS, 0);
-      boolean autoAddReplicas = message.getBool(AUTO_ADD_REPLICAS, false);
       Map autoScalingJson = Utils.getJson(ocmh.zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
       String policy = message.getStr(Policy.POLICY);
       boolean usePolicyFramework = autoScalingJson.get(Policy.CLUSTER_POLICY) != null || policy != null;
@@ -318,9 +316,6 @@ public class CreateCollectionCmd implements Cmd {
         ocmh.cleanupCollection(collectionName, new NamedList());
         log.info("Cleaned up artifacts for failed create collection for [{}]", collectionName);
       } else {
-        if (autoAddReplicas) {
-          ocmh.forwardToAutoScaling(AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_DSL);
-        }
         log.debug("Finished create command on all shards for collection: {}", collectionName);
 
         // Emit a warning about production use of data driven functionality

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index d4f914d..ca7a935 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.codahale.metrics.Timer;
 import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.cloud.autoscaling.AutoScaling;
+import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
 import org.apache.solr.cloud.autoscaling.OverseerTriggerThread;
 import org.apache.solr.cloud.overseer.ClusterStateMutator;
 import org.apache.solr.cloud.overseer.CollectionMutator;
@@ -39,17 +41,22 @@ import org.apache.solr.cloud.overseer.ReplicaMutator;
 import org.apache.solr.cloud.overseer.SliceMutator;
 import org.apache.solr.cloud.overseer.ZkStateWriter;
 import org.apache.solr.cloud.overseer.ZkWriteCommand;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ContentStreamBase;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CloudConfig;
 import org.apache.solr.handler.admin.CollectionsHandler;
 import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.update.UpdateShardHandler;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -470,8 +477,6 @@ public class Overseer implements Closeable {
   private OverseerThread ccThread;
 
   private OverseerThread updaterThread;
-  
-  private OverseerThread arfoThread;
 
   private OverseerThread triggerThread;
 
@@ -524,12 +529,11 @@ public class Overseer implements Closeable {
     overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer);
     ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
     ccThread.setDaemon(true);
-    
-    ThreadGroup ohcfTg = new ThreadGroup("Overseer Hdfs SolrCore Failover Thread.");
 
-    OverseerAutoReplicaFailoverThread autoReplicaFailoverThread = new OverseerAutoReplicaFailoverThread(config, reader, updateShardHandler);
-    arfoThread = new OverseerThread(ohcfTg, autoReplicaFailoverThread, "OverseerHdfsCoreFailoverThread-" + id);
-    arfoThread.setDaemon(true);
+    //TODO nocommit, autoscaling framework should start autoAddReplicas trigger automatically (implicitly)
+    Thread autoscalingTriggerCreator = new Thread(createAutoscalingTriggerIfNotExist(), "AutoscalingTriggerCreator");
+    autoscalingTriggerCreator.setDaemon(true);
+    autoscalingTriggerCreator.start();
 
     ThreadGroup triggerThreadGroup = new ThreadGroup("Overseer autoscaling triggers");
     OverseerTriggerThread trigger = new OverseerTriggerThread(zkController);
@@ -537,7 +541,6 @@ public class Overseer implements Closeable {
 
     updaterThread.start();
     ccThread.start();
-    arfoThread.start();
     triggerThread.start();
     assert ObjectReleaseTracker.track(this);
   }
@@ -569,6 +572,43 @@ public class Overseer implements Closeable {
     assert ObjectReleaseTracker.release(this);
   }
 
+  private Runnable createAutoscalingTriggerIfNotExist() {
+    return new Runnable() {
+      @Override
+      public void run() {
+        try {
+          boolean triggerExist = getZkStateReader().getAutoScalingConfig()
+              .getTriggerConfigs().get(".auto_add_replicas") != null;
+          if (triggerExist) return;
+        } catch (InterruptedException | KeeperException e) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+              "Failed when creating .auto_add_replicas trigger");
+        }
+        while (getZkController().getCoreContainer()
+            .getRequestHandler(AutoScalingHandler.HANDLER_PATH) == null) {
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+            // expected
+          }
+        }
+
+        String dsl = AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_DSL.replace("{{waitFor}}",
+            String.valueOf(config.getAutoReplicaFailoverWaitAfterExpiration()/1000));
+        LocalSolrQueryRequest request = new LocalSolrQueryRequest(null, new ModifiableSolrParams());
+        request.getContext().put("httpMethod", "POST");
+        request.setContentStreams(Collections.singleton(new ContentStreamBase.StringStream(dsl)));
+        SolrQueryResponse response = new SolrQueryResponse();
+        getZkController().getCoreContainer()
+            .getRequestHandler(AutoScalingHandler.HANDLER_PATH).handleRequest(request, response);
+        if (!"success".equals(response.getValues().get("result"))) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+              "Failed when creating .auto_add_replicas trigger, return " + response);
+        }
+      }
+    };
+  }
+
   private void doClose() {
     
     if (updaterThread != null) {
@@ -579,10 +619,6 @@ public class Overseer implements Closeable {
       IOUtils.closeQuietly(ccThread);
       ccThread.interrupt();
     }
-    if (arfoThread != null) {
-      IOUtils.closeQuietly(arfoThread);
-      arfoThread.interrupt();
-    }
     if (triggerThread != null)  {
       IOUtils.closeQuietly(triggerThread);
       triggerThread.interrupt();
@@ -598,11 +634,6 @@ public class Overseer implements Closeable {
         ccThread.join();
       } catch (InterruptedException e) {}
     }
-    if (arfoThread != null) {
-      try {
-        arfoThread.join();
-      } catch (InterruptedException e) {}
-    }
     if (triggerThread != null)  {
       try {
         triggerThread.join();
@@ -611,7 +642,6 @@ public class Overseer implements Closeable {
     
     updaterThread = null;
     ccThread = null;
-    arfoThread = null;
     triggerThread = null;
   }
 

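The new createAutoscalingTriggerIfNotExist() task waits for the AutoScalingHandler to come up and then posts the .auto_add_replicas trigger definition to it, substituting {{waitFor}} with autoReplicaFailoverWaitAfterExpiration converted from milliseconds to seconds. A sketch of what gets posted with the default of 30000 ms (see CloudConfig below); only the fields visible in the AutoScaling.java template in this diff are shown, the rest is elided:

    {
      'set-trigger' : {
        'name' : '.auto_add_replicas',
        'event' : 'nodeLost',
        'waitFor' : '30s',
        'enabled' : true,
        'actions' : [ ... ]
      }
    }
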
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java b/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java
deleted file mode 100644
index 2eccef8..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java
+++ /dev/null
@@ -1,531 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.io.Closeable;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ClusterStateUtil;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.core.CloudConfig;
-import org.apache.solr.update.UpdateShardHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-
-// TODO: how to tmp exclude nodes?
-
-// TODO: more fine grained failover rules?
-
-// TODO: test with lots of collections
-
-// TODO: add config for only failover if replicas is < N
-
-// TODO: general support for non shared filesystems
-// this is specialized for a shared file system, but it should
-// not be much work to generalize
-
-// NOTE: using replication can slow down failover if a whole
-// shard is lost.
-
-/**
- *
- * In this simple initial implementation we are limited in how quickly we detect
- * a failure by a worst case of roughly zk session timeout + WAIT_AFTER_EXPIRATION_SECONDS + WORK_LOOP_DELAY_MS
- * and best case of roughly zk session timeout + WAIT_AFTER_EXPIRATION_SECONDS. Also, consider the time to
- * create the SolrCore, do any recovery necessary, and warm up the readers.
- * 
- * NOTE: this will only work with collections created via the collections api because they will have defined
- * replicationFactor and maxShardsPerNode.
- * 
- * @lucene.experimental
- */
-public class OverseerAutoReplicaFailoverThread implements Runnable, Closeable {
-  
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private Integer lastClusterStateVersion;
-  
-  private final ExecutorService updateExecutor;
-  private volatile boolean isClosed;
-  private ZkStateReader zkStateReader;
-  private final Cache<String,Long> baseUrlForBadNodes;
-  private Set<String> liveNodes = Collections.EMPTY_SET;
-
-  private final int workLoopDelay;
-  private final int waitAfterExpiration;
-
-  private volatile Thread thread;
-  
-  public OverseerAutoReplicaFailoverThread(CloudConfig config, ZkStateReader zkStateReader,
-      UpdateShardHandler updateShardHandler) {
-    this.zkStateReader = zkStateReader;
-    
-    this.workLoopDelay = config.getAutoReplicaFailoverWorkLoopDelay();
-    this.waitAfterExpiration = config.getAutoReplicaFailoverWaitAfterExpiration();
-    int badNodeExpiration = config.getAutoReplicaFailoverBadNodeExpiration();
-    
-    log.debug(
-        "Starting "
-            + this.getClass().getSimpleName()
-            + " autoReplicaFailoverWorkLoopDelay={} autoReplicaFailoverWaitAfterExpiration={} autoReplicaFailoverBadNodeExpiration={}",
-        workLoopDelay, waitAfterExpiration, badNodeExpiration);
-
-    baseUrlForBadNodes = CacheBuilder.newBuilder()
-        .concurrencyLevel(1).expireAfterWrite(badNodeExpiration, TimeUnit.MILLISECONDS).build();
-    
-    // TODO: Speed up our work loop when live_nodes changes??
-
-    updateExecutor = updateShardHandler.getUpdateExecutor();
-
-    
-    // TODO: perhaps do a health ping periodically to each node (scaryish)
-    // And/OR work on JIRA issue around self health checks (SOLR-5805)
-  }
-  
-  @Override
-  public void run() {
-    this.thread = Thread.currentThread();
-    while (!this.isClosed) {
-      // work loop
-      log.debug("do " + this.getClass().getSimpleName() + " work loop");
-
-      // every n, look at state and make add / remove calls
-
-      try {
-        doWork();
-      } catch (Exception e) {
-        SolrException.log(log, this.getClass().getSimpleName()
-            + " had an error in its thread work loop.", e);
-      }
-      
-      if (!this.isClosed) {
-        try {
-          Thread.sleep(workLoopDelay);
-        } catch (InterruptedException e) {
-          return;
-        }
-      }
-    }
-  }
-  
-  private void doWork() {
-    
-    // TODO: extract to configurable strategy class ??
-    ClusterState clusterState = zkStateReader.getClusterState();
-    //check if we have disabled autoAddReplicas cluster wide
-    String autoAddReplicas = zkStateReader.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null);
-    if (autoAddReplicas != null && autoAddReplicas.equals("false")) {
-      return;
-    }
-    if (clusterState != null) {
-      if (clusterState.getZkClusterStateVersion() != null &&
-          clusterState.getZkClusterStateVersion().equals(lastClusterStateVersion) && baseUrlForBadNodes.size() == 0 &&
-          liveNodes.equals(clusterState.getLiveNodes())) {
-        // nothing has changed, no work to do
-        return;
-      }
-
-      liveNodes = clusterState.getLiveNodes();
-      lastClusterStateVersion = clusterState.getZkClusterStateVersion();
-      Map<String, DocCollection> collections = clusterState.getCollectionsMap();
-      for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
-        log.debug("look at collection={}", entry.getKey());
-        DocCollection docCollection = entry.getValue();
-        if (!docCollection.getAutoAddReplicas()) {
-          log.debug("Collection {} is not setup to use autoAddReplicas, skipping..", docCollection.getName());
-          continue;
-        }
-        if (docCollection.getReplicationFactor() == null) {
-          log.debug("Skipping collection because it has no defined replicationFactor, name={}", docCollection.getName());
-          continue;
-        }
-        log.debug("Found collection, name={} replicationFactor={}", entry.getKey(), docCollection.getReplicationFactor());
-        
-        Collection<Slice> slices = docCollection.getSlices();
-        for (Slice slice : slices) {
-          if (slice.getState() == Slice.State.ACTIVE) {
-            
-            final Collection<DownReplica> downReplicas = new ArrayList<DownReplica>();
-            
-            int goodReplicas = findDownReplicasInSlice(clusterState, docCollection, slice, downReplicas);
-            
-            log.debug("collection={} replicationFactor={} goodReplicaCount={}", docCollection.getName(), docCollection.getReplicationFactor(), goodReplicas);
-            
-            if (downReplicas.size() > 0 && goodReplicas < docCollection.getReplicationFactor()) {
-              // badReplicaMap.put(collection, badReplicas);
-              processBadReplicas(entry.getKey(), downReplicas);
-            } else if (goodReplicas > docCollection.getReplicationFactor()) {
-              log.debug("There are too many replicas");
-            }
-          }
-        }
-      }
-     
-    }
-  }
-
-  private void processBadReplicas(final String collection, final Collection<DownReplica> badReplicas) {
-    for (DownReplica badReplica : badReplicas) {
-      log.debug("process down replica={} from collection={}", badReplica.replica.getName(), collection);
-      String baseUrl = badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP);
-      Long wentBadAtNS = baseUrlForBadNodes.getIfPresent(baseUrl);
-      if (wentBadAtNS == null) {
-        log.warn("Replica {} may need to failover.",
-            badReplica.replica.getName());
-        baseUrlForBadNodes.put(baseUrl, System.nanoTime());
-        
-      } else {
-        
-        long elasped = System.nanoTime() - wentBadAtNS;
-        if (elasped < TimeUnit.NANOSECONDS.convert(waitAfterExpiration, TimeUnit.MILLISECONDS)) {
-          // protect against ZK 'flapping', startup and shutdown
-          log.debug("Looks troublesome...continue. Elapsed={}", elasped + "ns");
-        } else {
-          log.debug("We need to add a replica. Elapsed={}", elasped + "ns");
-          
-          if (addReplica(collection, badReplica)) {
-            baseUrlForBadNodes.invalidate(baseUrl);
-          }
-        }
-      }
-    }
-  }
-
-  private boolean addReplica(final String collection, DownReplica badReplica) {
-    // first find best home - first strategy, sort by number of cores
-    // hosted where maxCoresPerNode is not violated
-    final Integer maxCoreCount = zkStateReader.getClusterProperty(ZkStateReader.MAX_CORES_PER_NODE, (Integer) null);
-    final String createUrl = getBestCreateUrl(zkStateReader, badReplica, maxCoreCount);
-    if (createUrl == null) {
-      log.warn("Could not find a node to create new replica on.");
-      return false;
-    }
-    
-    // NOTE: we send the absolute path, which will slightly change
-    // behavior of these cores as they won't respond to changes
-    // in the solr.hdfs.home sys prop as they would have.
-    final String dataDir = badReplica.replica.getStr("dataDir");
-    final String ulogDir = badReplica.replica.getStr("ulogDir");
-    final String coreNodeName = badReplica.replica.getName();
-    final String shardId = badReplica.slice.getName();
-    if (dataDir != null) {
-      // need an async request - full shard goes down leader election
-      final String coreName = badReplica.replica.getStr(ZkStateReader.CORE_NAME_PROP);
-      log.debug("submit call to {}", createUrl);
-      MDC.put("OverseerAutoReplicaFailoverThread.createUrl", createUrl);
-      try {
-        updateExecutor.submit(() -> createSolrCore(collection, createUrl, dataDir, ulogDir, coreNodeName, coreName, shardId));
-      } finally {
-        MDC.remove("OverseerAutoReplicaFailoverThread.createUrl");
-      }
-
-      // wait to see state for core we just created
-      boolean success = ClusterStateUtil.waitToSeeLiveReplica(zkStateReader, collection, coreNodeName, createUrl, 30000);
-      if (!success) {
-        log.error("Creating new replica appears to have failed, timed out waiting to see created SolrCore register in the clusterstate.");
-        return false;
-      }
-      return true;
-    }
-    
-    log.warn("Could not find dataDir or ulogDir in cluster state.");
-    
-    return false;
-  }
-
-  private static int findDownReplicasInSlice(ClusterState clusterState, DocCollection collection, Slice slice, final Collection<DownReplica> badReplicas) {
-    int goodReplicas = 0;
-    Collection<Replica> replicas = slice.getReplicas();
-    if (replicas != null) {
-      for (Replica replica : replicas) {
-        // on a live node?
-        boolean live = clusterState.liveNodesContain(replica.getNodeName());
-        final Replica.State state = replica.getState();
-        
-        final boolean okayState = state == Replica.State.DOWN
-            || state == Replica.State.RECOVERING
-            || state == Replica.State.ACTIVE;
-        
-        log.debug("Process replica name={} live={} state={}", replica.getName(), live, state.toString());
-        
-        if (live && okayState) {
-          goodReplicas++;
-        } else {
-          DownReplica badReplica = new DownReplica();
-          badReplica.replica = replica;
-          badReplica.slice = slice;
-          badReplica.collection = collection;
-          badReplicas.add(badReplica);
-        }
-      }
-    }
-    log.debug("bad replicas for slice {}", badReplicas);
-    return goodReplicas;
-  }
-  
-  /**
-   * 
-   * @return the best node to replace the badReplica on or null if there is no
-   *         such node
-   */
-  static String getBestCreateUrl(ZkStateReader zkStateReader, DownReplica badReplica, Integer maxCoreCount) {
-    assert badReplica != null;
-    assert badReplica.collection != null;
-    assert badReplica.slice != null;
-    log.debug("getBestCreateUrl for " + badReplica.replica);
-    Map<String,Counts> counts = new HashMap<>();
-    Set<String> unsuitableHosts = new HashSet<>();
-    
-    Set<String> liveNodes = new HashSet<>(zkStateReader.getClusterState().getLiveNodes());
-    Map<String, Integer> coresPerNode = new HashMap<>();
-    
-    ClusterState clusterState = zkStateReader.getClusterState();
-    if (clusterState != null) {
-      Map<String, DocCollection> collections = clusterState.getCollectionsMap();
-      for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
-        String collection = entry.getKey();
-        log.debug("look at collection {} as possible create candidate", collection);
-        DocCollection docCollection = entry.getValue();
-        // TODO - only operate on collections with sharedfs failover = true ??
-        Collection<Slice> slices = docCollection.getSlices();
-        for (Slice slice : slices) {
-          // only look at active shards
-          if (slice.getState() == Slice.State.ACTIVE) {
-            log.debug("look at slice {} for collection {} as possible create candidate", slice.getName(), collection); 
-            Collection<Replica> replicas = slice.getReplicas();
-
-            for (Replica replica : replicas) {
-              liveNodes.remove(replica.getNodeName());
-              String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
-              if (coresPerNode.containsKey(baseUrl)) {
-                Integer nodeCount = coresPerNode.get(baseUrl);
-                coresPerNode.put(baseUrl, nodeCount++);
-              } else {
-                coresPerNode.put(baseUrl, 1);
-              }
-              if (baseUrl.equals(badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP))) {
-                continue;
-              }
-              // on a live node?
-              log.debug("collection={} nodename={} livenodes={}", collection, replica.getNodeName(), clusterState.getLiveNodes());
-              boolean live = clusterState.liveNodesContain(replica.getNodeName());
-              log.debug("collection={} look at replica {} as possible create candidate, live={}", collection, replica.getName(), live); 
-              if (live) {
-                Counts cnt = counts.get(baseUrl);
-                if (cnt == null) {
-                  cnt = new Counts();
-                }
-                if (badReplica.collection.getName().equals(collection)) {
-                  cnt.negRankingWeight += 3;
-                  cnt.collectionShardsOnNode += 1;
-                } else {
-                  cnt.negRankingWeight += 1;
-                }
-                if (badReplica.collection.getName().equals(collection) && badReplica.slice.getName().equals(slice.getName())) {
-                  cnt.ourReplicas++;
-                }
-
-                Integer maxShardsPerNode = badReplica.collection.getMaxShardsPerNode();
-                if (maxShardsPerNode == null) {
-                  log.warn("maxShardsPerNode is not defined for collection, name=" + badReplica.collection.getName());
-                  maxShardsPerNode = Integer.MAX_VALUE;
-                }
-                log.debug("collection={} node={} maxShardsPerNode={} maxCoresPerNode={} potential hosts={}",
-                    collection, baseUrl, maxShardsPerNode, maxCoreCount, cnt);
-
-                Collection<Replica> badSliceReplicas = null;
-                DocCollection c = clusterState.getCollection(badReplica.collection.getName());
-                if (c != null) {
-                  Slice s = c.getSlice(badReplica.slice.getName());
-                  if (s != null) {
-                    badSliceReplicas = s.getReplicas();
-                  }
-                }
-                boolean alreadyExistsOnNode = replicaAlreadyExistsOnNode(zkStateReader.getClusterState(), badSliceReplicas, badReplica, baseUrl);
-                if (unsuitableHosts.contains(baseUrl) || alreadyExistsOnNode || cnt.collectionShardsOnNode >= maxShardsPerNode
-                    || (maxCoreCount != null && coresPerNode.get(baseUrl) >= maxCoreCount) ) {
-                  counts.remove(baseUrl);
-                  unsuitableHosts.add(baseUrl);
-                  log.debug("not a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
-                } else {
-                  counts.put(baseUrl, cnt);
-                  log.debug("is a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
-                }
-              }
-            }
-          }
-        }
-      }
-    }
-    
-    for (String node : liveNodes) {
-      counts.put(zkStateReader.getBaseUrlForNodeName(node), new Counts(0, 0));
-    }
-    
-    if (counts.size() == 0) {
-      log.debug("no suitable hosts found for getBestCreateUrl for collection={}", badReplica.collection.getName());
-      return null;
-    }
-    
-    ValueComparator vc = new ValueComparator(counts);
-    Map<String,Counts> sortedCounts = new TreeMap<String, Counts>(vc);
-    sortedCounts.putAll(counts);
-    
-    log.debug("empty nodes={} for collection={}", liveNodes, badReplica.collection.getName());
-    log.debug("sorted hosts={} for collection={}", sortedCounts, badReplica.collection.getName());
-    log.debug("unsuitable hosts={} for collection={}", unsuitableHosts, badReplica.collection.getName());
-    
-    return sortedCounts.keySet().iterator().next();
-  }
-  
-  private static boolean replicaAlreadyExistsOnNode(ClusterState clusterState, Collection<Replica> replicas, DownReplica badReplica, String baseUrl) {
-    if (replicas != null) {
-      log.debug("collection={} check if replica already exists on node using replicas {}", badReplica.collection.getName(), getNames(replicas));
-      for (Replica replica : replicas) {
-        final Replica.State state = replica.getState();
-        if (!replica.getName().equals(badReplica.replica.getName()) && replica.getStr(ZkStateReader.BASE_URL_PROP).equals(baseUrl)
-            && clusterState.liveNodesContain(replica.getNodeName())
-            && (state == Replica.State.ACTIVE || state == Replica.State.DOWN || state == Replica.State.RECOVERING)) {
-          log.debug("collection={} replica already exists on node, bad replica={}, existing replica={}, node name={}",  badReplica.collection.getName(), badReplica.replica.getName(), replica.getName(), replica.getNodeName());
-          return true;
-        }
-      }
-    }
-    log.debug("collection={} replica does not yet exist on node: {}",  badReplica.collection.getName(), baseUrl);
-    return false;
-  }
-  
-  private static Object getNames(Collection<Replica> replicas) {
-    Set<String> names = new HashSet<>(replicas.size());
-    for (Replica replica : replicas) {
-      names.add(replica.getName());
-    }
-    return names;
-  }
-
-  private boolean createSolrCore(final String collection,
-      final String createUrl, final String dataDir, final String ulogDir,
-      final String coreNodeName, final String coreName, final String shardId) {
-
-    try (HttpSolrClient client = new HttpSolrClient.Builder(createUrl)
-        .withConnectionTimeout(30000)
-        .withSocketTimeout(60000)
-        .build()) {
-      log.debug("create url={}", createUrl);
-      Create createCmd = new Create();
-      createCmd.setCollection(collection);
-      createCmd.setCoreNodeName(coreNodeName);
-      // TODO: how do we ensure unique coreName
-      // for now, the collections API will use unique names
-      createCmd.setShardId(shardId);
-      createCmd.setCoreName(coreName);
-      createCmd.setDataDir(dataDir);
-      createCmd.setUlogDir(ulogDir.substring(0, ulogDir.length() - "/tlog".length()));
-      client.request(createCmd);
-    } catch (Exception e) {
-      SolrException.log(log, "Exception trying to create new replica on " + createUrl, e);
-      return false;
-    }
-    return true;
-  }
-  
-  private static class ValueComparator implements Comparator<String> {
-    Map<String,Counts> map;
-    
-    public ValueComparator(Map<String,Counts> map) {
-      this.map = map;
-    }
-    
-    public int compare(String a, String b) {
-      if (map.get(a).negRankingWeight >= map.get(b).negRankingWeight) {
-        return 1;
-      } else {
-        return -1;
-      }
-    }
-  }
-  
-  @Override
-  public void close() {
-    isClosed = true;
-    Thread lThread = thread;
-    if (lThread != null) {
-      lThread.interrupt();
-    }
-  }
-  
-  public boolean isClosed() {
-    return isClosed;
-  }
-  
-  
-  private static class Counts {
-    int collectionShardsOnNode = 0;
-    int negRankingWeight = 0;
-    int ourReplicas = 0;
-    
-    private Counts() {
-      
-    }
-    
-    private Counts(int totalReplicas, int ourReplicas) {
-      this.negRankingWeight = totalReplicas;
-      this.ourReplicas = ourReplicas;
-    }
-    
-    @Override
-    public String toString() {
-      return "Counts [negRankingWeight=" + negRankingWeight + ", sameSliceCount="
-          + ourReplicas + ", collectionShardsOnNode=" + collectionShardsOnNode + "]";
-    }
-  }
-  
-  static class DownReplica {
-    Replica replica;
-    Slice slice;
-    DocCollection collection;
-    
-    @Override
-    public String toString() {
-      return "DownReplica [replica=" + replica.getName() + ", slice="
-          + slice.getName() + ", collection=" + collection.getName() + "]";
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index b0791e2..5c29fa4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -41,9 +41,6 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.cloud.autoscaling.AutoScaling;
-import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
-import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -62,8 +59,6 @@ import org.apache.solr.common.params.CollectionParams.CollectionAction;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.ContentStreamBase;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
@@ -74,10 +69,6 @@ import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardHandlerFactory;
 import org.apache.solr.handler.component.ShardRequest;
 import org.apache.solr.handler.component.ShardResponse;
-import org.apache.solr.request.LocalSolrQueryRequest;
-import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.request.SolrRequestInfo;
-import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.RTimer;
 import org.apache.solr.util.TimeOut;
@@ -678,10 +669,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
       Thread.sleep(100);
     }
 
-    if (message.getBool(ZkStateReader.AUTO_ADD_REPLICAS, false)) {
-      forwardToAutoScaling(AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_DSL);
-    }
-
     if (!areChangesVisible)
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not modify collection " + message);
   }
@@ -881,17 +868,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     }
   }
 
-  void forwardToAutoScaling(String command) {
-    LocalSolrQueryRequest request = new LocalSolrQueryRequest(null, new ModifiableSolrParams());
-    request.getContext().put("httpMethod", "POST");
-    request.setContentStreams(Collections.singleton(new ContentStreamBase.StringStream(command)));
-    SolrQueryResponse response = new SolrQueryResponse();
-    overseer.getZkController().getCoreContainer().getRequestHandler(AutoScalingHandler.HANDLER_PATH).handleRequest(request, response);
-    if (!"success".equals(response.getValues().get("result"))) {
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Failed when execute command on autoScalingHandler, return " + response);
-    }
-  }
-
   private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
     ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
     ModifiableSolrParams params = new ModifiableSolrParams();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 512acbe..57cda45 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1482,6 +1482,7 @@ public class ZkController {
             if (msgNodeName.equals(nodeName) && core.equals(msgCore)) {
               descriptor.getCloudDescriptor()
                   .setCoreNodeName(replica.getName());
+              getCoreContainer().getCoresLocator().persist(getCoreContainer(), descriptor);
               return;
             }
           }

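The added persist(...) call writes the coreNodeName discovered from the cluster state back into the core's core.properties, so the assignment survives a restart. An illustrative result (all values are hypothetical):

    # core.properties (illustrative)
    name=mycoll_shard1_replica1
    collection=mycoll
    shard=shard1
    coreNodeName=core_node3
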
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java
index fac69f4..ff09469 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoAddReplicasPlanAction.java
@@ -17,35 +17,35 @@
 
 package org.apache.solr.cloud.autoscaling;
 
-import java.util.HashSet;
-import java.util.Set;
 
+import org.apache.solr.client.solrj.cloud.autoscaling.NoneSuggester;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ZkStateReader;
 
 public class AutoAddReplicasPlanAction extends ComputePlanAction {
-  Set<String> autoAddReplicasCollections;
 
   @Override
   protected Policy.Suggester getSuggester(Policy.Session session, TriggerEvent event, ZkStateReader zkStateReader) {
-    Policy.Suggester suggester = super.getSuggester(session, event, zkStateReader);
-    if (autoAddReplicasCollections == null) {
-      autoAddReplicasCollections = new HashSet<>();
-
-      ClusterState clusterState = zkStateReader.getClusterState();
-      for (DocCollection collection: clusterState.getCollectionsMap().values()) {
-        if (collection.getAutoAddReplicas()) {
-          autoAddReplicasCollections.add(collection.getName());
-        }
-      }
+    // for backward compatibility
+    String autoAddReplicas = zkStateReader.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, (String) null);
+    if (autoAddReplicas != null && autoAddReplicas.equals("false")) {
+      return new NoneSuggester();
     }
 
-    for (String collection : autoAddReplicasCollections) {
-      suggester.hint(Policy.Suggester.Hint.COLL, collection);
+    Policy.Suggester suggester = super.getSuggester(session, event, zkStateReader);
+    ClusterState clusterState = zkStateReader.getClusterState();
+
+    boolean anyCollections = false;
+    for (DocCollection collection: clusterState.getCollectionsMap().values()) {
+      if (collection.getAutoAddReplicas()) {
+        anyCollections = true;
+        suggester.hint(Policy.Suggester.Hint.COLL, collection.getName());
+      }
     }
 
+    if (!anyCollections) return new NoneSuggester();
     return suggester;
   }
 }

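For the backward-compatibility path above, autoAddReplicas can still be disabled cluster-wide through the autoAddReplicas cluster property, in which case the action short-circuits with a NoneSuggester and computes no moves. A hedged SolrJ sketch, assuming the standard CollectionAdminRequest.setClusterProperty helper and a placeholder ZooKeeper address:

    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.request.CollectionAdminRequest;
    import org.apache.solr.common.cloud.ZkStateReader;

    try (CloudSolrClient client = new CloudSolrClient.Builder()
        .withZkHost("localhost:2181")  // placeholder ZK address
        .build()) {
      // Disables autoAddReplicas cluster-wide; the plan action then returns NoneSuggester.
      CollectionAdminRequest.setClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, "false")
          .process(client);
    }
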
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index 0d192d3..ed24bf7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -148,7 +148,7 @@ public class AutoScaling {
       "    'set-trigger' : {" +
       "        'name' : '.auto_add_replicas'," +
       "        'event' : 'nodeLost'," +
-      "        'waitFor' : '5s'," +
+      "        'waitFor' : '{{waitFor}}s'," +
       "        'enabled' : true," +
       "        'actions' : [" +
       "            {" +

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index 82d106d..fad5d96 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -37,6 +37,7 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/core/CloudConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CloudConfig.java b/solr/core/src/java/org/apache/solr/core/CloudConfig.java
index 447dd22..b971a0b 100644
--- a/solr/core/src/java/org/apache/solr/core/CloudConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/CloudConfig.java
@@ -38,10 +38,6 @@ public class CloudConfig {
 
   private final int autoReplicaFailoverWaitAfterExpiration;
 
-  private final int autoReplicaFailoverWorkLoopDelay;
-
-  private final int autoReplicaFailoverBadNodeExpiration;
-
   private final String zkCredentialsProviderClass;
 
   private final String zkACLProviderClass;
@@ -51,9 +47,9 @@ public class CloudConfig {
   private final boolean createCollectionCheckLeaderActive;
 
   CloudConfig(String zkHost, int zkClientTimeout, int hostPort, String hostName, String hostContext, boolean useGenericCoreNames, 
-              int leaderVoteWait, int leaderConflictResolveWait, int autoReplicaFailoverWaitAfterExpiration, 
-              int autoReplicaFailoverWorkLoopDelay, int autoReplicaFailoverBadNodeExpiration, String zkCredentialsProviderClass, 
-              String zkACLProviderClass, int createCollectionWaitTimeTillActive, boolean createCollectionCheckLeaderActive) {
+              int leaderVoteWait, int leaderConflictResolveWait, int autoReplicaFailoverWaitAfterExpiration,
+              String zkCredentialsProviderClass, String zkACLProviderClass, int createCollectionWaitTimeTillActive,
+              boolean createCollectionCheckLeaderActive) {
     this.zkHost = zkHost;
     this.zkClientTimeout = zkClientTimeout;
     this.hostPort = hostPort;
@@ -63,8 +59,6 @@ public class CloudConfig {
     this.leaderVoteWait = leaderVoteWait;
     this.leaderConflictResolveWait = leaderConflictResolveWait;
     this.autoReplicaFailoverWaitAfterExpiration = autoReplicaFailoverWaitAfterExpiration;
-    this.autoReplicaFailoverWorkLoopDelay = autoReplicaFailoverWorkLoopDelay;
-    this.autoReplicaFailoverBadNodeExpiration = autoReplicaFailoverBadNodeExpiration;
     this.zkCredentialsProviderClass = zkCredentialsProviderClass;
     this.zkACLProviderClass = zkACLProviderClass;
     this.createCollectionWaitTimeTillActive = createCollectionWaitTimeTillActive;
@@ -116,14 +110,6 @@ public class CloudConfig {
     return autoReplicaFailoverWaitAfterExpiration;
   }
 
-  public int getAutoReplicaFailoverWorkLoopDelay() {
-    return autoReplicaFailoverWorkLoopDelay;
-  }
-
-  public int getAutoReplicaFailoverBadNodeExpiration() {
-    return autoReplicaFailoverBadNodeExpiration;
-  }
-
   public boolean getGenericCoreNodeNames() {
     return useGenericCoreNames;
   }
@@ -146,8 +132,6 @@ public class CloudConfig {
  
     // TODO: tune defaults
     private static final int DEFAULT_AUTO_REPLICA_FAILOVER_WAIT_AFTER_EXPIRATION = 30000;
-    private static final int DEFAULT_AUTO_REPLICA_FAILOVER_WORKLOOP_DELAY = 10000;
-    private static final int DEFAULT_AUTO_REPLICA_FAILOVER_BAD_NODE_EXPIRATION = 60000;
 
     private String zkHost = System.getProperty("zkHost");
     private int zkClientTimeout = Integer.getInteger("zkClientTimeout", DEFAULT_ZK_CLIENT_TIMEOUT);
@@ -158,8 +142,6 @@ public class CloudConfig {
     private int leaderVoteWait = DEFAULT_LEADER_VOTE_WAIT;
     private int leaderConflictResolveWait = DEFAULT_LEADER_CONFLICT_RESOLVE_WAIT;
     private int autoReplicaFailoverWaitAfterExpiration = DEFAULT_AUTO_REPLICA_FAILOVER_WAIT_AFTER_EXPIRATION;
-    private int autoReplicaFailoverWorkLoopDelay = DEFAULT_AUTO_REPLICA_FAILOVER_WORKLOOP_DELAY;
-    private int autoReplicaFailoverBadNodeExpiration = DEFAULT_AUTO_REPLICA_FAILOVER_BAD_NODE_EXPIRATION;
     private String zkCredentialsProviderClass;
     private String zkACLProviderClass;
     private int createCollectionWaitTimeTillActive = DEFAULT_CREATE_COLLECTION_ACTIVE_WAIT;
@@ -205,16 +187,6 @@ public class CloudConfig {
       return this;
     }
 
-    public CloudConfigBuilder setAutoReplicaFailoverWorkLoopDelay(int autoReplicaFailoverWorkLoopDelay) {
-      this.autoReplicaFailoverWorkLoopDelay = autoReplicaFailoverWorkLoopDelay;
-      return this;
-    }
-
-    public CloudConfigBuilder setAutoReplicaFailoverBadNodeExpiration(int autoReplicaFailoverBadNodeExpiration) {
-      this.autoReplicaFailoverBadNodeExpiration = autoReplicaFailoverBadNodeExpiration;
-      return this;
-    }
-
     public CloudConfigBuilder setZkCredentialsProviderClass(String zkCredentialsProviderClass) {
       this.zkCredentialsProviderClass = zkCredentialsProviderClass;
       return this;
@@ -237,8 +209,7 @@ public class CloudConfig {
     
     public CloudConfig build() {
       return new CloudConfig(zkHost, zkClientTimeout, hostPort, hostName, hostContext, useGenericCoreNames, leaderVoteWait, 
-                             leaderConflictResolveWait, autoReplicaFailoverWaitAfterExpiration, autoReplicaFailoverWorkLoopDelay, 
-                             autoReplicaFailoverBadNodeExpiration, zkCredentialsProviderClass, zkACLProviderClass, createCollectionWaitTimeTillActive,
+                             leaderConflictResolveWait, autoReplicaFailoverWaitAfterExpiration, zkCredentialsProviderClass, zkACLProviderClass, createCollectionWaitTimeTillActive,
                              createCollectionCheckLeaderActive);
     }
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index a43af96..4efd369 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -101,6 +101,7 @@ import org.apache.solr.security.HttpClientBuilderPlugin;
 import org.apache.solr.security.PKIAuthenticationPlugin;
 import org.apache.solr.security.SecurityPluginHolder;
 import org.apache.solr.update.SolrCoreState;
+import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.UpdateShardHandler;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.stats.MetricUtils;
@@ -965,7 +966,7 @@ public class CoreContainer {
         zkSys.getZkController().preRegister(dcore);
       }
 
-      ConfigSet coreConfig = coreConfigService.getConfig(dcore);
+      ConfigSet coreConfig = getConfigSet(dcore);
       dcore.setConfigSetTrusted(coreConfig.isTrusted());
       log.info("Creating SolrCore '{}' using configuration from {}, trusted={}", dcore.getName(), coreConfig.getName(), dcore.isConfigSetTrusted());
       try {
@@ -1000,6 +1001,21 @@ public class CoreContainer {
       MDCLoggingContext.clear();
     }
   }
+
+  public boolean isSharedFs(CoreDescriptor cd) {
+    try (SolrCore core = this.getCore(cd.getName())) {
+      if (core != null) {
+        return core.getDirectoryFactory().isSharedStorage();
+      } else {
+        ConfigSet configSet = getConfigSet(cd);
+        return DirectoryFactory.loadDirectoryFactory(configSet.getSolrConfig(), this, null).isSharedStorage();
+      }
+    }
+  }
+
+  private ConfigSet getConfigSet(CoreDescriptor cd) {
+    return coreConfigService.getConfig(cd);
+  }
   
   /**
    * Take action when we failed to create a SolrCore. If error is due to corrupt index, try to recover. Various recovery

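The new isSharedFs(CoreDescriptor) helper reports whether a core's DirectoryFactory uses shared storage (e.g. HDFS), falling back to the config set when the core is not loaded. It lets callers such as CloudUtil.checkSharedFSFailoverReplaced above bail out early on local storage, roughly (hypothetical caller, not from this commit):

    // Shared-FS failover logic only applies to cores on shared storage.
    if (!coreContainer.isSharedFs(coreDescriptor)) {
      return;  // local disks: the old index cannot be reattached on another node
    }
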
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
index 8cdf947..c15127b 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
@@ -376,15 +376,13 @@ public class SolrXmlConfig {
         case "zkClientTimeout":
           builder.setZkClientTimeout(parseInt(name, value));
           break;
-        case "autoReplicaFailoverBadNodeExpiration":
-          builder.setAutoReplicaFailoverBadNodeExpiration(parseInt(name, value));
+        case "autoReplicaFailoverBadNodeExpiration": case "autoReplicaFailoverWorkLoopDelay":
+          //TODO remove this in Solr 8.0
+          log.info("Configuration parameter " + name + " is ignored");
           break;
         case "autoReplicaFailoverWaitAfterExpiration":
           builder.setAutoReplicaFailoverWaitAfterExpiration(parseInt(name, value));
           break;
-        case "autoReplicaFailoverWorkLoopDelay":
-          builder.setAutoReplicaFailoverWorkLoopDelay(parseInt(name, value));
-          break;
         case "zkHost":
           builder.setZkHost(value);
           break;

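With this change, the two removed parameters are accepted but ignored (logged and skipped, slated for removal in Solr 8.0), while autoReplicaFailoverWaitAfterExpiration is still honored. An illustrative solr.xml fragment using the old defaults from CloudConfig:

    <solr>
      <solrcloud>
        <int name="autoReplicaFailoverWaitAfterExpiration">30000</int>
        <!-- ignored as of this change -->
        <int name="autoReplicaFailoverWorkLoopDelay">10000</int>
        <int name="autoReplicaFailoverBadNodeExpiration">60000</int>
      </solrcloud>
    </solr>
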
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java b/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
index e0cf3f7..3482596 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
@@ -18,13 +18,10 @@
 package org.apache.solr.cloud;
 
 
-import java.io.Closeable;
-import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -40,22 +37,12 @@ public class ClusterStateMockUtil {
 
   private final static Pattern BLUEPRINT = Pattern.compile("([a-z])(\\d+)?(?:(['A','R','D','F']))?(\\*)?");
 
-  protected static class Result implements Closeable {
-    OverseerAutoReplicaFailoverThread.DownReplica badReplica;
-    ZkStateReader reader;
-
-    @Override
-    public void close() throws IOException {
-      reader.close();
-    }
-  }
-
-  protected static ClusterStateMockUtil.Result buildClusterState(List<Result> results, String string, String ... liveNodes) {
-    return buildClusterState(results, string, 1, liveNodes);
+  protected static ZkStateReader buildClusterState(String string, String ... liveNodes) {
+    return buildClusterState(string, 1, liveNodes);
   }
 
-  protected static ClusterStateMockUtil.Result buildClusterState(List<Result> results, String string, int replicationFactor, String ... liveNodes) {
-    return buildClusterState(results, string, replicationFactor, 10, liveNodes);
+  protected static ZkStateReader buildClusterState(String string, int replicationFactor, String ... liveNodes) {
+    return buildClusterState(string, replicationFactor, 10, liveNodes);
   }
 
   /**
@@ -118,9 +105,7 @@ public class ClusterStateMockUtil {
    *
    */
   @SuppressWarnings("resource")
-  protected static ClusterStateMockUtil.Result buildClusterState(List<Result> results, String clusterDescription, int replicationFactor, int maxShardsPerNode, String ... liveNodes) {
-    ClusterStateMockUtil.Result result = new ClusterStateMockUtil.Result();
-
+  protected static ZkStateReader buildClusterState(String clusterDescription, int replicationFactor, int maxShardsPerNode, String ... liveNodes) {
     Map<String,Slice> slices = null;
     Map<String,Replica> replicas = null;
     Map<String,Object> collectionProps = new HashMap<>();
@@ -181,23 +166,12 @@ public class ClusterStateMockUtil {
           String nodeName = "baseUrl" + node + "_";
           String replicaName = "replica" + replicaCount++;
 
-          if ("*".equals(m.group(4))) {
-            replicaName += " (bad)";
-          }
-
           replicaPropMap.put(ZkStateReader.NODE_NAME_PROP, nodeName);
           replicaPropMap.put(ZkStateReader.BASE_URL_PROP, "http://baseUrl" + node);
           replicaPropMap.put(ZkStateReader.STATE_PROP, state.toString());
 
           replica = new Replica(replicaName, replicaPropMap);
 
-          if ("*".equals(m.group(4))) {
-            result.badReplica = new OverseerAutoReplicaFailoverThread.DownReplica();
-            result.badReplica.replica = replica;
-            result.badReplica.slice = slice;
-            result.badReplica.collection = docCollection;
-          }
-
           replicas.put(replica.getName(), replica);
           break;
         default:
@@ -216,17 +190,7 @@ public class ClusterStateMockUtil {
     }
     System.err.println(json);
 
-    // todo remove the limitation of always having a bad replica
-    assert result.badReplica != null : "Is there no bad replica?";
-    assert result.badReplica.slice != null : "Is there no bad replica?";
-
-    result.reader = reader;
-
-    if (results != null) {
-      results.add(result);
-    }
-
-    return result;
+    return reader;
   }
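
With the Result wrapper gone, buildClusterState hands back the mocked ZkStateReader directly, so the caller now owns closing it. A hedged usage sketch, inferred from the BLUEPRINT pattern above and the NodeMutatorTest usage further down ('c' apparently opens a collection, 's' a slice, 'r' a replica, with an optional node digit and state letter such as D for down):

    // collection1: one slice with a replica on node1, a replica on node2,
    // and a replica already marked down. Node names follow the builder's
    // "baseUrl<N>_" convention.
    ZkStateReader reader = ClusterStateMockUtil.buildClusterState(
        "csrr2rD", 1, 1, "baseUrl1_", "baseUrl2_");
    try {
      ClusterState clusterState = reader.getClusterState();
      // ... assertions against clusterState ...
    } finally {
      reader.close(); // ZkStateReader is Closeable; Result used to do this
    }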
 
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSFailoverTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSFailoverTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSFailoverTest.java
new file mode 100644
index 0000000..5edae7c
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSFailoverTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.cloud.hdfs.HdfsTestUtil;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterStateUtil;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.util.BadHdfsThreadsFilter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+@ThreadLeakFilters(defaultFilters = true, filters = {
+    BadHdfsThreadsFilter.class, // hdfs currently leaks thread(s)
+    MoveReplicaHDFSTest.ForkJoinThreadsFilter.class
+})
+public class MoveReplicaHDFSFailoverTest extends SolrCloudTestCase {
+  private static MiniDFSCluster dfsCluster;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    configureCluster(2)
+        .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
+        .configure();
+
+    System.setProperty("solr.hdfs.blockcache.enabled", "false");
+    dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
+
+    ZkConfigManager configManager = new ZkConfigManager(zkClient());
+    configManager.uploadConfigDir(configset("cloud-hdfs"), "conf1");
+
+    System.setProperty("solr.hdfs.home", HdfsTestUtil.getDataDir(dfsCluster, "data"));
+  }
+
+  @AfterClass
+  public static void teardownClass() throws Exception {
+    cluster.shutdown(); // need to close before the MiniDFSCluster
+    HdfsTestUtil.teardownClass(dfsCluster);
+    dfsCluster = null;
+  }
+
+  @Test
+  public void testDataDirAndUlogAreMaintained() throws Exception {
+    String coll = "movereplicatest_coll2";
+    CollectionAdminRequest.createCollection(coll, "conf1", 1, 1)
+        .setCreateNodeSet("")
+        .process(cluster.getSolrClient());
+    String hdfsUri = HdfsTestUtil.getURI(dfsCluster);
+    String dataDir = hdfsUri + "/dummyFolder/dataDir";
+    String ulogDir = hdfsUri + "/dummyFolder2/ulogDir";
+    CollectionAdminResponse res = CollectionAdminRequest
+        .addReplicaToShard(coll, "shard1")
+        .setDataDir(dataDir)
+        .setUlogDir(ulogDir)
+        .setNode(cluster.getJettySolrRunner(0).getNodeName())
+        .process(cluster.getSolrClient());
+
+    ulogDir += "/tlog";
+    ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
+
+    DocCollection docCollection = zkStateReader.getClusterState().getCollection(coll);
+    Replica replica = docCollection.getReplicas().iterator().next();
+    assertTrue(replica.getStr("ulogDir"), replica.getStr("ulogDir").equals(ulogDir) || replica.getStr("ulogDir").equals(ulogDir+'/'));
+    assertTrue(replica.getStr("dataDir"),replica.getStr("dataDir").equals(dataDir) || replica.getStr("dataDir").equals(dataDir+'/'));
+
+    new CollectionAdminRequest.MoveReplica(coll, replica.getName(), cluster.getJettySolrRunner(1).getNodeName())
+        .process(cluster.getSolrClient());
+    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
+    docCollection = zkStateReader.getClusterState().getCollection(coll);
+    assertEquals(1, docCollection.getSlice("shard1").getReplicas().size());
+    Replica newReplica = docCollection.getReplicas().iterator().next();
+    assertEquals(newReplica.getNodeName(), cluster.getJettySolrRunner(1).getNodeName());
+    assertTrue(newReplica.getStr("ulogDir"), newReplica.getStr("ulogDir").equals(ulogDir) || newReplica.getStr("ulogDir").equals(ulogDir+'/'));
+    assertTrue(newReplica.getStr("dataDir"),newReplica.getStr("dataDir").equals(dataDir) || newReplica.getStr("dataDir").equals(dataDir+'/'));
+
+    assertEquals(replica.getName(), newReplica.getName());
+    assertEquals(replica.getCoreName(), newReplica.getCoreName());
+    assertFalse(replica.getNodeName().equals(newReplica.getNodeName()));
+    final int numDocs = 100;
+    addDocs(coll, numDocs);  // indexed but not committed
+
+    cluster.getJettySolrRunner(1).stop();
+    Thread.sleep(5000);
+    new CollectionAdminRequest.MoveReplica(coll, newReplica.getName(), cluster.getJettySolrRunner(0).getNodeName())
+        .process(cluster.getSolrClient());
+    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
+
+    // assert that the old core will be removed on startup
+    cluster.getJettySolrRunner(1).start();
+    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
+    docCollection = zkStateReader.getClusterState().getCollection(coll);
+    assertEquals(1, docCollection.getReplicas().size());
+    newReplica = docCollection.getReplicas().iterator().next();
+    assertEquals(newReplica.getNodeName(), cluster.getJettySolrRunner(0).getNodeName());
+    assertTrue(newReplica.getStr("ulogDir"), newReplica.getStr("ulogDir").equals(ulogDir) || newReplica.getStr("ulogDir").equals(ulogDir+'/'));
+    assertTrue(newReplica.getStr("dataDir"),newReplica.getStr("dataDir").equals(dataDir) || newReplica.getStr("dataDir").equals(dataDir+'/'));
+
+    assertEquals(0, cluster.getJettySolrRunner(1).getCoreContainer().getCores().size());
+
+    cluster.getSolrClient().commit(coll);
+    assertEquals(numDocs, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
+    CollectionAdminRequest.deleteCollection(coll).process(cluster.getSolrClient());
+  }
+
+  @Test
+  public void testOldReplicaIsDeleted() throws Exception {
+    String coll = "movereplicatest_coll3";
+    CollectionAdminRequest.createCollection(coll, "conf1", 1, 1)
+        .setCreateNodeSet(cluster.getJettySolrRunner(0).getNodeName())
+        .process(cluster.getSolrClient());
+    addDocs(coll, 2);
+    Replica replica = getCollectionState(coll).getReplicas().iterator().next();
+
+    cluster.getJettySolrRunners().get(0).stop();
+    assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 20000));
+
+    // move replica from node0 -> node1
+    new CollectionAdminRequest.MoveReplica(coll, replica.getName(), cluster.getJettySolrRunner(1).getNodeName())
+        .process(cluster.getSolrClient());
+    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 20000));
+
+    cluster.getJettySolrRunners().get(1).stop();
+    assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 20000));
+
+    // node0 will delete its replica because of CloudUtil.checkSharedFSFailoverReplaced()
+    cluster.getJettySolrRunners().get(0).start();
+    Thread.sleep(5000);
+    assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 20000));
+
+    cluster.getJettySolrRunners().get(1).start();
+    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 20000));
+
+    assertEquals(1, getCollectionState(coll).getReplicas().size());
+    assertEquals(2, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
+    CollectionAdminRequest.deleteCollection(coll).process(cluster.getSolrClient());
+  }
+
+  @Test
+  public void testOldReplicaIsDeletedInRaceCondition() throws Exception {
+    String coll = "movereplicatest_coll4";
+    CollectionAdminRequest.createCollection(coll, "conf1", 1, 1)
+        .setCreateNodeSet(cluster.getJettySolrRunner(0).getNodeName())
+        .process(cluster.getSolrClient());
+    addDocs(coll, 100);
+    Replica replica = getCollectionState(coll).getReplicas().iterator().next();
+
+    cluster.getJettySolrRunners().get(0).stop();
+    assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 20000));
+
+    // move replica from node0 -> node1
+    new CollectionAdminRequest.MoveReplica(coll, replica.getName(), cluster.getJettySolrRunner(1).getNodeName())
+        .process(cluster.getSolrClient());
+    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 20000));
+
+    cluster.getJettySolrRunners().get(1).stop();
+    assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(cluster.getSolrClient().getZkStateReader(), 20000));
+
+    cluster.getJettySolrRunners().get(1).start();
+    // node0 will delete its replica because of CloudUtil.checkSharedFSFailoverReplaced()
+    cluster.getJettySolrRunners().get(0).start();
+    Thread.sleep(5000);
+    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 20000));
+
+    assertEquals(1, getCollectionState(coll).getReplicas().size());
+    assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
+    CollectionAdminRequest.deleteCollection(coll).process(cluster.getSolrClient());
+  }
+
+  private void addDocs(String collection, int numDocs) throws SolrServerException, IOException {
+    SolrClient solrClient = cluster.getSolrClient();
+    for (int docId = 1; docId <= numDocs; docId++) {
+      SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", docId);
+      solrClient.add(collection, doc);
+    }
+  }
+
+}
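
The three tests above share one choreography: stop a node, wait for ZooKeeper to notice, act, restart, wait for recovery. A compact sketch of that idiom, using only the cluster helpers and ClusterStateUtil waits already shown in this file:

    ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();

    cluster.getJettySolrRunner(0).stop();
    // Block until no replica on the stopped node is reported live.
    assertTrue(ClusterStateUtil.waitForAllReplicasNotLive(zkStateReader, 20000));

    // ... issue the MoveReplica (or other action) under test here ...

    cluster.getJettySolrRunner(0).start();
    // Block until every remaining replica is both active and live again.
    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 20000));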

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSUlogDirTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSUlogDirTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSUlogDirTest.java
deleted file mode 100644
index a27a39d..0000000
--- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSUlogDirTest.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.io.IOException;
-
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.response.CollectionAdminResponse;
-import org.apache.solr.cloud.hdfs.HdfsTestUtil;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.ClusterStateUtil;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.ZkConfigManager;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.util.BadHdfsThreadsFilter;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-@ThreadLeakFilters(defaultFilters = true, filters = {
-    BadHdfsThreadsFilter.class, // hdfs currently leaks thread(s)
-    MoveReplicaHDFSTest.ForkJoinThreadsFilter.class
-})
-public class MoveReplicaHDFSUlogDirTest extends SolrCloudTestCase {
-  private static MiniDFSCluster dfsCluster;
-
-  @BeforeClass
-  public static void setupClass() throws Exception {
-    configureCluster(2)
-        .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
-        .configure();
-
-    System.setProperty("solr.hdfs.blockcache.enabled", "false");
-    dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
-
-    ZkConfigManager configManager = new ZkConfigManager(zkClient());
-    configManager.uploadConfigDir(configset("cloud-hdfs"), "conf1");
-
-    System.setProperty("solr.hdfs.home", HdfsTestUtil.getDataDir(dfsCluster, "data"));
-  }
-
-  @AfterClass
-  public static void teardownClass() throws Exception {
-    cluster.shutdown(); // need to close before the MiniDFSCluster
-    HdfsTestUtil.teardownClass(dfsCluster);
-    dfsCluster = null;
-  }
-
-  @Test
-  public void testDataDirAndUlogAreMaintained() throws Exception {
-    String coll = "movereplicatest_coll2";
-    CollectionAdminRequest.createCollection(coll, "conf1", 1, 1)
-        .setCreateNodeSet("")
-        .process(cluster.getSolrClient());
-    String hdfsUri = HdfsTestUtil.getURI(dfsCluster);
-    String dataDir = hdfsUri + "/dummyFolder/dataDir";
-    String ulogDir = hdfsUri + "/dummyFolder2/ulogDir";
-    CollectionAdminResponse res = CollectionAdminRequest
-        .addReplicaToShard(coll, "shard1")
-        .setDataDir(dataDir)
-        .setUlogDir(ulogDir)
-        .setNode(cluster.getJettySolrRunner(0).getNodeName())
-        .process(cluster.getSolrClient());
-
-    ulogDir += "/tlog";
-    ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
-    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
-
-    DocCollection docCollection = zkStateReader.getClusterState().getCollection(coll);
-    Replica replica = docCollection.getReplicas().iterator().next();
-    assertTrue(replica.getStr("ulogDir"), replica.getStr("ulogDir").equals(ulogDir) || replica.getStr("ulogDir").equals(ulogDir+'/'));
-    assertTrue(replica.getStr("dataDir"),replica.getStr("dataDir").equals(dataDir) || replica.getStr("dataDir").equals(dataDir+'/'));
-
-    new CollectionAdminRequest.MoveReplica(coll, replica.getName(), cluster.getJettySolrRunner(1).getNodeName())
-        .process(cluster.getSolrClient());
-    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
-    docCollection = zkStateReader.getClusterState().getCollection(coll);
-    assertEquals(1, docCollection.getSlice("shard1").getReplicas().size());
-    Replica newReplica = docCollection.getReplicas().iterator().next();
-    assertEquals(newReplica.getNodeName(), cluster.getJettySolrRunner(1).getNodeName());
-    assertTrue(newReplica.getStr("ulogDir"), newReplica.getStr("ulogDir").equals(ulogDir) || newReplica.getStr("ulogDir").equals(ulogDir+'/'));
-    assertTrue(newReplica.getStr("dataDir"),newReplica.getStr("dataDir").equals(dataDir) || newReplica.getStr("dataDir").equals(dataDir+'/'));
-
-    assertEquals(replica.getName(), newReplica.getName());
-    assertEquals(replica.getCoreName(), newReplica.getCoreName());
-    assertFalse(replica.getNodeName().equals(newReplica.getNodeName()));
-    final int numDocs = 100;
-    addDocs(coll, numDocs);  // indexed but not committed
-
-    cluster.getJettySolrRunner(1).stop();
-    Thread.sleep(5000);
-    new CollectionAdminRequest.MoveReplica(coll, newReplica.getName(), cluster.getJettySolrRunner(0).getNodeName())
-        .process(cluster.getSolrClient());
-    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
-
-    // assert that the old core will be removed on startup
-    cluster.getJettySolrRunner(1).start();
-    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(zkStateReader, 120000));
-    docCollection = zkStateReader.getClusterState().getCollection(coll);
-    assertEquals(1, docCollection.getReplicas().size());
-    newReplica = docCollection.getReplicas().iterator().next();
-    assertEquals(newReplica.getNodeName(), cluster.getJettySolrRunner(0).getNodeName());
-    assertTrue(newReplica.getStr("ulogDir"), newReplica.getStr("ulogDir").equals(ulogDir) || newReplica.getStr("ulogDir").equals(ulogDir+'/'));
-    assertTrue(newReplica.getStr("dataDir"),newReplica.getStr("dataDir").equals(dataDir) || newReplica.getStr("dataDir").equals(dataDir+'/'));
-
-    assertEquals(0, cluster.getJettySolrRunner(1).getCoreContainer().getCores().size());
-
-    cluster.getSolrClient().commit(coll);
-    assertEquals(numDocs, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
-  }
-
-  private void addDocs(String collection, int numDocs) throws SolrServerException, IOException {
-    SolrClient solrClient = cluster.getSolrClient();
-    for (int docId = 1; docId <= numDocs; docId++) {
-      SolrInputDocument doc = new SolrInputDocument();
-      doc.addField("id", docId);
-      solrClient.add(collection, doc);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0f7e3be5/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
index ffa6ba2..a446f29 100644
--- a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
@@ -42,31 +42,30 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
   @Test
   public void downNodeReportsAllImpactedCollectionsAndNothingElse() throws IOException {
     NodeMutator nm = new NodeMutator();
-    ZkNodeProps props = new ZkNodeProps(ZkStateReader.NODE_NAME_PROP, NODE1);
 
     //We use 2 nodes with maxShardsPerNode as 1
     //Collection1: 2 shards X 1 replica = replica1 on node1 and replica2 on node2
     //Collection2: 1 shard X 1 replica = replica1 on node2
-    ClusterStateMockUtil.Result result = ClusterStateMockUtil.buildClusterState(null, "csrr2rD*csr2", 1, 1, NODE1, NODE2);
-    ClusterState clusterState = result.reader.getClusterState();
+    ZkStateReader reader = ClusterStateMockUtil.buildClusterState("csrr2rDcsr2", 1, 1, NODE1, NODE2);
+    ClusterState clusterState = reader.getClusterState();
     assertEquals(clusterState.getCollection("collection1").getReplica("replica1").getBaseUrl(), NODE1_URL);
     assertEquals(clusterState.getCollection("collection1").getReplica("replica2").getBaseUrl(), NODE2_URL);
     assertEquals(clusterState.getCollection("collection2").getReplica("replica4").getBaseUrl(), NODE2_URL);
 
-    props = new ZkNodeProps(ZkStateReader.NODE_NAME_PROP, NODE1);
+    ZkNodeProps props = new ZkNodeProps(ZkStateReader.NODE_NAME_PROP, NODE1);
     List<ZkWriteCommand> writes = nm.downNode(clusterState, props);
     assertEquals(writes.size(), 1);
     assertEquals(writes.get(0).name, "collection1");
     assertEquals(writes.get(0).collection.getReplica("replica1").getState(), Replica.State.DOWN);
     assertEquals(writes.get(0).collection.getReplica("replica2").getState(), Replica.State.ACTIVE);
-    result.close();
+    reader.close();
 
     //We use 3 nodes with maxShardsPerNode as 1
     //Collection1: 2 shards X 1 replica = replica1 on node1 and replica2 on node2
     //Collection2: 1 shard X 1 replica = replica1 on node2
     //Collection3: 1 shard X 3 replica = replica1 on node1 , replica2 on node2, replica3 on node3
-    result = ClusterStateMockUtil.buildClusterState(null, "csrr2rD*csr2csr1r2r3", 1, 1, NODE1, NODE2, NODE3);
-    clusterState = result.reader.getClusterState();
+    reader = ClusterStateMockUtil.buildClusterState("csrr2rDcsr2csr1r2r3", 1, 1, NODE1, NODE2, NODE3);
+    clusterState = reader.getClusterState();
     assertEquals(clusterState.getCollection("collection1").getReplica("replica1").getBaseUrl(), NODE1_URL);
     assertEquals(clusterState.getCollection("collection1").getReplica("replica2").getBaseUrl(), NODE2_URL);
 
@@ -90,6 +89,6 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
         fail("No other collection needs to be changed");
       }
     }
-    result.close();
+    reader.close();
   }
 }