You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/07/04 17:58:25 UTC

[1/2] lucene-solr:branch_7_0: SOLR-10878: Fix precommit.

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7_0 b73e8e5ef -> 174d55f41


SOLR-10878: Fix precommit.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/174d55f4
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/174d55f4
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/174d55f4

Branch: refs/heads/branch_7_0
Commit: 174d55f41486b5bc30661d116e1119abfcc30ba3
Parents: a32ba2c
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Tue Jul 4 19:46:10 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue Jul 4 19:58:14 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/solr/cloud/MoveReplicaCmd.java  |  1 -
 .../org/apache/solr/cloud/MoveReplicaHDFSTest.java  | 16 ++++++++++++++++
 .../test/org/apache/solr/cloud/MoveReplicaTest.java |  1 -
 3 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/174d55f4/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
index 53d05e1..4d6e26d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
@@ -31,7 +31,6 @@ import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/174d55f4/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java
index 884d49e..dc6318b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.solr.cloud;
 
 import com.carrotsearch.randomizedtesting.ThreadFilter;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/174d55f4/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
index 8f00431..930e8ee 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
@@ -35,7 +35,6 @@ import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;


[2/2] lucene-solr:branch_7_0: SOLR-10878: MOVEREPLICA command may lose data when replicationFactor==1.

Posted by ab...@apache.org.
SOLR-10878: MOVEREPLICA command may lose data when replicationFactor==1.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/a32ba2c9
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/a32ba2c9
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/a32ba2c9

Branch: refs/heads/branch_7_0
Commit: a32ba2c9560cb1e6ad79854a382de668145994c4
Parents: b73e8e5
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Tue Jul 4 11:33:41 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue Jul 4 19:58:14 2017 +0200

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  2 +
 .../org/apache/solr/cloud/AddReplicaCmd.java    |  2 +-
 .../org/apache/solr/cloud/DeleteReplicaCmd.java |  2 +-
 .../org/apache/solr/cloud/MoveReplicaCmd.java   | 43 ++++++++--
 .../cloud/OverseerCollectionMessageHandler.java | 23 +++---
 .../org/apache/solr/cloud/ReplaceNodeCmd.java   | 43 +++++++---
 .../apache/solr/cloud/MoveReplicaHDFSTest.java  | 53 ++++++++++++
 .../org/apache/solr/cloud/MoveReplicaTest.java  | 84 +++++++++++++++++++-
 8 files changed, 220 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a32ba2c9/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 6811905..6abaceb 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -241,6 +241,8 @@ Bug Fixes
 * SOLR-6807: CloudSolrClient's ZK state version check with the server was ignored when handleSelect=false
   (David Smiley)
 
+* SOLR-10878: MOVEREPLICA command may lose data when replicationFactor is 1. (ab, shalin)
+
 Optimizations
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a32ba2c9/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
index 63acdd1..c42d073 100644
--- a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
@@ -68,7 +68,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
 
   ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
       throws KeeperException, InterruptedException {
-    log.info("addReplica() : {}", Utils.toJSONString(message));
+    log.debug("addReplica() : {}", Utils.toJSONString(message));
     String collection = message.getStr(COLLECTION_PROP);
     String node = message.getStr(CoreAdminParams.NODE);
     String shard = message.getStr(SHARD_ID_PROP);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a32ba2c9/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java
index b79fa46..e71d7e8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java
@@ -265,7 +265,7 @@ public class DeleteReplicaCmd implements Cmd {
       try {
         if (!callable.call())
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-                  "Could not  remove replica : " + collectionName + "/" + shard + "/" + replicaName);
+                  "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
       } catch (InterruptedException | KeeperException e) {
         throw e;
       } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a32ba2c9/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
index fed1398..53d05e1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
@@ -29,6 +31,7 @@ import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
@@ -56,10 +59,11 @@ public class MoveReplicaCmd implements Cmd{
   }
 
   private void moveReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
-    log.info("moveReplica() : {}", Utils.toJSONString(message));
+    log.debug("moveReplica() : {}", Utils.toJSONString(message));
     ocmh.checkRequired(message, COLLECTION_PROP, "targetNode");
     String collection = message.getStr(COLLECTION_PROP);
     String targetNode = message.getStr("targetNode");
+    int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
 
     String async = message.getStr(ASYNC);
 
@@ -103,14 +107,14 @@ public class MoveReplicaCmd implements Cmd{
     assert slice != null;
     Object dataDir = replica.get("dataDir");
     if (dataDir != null && dataDir.toString().startsWith("hdfs:/")) {
-      moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice);
+      moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice, timeout);
     } else {
-      moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice);
+      moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice, timeout);
     }
   }
 
   private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async,
-                                 DocCollection coll, Replica replica, Slice slice) throws Exception {
+                                 DocCollection coll, Replica replica, Slice slice, int timeout) throws Exception {
     String newCoreName = Assign.buildCoreName(coll, slice.getName(), replica.getType());
 
     ZkNodeProps removeReplicasProps = new ZkNodeProps(
@@ -154,7 +158,7 @@ public class MoveReplicaCmd implements Cmd{
   }
 
   private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async,
-                                 DocCollection coll, Replica replica, Slice slice) throws Exception {
+                                 DocCollection coll, Replica replica, Slice slice, int timeout) throws Exception {
     String newCoreName = Assign.buildCoreName(coll, slice.getName(), replica.getType());
     ZkNodeProps addReplicasProps = new ZkNodeProps(
         COLLECTION_PROP, coll.getName(),
@@ -163,20 +167,47 @@ public class MoveReplicaCmd implements Cmd{
         CoreAdminParams.NAME, newCoreName);
     if(async!=null) addReplicasProps.getProperties().put(ASYNC, async);
     NamedList addResult = new NamedList();
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    ReplaceNodeCmd.RecoveryWatcher watcher = null;
+    if (replica.equals(slice.getLeader())) {
+      watcher = new ReplaceNodeCmd.RecoveryWatcher(coll.getName(), slice.getName(),
+          replica.getName(), null, countDownLatch);
+      ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher);
+    }
     ocmh.addReplica(clusterState, addReplicasProps, addResult, null);
     if (addResult.get("failure") != null) {
       String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
           " on node=%s", coll.getName(), slice.getName(), targetNode);
       log.warn(errorString);
       results.add("failure", errorString);
+      if (watcher != null) { // unregister
+        ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher);
+      }
       return;
     }
+    // wait for the other replica to be active if the source replica was a leader
+    if (watcher != null) {
+      try {
+        log.debug("Waiting for leader's replica to recover.");
+        if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
+          String errorString = String.format(Locale.ROOT, "Timed out waiting for leader's replica to recover, collection=%s shard=%s" +
+              " on node=%s", coll.getName(), slice.getName(), targetNode);
+          log.warn(errorString);
+          results.add("failure", errorString);
+          return;
+        } else {
+          log.debug("Replica " + watcher.getRecoveredReplica() + " is active - deleting the source...");
+        }
+      } finally {
+        ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher);
+      }
+    }
 
     ZkNodeProps removeReplicasProps = new ZkNodeProps(
         COLLECTION_PROP, coll.getName(),
         SHARD_ID_PROP, slice.getName(),
         REPLICA_PROP, replica.getName());
-    if(async!=null) removeReplicasProps.getProperties().put(ASYNC, async);
+    if (async != null) removeReplicasProps.getProperties().put(ASYNC, async);
     NamedList deleteResult = new NamedList();
     ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
     if (deleteResult.get("failure") != null) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a32ba2c9/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index a8d74e8..2c55f3c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -419,20 +419,25 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
 
   boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
     TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS);
-    boolean deleted = false;
-    while (! timeout.hasTimedOut()) {
-      Thread.sleep(100);
-      DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName);
-      if(docCollection != null) {
+    // TODO: remove this workaround for SOLR-9440
+    zkStateReader.registerCore(collectionName);
+    try {
+      while (! timeout.hasTimedOut()) {
+        Thread.sleep(100);
+        DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName);
+        if (docCollection == null) { // someone already deleted the collection
+          return true;
+        }
         Slice slice = docCollection.getSlice(shard);
         if(slice == null || slice.getReplica(replicaName) == null) {
-          deleted =  true;
+          return true;
         }
       }
-      // Return true if either someone already deleted the collection/slice/replica.
-      if (docCollection == null || deleted) break;
+      // replica still exists after the timeout
+      return false;
+    } finally {
+      zkStateReader.unregisterCore(collectionName);
     }
-    return deleted;
   }
 
   void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws KeeperException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a32ba2c9/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
index 5adbe8c..ba60908 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
@@ -93,15 +93,6 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
     CountDownLatch replicasToRecover = new CountDownLatch(numLeaders);
 
     for (ZkNodeProps sourceReplica : sourceReplicas) {
-      if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false)) {
-        String shardName = sourceReplica.getStr(SHARD_ID_PROP);
-        String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
-        String collectionName = sourceReplica.getStr(COLLECTION_PROP);
-        String key = collectionName + "_" + replicaName;
-        RecoveryWatcher watcher = new RecoveryWatcher(collectionName, shardName, replicaName, replicasToRecover);
-        watchers.put(key, watcher);
-        zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
-      }
       NamedList nl = new NamedList();
       log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
       ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target);
@@ -128,6 +119,16 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
 
       if (addedReplica != null) {
         createdReplicas.add(addedReplica);
+        if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false)) {
+          String shardName = sourceReplica.getStr(SHARD_ID_PROP);
+          String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
+          String collectionName = sourceReplica.getStr(COLLECTION_PROP);
+          String key = collectionName + "_" + replicaName;
+          RecoveryWatcher watcher = new RecoveryWatcher(collectionName, shardName, replicaName,
+              addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover);
+          watchers.put(key, watcher);
+          zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+        }
       }
     }
 
@@ -208,16 +209,27 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
   }
 
   // we use this watcher to wait for replicas to recover
-  private static class RecoveryWatcher implements CollectionStateWatcher {
+  static class RecoveryWatcher implements CollectionStateWatcher {
     String collectionId;
     String shardId;
     String replicaId;
+    String targetCore;
     CountDownLatch countDownLatch;
+    Replica recovered;
 
-    RecoveryWatcher(String collectionId, String shardId, String replicaId, CountDownLatch countDownLatch) {
+    /**
+     * Watch for recovery of a replica
+     * @param collectionId collection name
+     * @param shardId shard id
+     * @param replicaId source replica name (coreNodeName)
+     * @param targetCore specific target core name - if null then any active replica will do
+     * @param countDownLatch countdown when recovered
+     */
+    RecoveryWatcher(String collectionId, String shardId, String replicaId, String targetCore, CountDownLatch countDownLatch) {
       this.collectionId = collectionId;
       this.shardId = shardId;
       this.replicaId = replicaId;
+      this.targetCore = targetCore;
       this.countDownLatch = countDownLatch;
     }
 
@@ -241,7 +253,12 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
             continue;
           }
           // check its state
+          String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+          if (targetCore != null && !targetCore.equals(coreName)) {
+            continue;
+          }
           if (replica.isActive(liveNodes)) { // recovered - stop waiting
+            recovered = replica;
             countDownLatch.countDown();
             return true;
           }
@@ -250,5 +267,9 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
       // set the watch again to wait for the new replica to recover
       return false;
     }
+
+    public Replica getRecoveredReplica() {
+      return recovered;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a32ba2c9/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java
new file mode 100644
index 0000000..884d49e
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java
@@ -0,0 +1,53 @@
+package org.apache.solr.cloud;
+
+import com.carrotsearch.randomizedtesting.ThreadFilter;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.solr.cloud.hdfs.HdfsTestUtil;
+import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.util.BadHdfsThreadsFilter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ *
+ */
+@ThreadLeakFilters(defaultFilters = true, filters = {
+    BadHdfsThreadsFilter.class, // hdfs currently leaks thread(s)
+    MoveReplicaHDFSTest.ForkJoinThreadsFilter.class
+})
+public class MoveReplicaHDFSTest extends MoveReplicaTest {
+
+  private static MiniDFSCluster dfsCluster;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
+
+    ZkConfigManager configManager = new ZkConfigManager(zkClient());
+    configManager.uploadConfigDir(configset("cloud-hdfs"), "conf1");
+
+    System.setProperty("solr.hdfs.home", HdfsTestUtil.getDataDir(dfsCluster, "data"));
+  }
+
+  @AfterClass
+  public static void teardownClass() throws Exception {
+    cluster.shutdown(); // need to close before the MiniDFSCluster
+    HdfsTestUtil.teardownClass(dfsCluster);
+    dfsCluster = null;
+  }
+
+
+  public static class ForkJoinThreadsFilter implements ThreadFilter {
+
+    @Override
+    public boolean reject(Thread t) {
+      String name = t.getName();
+      if (name.startsWith("ForkJoinPool.commonPool")) {
+        return true;
+      }
+      return false;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a32ba2c9/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
index 4368fea..8f00431 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
@@ -20,6 +20,7 @@ package org.apache.solr.cloud;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -31,8 +32,10 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.client.solrj.response.CoreAdminResponse;
 import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -40,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 public class MoveReplicaTest extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   @BeforeClass
   public static void setupCluster() throws Exception {
     configureCluster(4)
@@ -56,10 +60,11 @@ public class MoveReplicaTest extends SolrCloudTestCase {
     cluster.waitForAllNodes(5000);
     String coll = "movereplicatest_coll";
     log.info("total_jettys: " + cluster.getJettySolrRunners().size());
+    int REPLICATION = 2;
 
     CloudSolrClient cloudClient = cluster.getSolrClient();
 
-    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, 2);
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION);
     create.setMaxShardsPerNode(2);
     cloudClient.request(create);
 
@@ -94,16 +99,87 @@ public class MoveReplicaTest extends SolrCloudTestCase {
         break;
       }
       assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
-      Thread.sleep(50);
+      Thread.sleep(500);
     }
     assertTrue(success);
     checkNumOfCores(cloudClient, replica.getNodeName(), 0);
-    checkNumOfCores(cloudClient, targetNode, 2);
+    assertTrue("should be at least one core on target node!", getNumOfCores(cloudClient, targetNode) > 0);
+    // wait for recovery
+    boolean recovered = false;
+    for (int i = 0; i < 300; i++) {
+      DocCollection collState = getCollectionState(coll);
+      log.debug("###### " + collState);
+      Collection<Replica> replicas = collState.getSlice(shardId).getReplicas();
+      boolean allActive = true;
+      boolean hasLeaders = true;
+      if (replicas != null && !replicas.isEmpty()) {
+        for (Replica r : replicas) {
+          if (!r.getNodeName().equals(targetNode)) {
+            continue;
+          }
+          if (!r.isActive(Collections.singleton(targetNode))) {
+            log.info("Not active: " + r);
+            allActive = false;
+          }
+        }
+      } else {
+        allActive = false;
+      }
+      for (Slice slice : collState.getSlices()) {
+        if (slice.getLeader() == null) {
+          hasLeaders = false;
+        }
+      }
+      if (allActive && hasLeaders) {
+        // check the number of active replicas
+        assertEquals("total number of replicas", REPLICATION, replicas.size());
+        recovered = true;
+        break;
+      } else {
+        log.info("--- waiting, allActive=" + allActive + ", hasLeaders=" + hasLeaders);
+        Thread.sleep(1000);
+      }
+    }
+    assertTrue("replica never fully recovered", recovered);
 
     moveReplica = new CollectionAdminRequest.MoveReplica(coll, shardId, targetNode, replica.getNodeName());
     moveReplica.process(cloudClient);
     checkNumOfCores(cloudClient, replica.getNodeName(), 1);
-    checkNumOfCores(cloudClient, targetNode, 1);
+    // wait for recovery
+    recovered = false;
+    for (int i = 0; i < 300; i++) {
+      DocCollection collState = getCollectionState(coll);
+      log.debug("###### " + collState);
+      Collection<Replica> replicas = collState.getSlice(shardId).getReplicas();
+      boolean allActive = true;
+      boolean hasLeaders = true;
+      if (replicas != null && !replicas.isEmpty()) {
+        for (Replica r : replicas) {
+          if (!r.getNodeName().equals(replica.getNodeName())) {
+            continue;
+          }
+          if (!r.isActive(Collections.singleton(replica.getNodeName()))) {
+            log.info("Not active yet: " + r);
+            allActive = false;
+          }
+        }
+      } else {
+        allActive = false;
+      }
+      for (Slice slice : collState.getSlices()) {
+        if (slice.getLeader() == null) {
+          hasLeaders = false;
+        }
+      }
+      if (allActive && hasLeaders) {
+        assertEquals("total number of replicas", REPLICATION, replicas.size());
+        recovered = true;
+        break;
+      } else {
+        Thread.sleep(1000);
+      }
+    }
+    assertTrue("replica never fully recovered", recovered);
   }
 
   private Replica getRandomReplica(String coll, CloudSolrClient cloudClient) {