You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2015/11/05 20:25:16 UTC

svn commit: r1712851 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/admin/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/client/solrj/request/ solrj/src/java/org/apache/solr/common/params/

Author: noble
Date: Thu Nov  5 19:25:15 2015
New Revision: 1712851

URL: http://svn.apache.org/viewvc?rev=1712851&view=rev
Log:
SOLR-7569: Added a collection API called FORCELEADER, to be used when all replicas in a shard are down

Added:
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu Nov  5 19:25:15 2015
@@ -199,6 +199,9 @@ New Features
   ExtractingRequestHandler/ExtractingDocumentLoader (Andriy Binetsky
   via Uwe Schindler)
 
+* SOLR-7569: Added a collection API called FORCELEADER, to be used when all replicas in a shard are down
+  (Ishan Chattopadhyaya, Mark Miller, shalin, noble)
+
 Bug Fixes
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java Thu Nov  5 19:25:15 2015
@@ -66,6 +66,10 @@ public class CloudDescriptor {
     return lastPublished;
   }
 
+  public void setLastPublished(Replica.State state) {
+    lastPublished = state;
+  }
+
   public boolean isLeader() {
     return isLeader;
   }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java Thu Nov  5 19:25:15 2015
@@ -1863,7 +1863,18 @@ public class OverseerCollectionMessageHa
     }
   }
 
-  private void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler, String asyncId, Map<String, String> requestMap) {
+  private void sendShardRequest(String nodeName, ModifiableSolrParams params,
+                                ShardHandler shardHandler, String asyncId,
+                                Map<String, String> requestMap) {
+    sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap, adminPath, zkStateReader);
+
+  }
+
+  public static void sendShardRequest(String nodeName, ModifiableSolrParams params,
+                                       ShardHandler shardHandler, String asyncId,
+                                       Map<String, String> requestMap,
+                                       String adminPath, ZkStateReader zkStateReader
+  ) {
     if (asyncId != null) {
       String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
       params.set(ASYNC, coreAdminAsyncId);

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Thu Nov  5 19:25:15 2015
@@ -36,6 +36,7 @@ import org.apache.solr.client.solrj.impl
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
 import org.apache.solr.cloud.DistributedMap;
+import org.apache.solr.cloud.OverseerCollectionMessageHandler;
 import org.apache.solr.cloud.OverseerTaskQueue;
 import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
 import org.apache.solr.cloud.Overseer;
@@ -48,12 +49,17 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.Replica.State;
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
@@ -63,6 +69,8 @@ import org.apache.solr.common.util.Utils
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.handler.BlobHandler;
 import org.apache.solr.handler.RequestHandlerBase;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardRequest;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.zookeeper.CreateMode;
@@ -439,6 +447,14 @@ public class CollectionsHandler extends
             SHARD_ID_PROP);
       }
     },
+    FORCELEADER_OP(FORCELEADER) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
+        forceLeaderElection(req, handler);
+
+        return null;
+      }
+    },
     CREATESHARD_OP(CREATESHARD) {
       @Override
       Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
@@ -737,6 +753,81 @@ public class CollectionsHandler extends
     }
   }
 
+  private static void forceLeaderElection(SolrQueryRequest req, CollectionsHandler handler) {
+    ClusterState clusterState = handler.coreContainer.getZkController().getClusterState();
+    String collection = req.getParams().required().get(COLLECTION_PROP);
+    String sliceId = req.getParams().required().get(SHARD_ID_PROP);
+
+    log.info("Force leader invoked, state: {}", clusterState);
+    Slice slice = clusterState.getSlice(collection, sliceId);
+    if (slice == null) {
+      if (clusterState.hasCollection(collection)) {
+        throw new SolrException(ErrorCode.BAD_REQUEST,
+            "No shard with name " + sliceId + " exists for collection " + collection);
+      } else {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collection);
+      }
+    }
+
+    try {
+      // if an active replica is the leader, then all is fine already
+      Replica leader = slice.getLeader();
+      if (leader != null && leader.getState() == State.ACTIVE) {
+        throw new SolrException(ErrorCode.SERVER_ERROR,
+            "The shard already has an active leader. Force leader is not applicable. State: " + slice);
+      }
+
+      // Clear out any LIR state
+      String lirPath = handler.coreContainer.getZkController().getLeaderInitiatedRecoveryZnodePath(collection, sliceId);
+      if (handler.coreContainer.getZkController().getZkClient().exists(lirPath, true)) {
+        StringBuilder sb = new StringBuilder();
+        handler.coreContainer.getZkController().getZkClient().printLayout(lirPath, 4, sb);
+        log.info("Cleaning out LIR data, which was: {}", sb);
+        handler.coreContainer.getZkController().getZkClient().clean(lirPath);
+      }
+
+      // Call all live replicas to prepare themselves for leadership, e.g. set last published
+      // state to active.
+      for (Replica rep : slice.getReplicas()) {
+        if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
+          ShardHandler shardHandler = handler.coreContainer.getShardHandlerFactory().getShardHandler();
+
+          ModifiableSolrParams params = new ModifiableSolrParams();
+          params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
+          params.set(CoreAdminParams.CORE, rep.getStr("core"));
+          String nodeName = rep.getNodeName();
+
+          OverseerCollectionMessageHandler.sendShardRequest(nodeName, params, shardHandler, null, null,
+              CommonParams.CORES_HANDLER_PATH, handler.coreContainer.getZkController().getZkStateReader()); // synchronous request
+        }
+      }
+
+      // Wait till we have an active leader
+      boolean success = false;
+      for (int i = 0; i < 9; i++) {
+        Thread.sleep(5000);
+        clusterState = handler.coreContainer.getZkController().getClusterState();
+        slice = clusterState.getSlice(collection, sliceId);
+        if (slice.getLeader() != null && slice.getLeader().getState() == State.ACTIVE) {
+          success = true;
+          break;
+        }
+        log.warn("Force leader attempt {}. Waiting 5 secs for an active leader. State of the slice: {}", (i + 1), slice);
+      }
+
+      if (success) {
+        log.info("Successfully issued FORCELEADER command for collection: {}, shard: {}", collection, sliceId);
+      } else {
+        log.info("Couldn't successfully force leader, collection: {}, shard: {}. Cluster state: {}", collection, sliceId, clusterState);
+      }
+    } catch (SolrException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR,
+          "Error executing FORCELEADER operation for collection: " + collection + " shard: " + sliceId, e);
+    }
+  }
+
   public static void verifyRuleParams(CoreContainer cc, Map<String, Object> m) {
     List l = (List) m.get(RULE);
     if (l != null) {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Thu Nov  5 19:25:15 2015
@@ -19,6 +19,7 @@ package org.apache.solr.handler.admin;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -27,6 +28,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
@@ -314,6 +316,10 @@ public class CoreAdminHandler extends Re
         case INVOKE:
           handleInvoke(req, rsp);
           break;
+        case FORCEPREPAREFORLEADERSHIP: {
+          this.handleForcePrepareForLeadership(req, rsp);
+          break;
+        }
       }
     }
     rsp.setHttpCaching(false);
@@ -895,6 +901,32 @@ public class CoreAdminHandler extends Re
 
   }
   
+  protected void handleForcePrepareForLeadership(SolrQueryRequest req,
+      SolrQueryResponse rsp) throws IOException {
+    final SolrParams params = req.getParams();
+
+    log.info("I have been forcefully prepare myself for leadership.");
+    ZkController zkController = coreContainer.getZkController();
+    if (zkController == null) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Only valid for SolrCloud");
+    }
+    
+    String cname = params.get(CoreAdminParams.CORE);
+    if (cname == null) {
+      throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
+    }
+    try (SolrCore core = coreContainer.getCore(cname)) {
+
+      // Setting the last published state for this core to be ACTIVE
+      if (core != null) {
+        core.getCoreDescriptor().getCloudDescriptor().setLastPublished(Replica.State.ACTIVE);
+        log.info("Setting the last published state for this core, {}, to {}", core.getName(), Replica.State.ACTIVE);
+      } else {
+        SolrException.log(log, "Could not find core: " + cname);
+      }
+    }
+  }
+
   protected void handleWaitForStateAction(SolrQueryRequest req,
       SolrQueryResponse rsp) throws IOException, InterruptedException, KeeperException {
     final SolrParams params = req.getParams();
@@ -1165,6 +1197,9 @@ public class CoreAdminHandler extends Re
           info.add("schema", core.getSchemaResource());
           info.add("startTime", core.getStartTimeStamp());
           info.add("uptime", core.getUptimeMs());
+          if (coreContainer.isZooKeeperAware()) {
+            info.add("lastPublished", core.getCoreDescriptor().getCloudDescriptor().getLastPublished().toString().toLowerCase(Locale.ROOT));
+          }
           if (isIndexInfoNeeded) {
             RefCounted<SolrIndexSearcher> searcher = core.getSearcher();
             try {

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java?rev=1712851&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java Thu Nov  5 19:25:15 2015
@@ -0,0 +1,450 @@
+package org.apache.solr.cloud;
+
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrRequest.METHOD;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest.ForceLeader;
+import org.apache.solr.client.solrj.request.GenericSolrRequest;
+import org.apache.solr.client.solrj.response.SimpleSolrResponse;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.servlet.SolrDispatchFilter;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class ForceLeaderTest extends HttpPartitionTest {
+  protected static final transient Logger log =
+      LoggerFactory.getLogger(ForceLeaderTest.class);
+
+  @Test
+  @Override
+  @Ignore
+  public void test() throws Exception {
+
+  }
+
+  /**
+   * Tests that FORCELEADER can get an active leader after the leader puts all replicas in LIR
+   * (leader-initiated recovery) and then itself goes down, resulting in a leaderless shard.
+   */
+  @Test
+  @Slow
+  public void testReplicasInLIRNoLeader() throws Exception {
+    handle.put("maxScore", SKIPVAL);
+    handle.put("timestamp", SKIPVAL);
+
+    String testCollectionName = "forceleader_test_collection";
+    createCollection(testCollectionName, 1, 3, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+
+    try {
+      List<Replica> notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive);
+      assertEquals("Expected 2 replicas for collection " + testCollectionName
+          + " but found " + notLeaders.size() + "; clusterState: "
+          + printClusterStateInfo(testCollectionName), 2, notLeaders.size());
+
+      Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1);
+      JettySolrRunner notLeader0 = getJettyOnPort(getReplicaPort(notLeaders.get(0)));
+      ZkController zkController = ((SolrDispatchFilter) notLeader0.getDispatchFilter().getFilter()).getCores().getZkController();
+
+      putNonLeadersIntoLIR(testCollectionName, SHARD1, zkController, leader, notLeaders);
+
+      cloudClient.getZkStateReader().updateClusterState();
+      ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+      int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+      assertEquals("Expected only 0 active replica but found " + numActiveReplicas +
+          "; clusterState: " + printClusterStateInfo(), 0, numActiveReplicas);
+
+      int numReplicasOnLiveNodes = 0;
+      for (Replica rep : clusterState.getSlice(testCollectionName, SHARD1).getReplicas()) {
+        if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
+          numReplicasOnLiveNodes++;
+        }
+      }
+      assertEquals(2, numReplicasOnLiveNodes);
+      log.info("Before forcing leader: " + printClusterStateInfo());
+      // Assert there is no leader yet
+      assertNull("Expected no leader right now. State: " + clusterState.getSlice(testCollectionName, SHARD1),
+          clusterState.getSlice(testCollectionName, SHARD1).getLeader());
+
+      assertSendDocFails(3);
+
+      doForceLeader(cloudClient, testCollectionName, SHARD1);
+
+      // By now we have an active leader. Wait for recoveries to finish
+      waitForRecoveriesToFinish(testCollectionName, cloudClient.getZkStateReader(), true);
+
+      cloudClient.getZkStateReader().updateClusterState();
+      clusterState = cloudClient.getZkStateReader().getClusterState();
+      log.info("After forcing leader: " + clusterState.getSlice(testCollectionName, SHARD1));
+      // we have a leader
+      Replica newLeader = clusterState.getSlice(testCollectionName, SHARD1).getLeader();
+      assertNotNull(newLeader);
+      // leader is active
+      assertEquals(State.ACTIVE, newLeader.getState());
+
+      numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+      assertEquals(2, numActiveReplicas);
+
+      // Assert that indexing works again
+      log.info("Sending doc 4...");
+      sendDoc(4);
+      log.info("Committing...");
+      cloudClient.commit();
+      log.info("Doc 4 sent and commit issued");
+
+      assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
+      assertDocsExistInAllReplicas(notLeaders, testCollectionName, 4, 4);
+
+      // Docs 1 and 4 should be here. 2 was lost during the partition, 3 had failed to be indexed.
+      log.info("Checking doc counts...");
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.add("q", "*:*");
+      assertEquals("Expected only 2 documents in the index", 2, cloudClient.query(params).getResults().getNumFound());
+
+      bringBackOldLeaderAndSendDoc(testCollectionName, leader, notLeaders, 5);
+    } finally {
+      log.info("Cleaning up after the test.");
+      // try to clean up
+      try {
+        CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
+        req.setCollectionName(testCollectionName);
+        req.process(cloudClient);
+      } catch (Exception e) {
+        // don't fail the test
+        log.warn("Could not delete collection {} after test completed", testCollectionName);
+      }
+    }
+  }
+
+  /**
+   * Test that FORCELEADER can set last published state of all down (live) replicas to active (so
+   * that they become worthy candidates for leader election).
+   */
+  @Slow
+  public void testLastPublishedStateIsActive() throws Exception {
+    handle.put("maxScore", SKIPVAL);
+    handle.put("timestamp", SKIPVAL);
+
+    String testCollectionName = "forceleader_last_published";
+    createCollection(testCollectionName, 1, 3, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+    log.info("Collection created: " + testCollectionName);
+
+    try {
+      List<Replica> notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive);
+      assertEquals("Expected 2 replicas for collection " + testCollectionName
+          + " but found " + notLeaders.size() + "; clusterState: "
+          + printClusterStateInfo(testCollectionName), 2, notLeaders.size());
+
+      Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1);
+      JettySolrRunner notLeader0 = getJettyOnPort(getReplicaPort(notLeaders.get(0)));
+      ZkController zkController = ((SolrDispatchFilter) notLeader0.getDispatchFilter().getFilter()).getCores().getZkController();
+
+      // Mark all replicas down
+      setReplicaState(testCollectionName, SHARD1, leader, State.DOWN);
+      for (Replica rep : notLeaders) {
+        setReplicaState(testCollectionName, SHARD1, rep, State.DOWN);
+      }
+
+      zkController.getZkStateReader().updateClusterState();
+      // Assert all replicas are down and that there is no leader
+      assertEquals(0, getActiveOrRecoveringReplicas(testCollectionName, SHARD1).size());
+
+      // Now force leader
+      doForceLeader(cloudClient, testCollectionName, SHARD1);
+
+      // Assert that last published states of the two replicas are active now
+      for (Replica rep: notLeaders) {
+        assertEquals(Replica.State.ACTIVE, getLastPublishedState(testCollectionName, SHARD1, rep));
+      }
+    } finally {
+      log.info("Cleaning up after the test.");
+      // try to clean up
+      try {
+        CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
+        req.setCollectionName(testCollectionName);
+        req.process(cloudClient);
+      } catch (Exception e) {
+        // don't fail the test
+        log.warn("Could not delete collection {} after test completed", testCollectionName);
+      }
+    }
+  }
+
+  protected void unsetLeader(String collection, String slice) throws Exception {
+    DistributedQueue inQueue = Overseer.getInQueue(cloudClient.getZkStateReader().getZkClient());
+    ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+
+    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
+        ZkStateReader.SHARD_ID_PROP, slice,
+        ZkStateReader.COLLECTION_PROP, collection);
+    inQueue.offer(Utils.toJSON(m));
+
+    ClusterState clusterState = null;
+    boolean transition = false;
+    for (int counter = 10; counter > 0; counter--) {
+      zkStateReader.updateClusterState();
+      clusterState = zkStateReader.getClusterState();
+      Replica newLeader = clusterState.getSlice(collection, slice).getLeader();
+      if (newLeader == null) {
+        transition = true;
+        break;
+      }
+      Thread.sleep(1000);
+    }
+
+    if (!transition) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not unset replica leader" +
+          ". Cluster state: " + printClusterStateInfo(collection));
+    }
+  }
+
+  protected void setReplicaState(String collection, String slice, Replica replica, Replica.State state) throws SolrServerException, IOException,
+      KeeperException, InterruptedException {
+    DistributedQueue inQueue = Overseer.getInQueue(cloudClient.getZkStateReader().getZkClient());
+    ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+
+    String baseUrl = zkStateReader.getBaseUrlForNodeName(replica.getNodeName());
+    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
+        ZkStateReader.BASE_URL_PROP, baseUrl,
+        ZkStateReader.NODE_NAME_PROP, replica.getNodeName(),
+        ZkStateReader.SHARD_ID_PROP, slice,
+        ZkStateReader.COLLECTION_PROP, collection,
+        ZkStateReader.CORE_NAME_PROP, replica.getStr(CORE_NAME_PROP),
+        ZkStateReader.CORE_NODE_NAME_PROP, replica.getName(),
+        ZkStateReader.STATE_PROP, state.toString());
+    inQueue.offer(Utils.toJSON(m));
+    boolean transition = false;
+
+    Replica.State replicaState = null;
+    for (int counter = 10; counter > 0; counter--) {
+      zkStateReader.updateClusterState();
+      ClusterState clusterState = zkStateReader.getClusterState();
+      replicaState = clusterState.getSlice(collection, slice).getReplica(replica.getName()).getState();
+      if (replicaState == state) {
+        transition = true;
+        break;
+      }
+      Thread.sleep(1000);
+    }
+
+    if (!transition) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not set replica [" + replica.getName() + "] as " + state +
+          ". Last known state of the replica: " + replicaState);
+    }
+  }
+  
+  /*protected void setLastPublishedState(String collection, String slice, Replica replica, Replica.State state) throws SolrServerException, IOException,
+  KeeperException, InterruptedException {
+    ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+    String baseUrl = zkStateReader.getBaseUrlForNodeName(replica.getNodeName());
+
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
+    params.set(CoreAdminParams.CORE, replica.getStr("core"));
+    params.set(ZkStateReader.STATE_PROP, state.toString());
+
+    SolrRequest<SimpleSolrResponse> req = new GenericSolrRequest(METHOD.GET, "/admin/cores", params);
+    NamedList resp = null;
+    try (HttpSolrClient hsc = new HttpSolrClient(baseUrl)) {
+       resp = hsc.request(req);
+    }
+  }*/
+
+  protected Replica.State getLastPublishedState(String collection, String slice, Replica replica) throws SolrServerException, IOException,
+  KeeperException, InterruptedException {
+    ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+    String baseUrl = zkStateReader.getBaseUrlForNodeName(replica.getNodeName());
+
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminAction.STATUS.toString());
+    params.set(CoreAdminParams.CORE, replica.getStr("core"));
+
+    SolrRequest<SimpleSolrResponse> req = new GenericSolrRequest(METHOD.GET, "/admin/cores", params);
+    NamedList resp = null;
+    try (HttpSolrClient hsc = new HttpSolrClient(baseUrl)) {
+       resp = hsc.request(req);
+    }
+
+    String lastPublished = (((NamedList<NamedList<String>>)resp.get("status")).get(replica.getStr("core"))).get("lastPublished");
+    return Replica.State.getState(lastPublished);
+  }
+
+  void assertSendDocFails(int docId) throws Exception {
+    // sending a doc in this state fails
+    try {
+      sendDoc(docId);
+      log.error("Should've failed indexing during a down state. Cluster state: " + printClusterStateInfo());
+      fail("Should've failed indexing during a down state.");
+    } catch (SolrException ex) {
+      log.info("Document couldn't be sent, which is expected.");
+    }
+  }
+
+  /**
+   * Puts every replica in {@code notLeaders} into leader-initiated recovery (LIR) and leaves the shard leaderless.
+   * Indexes one doc normally and one while the non-leader proxies are closed, stops the leader's Jetty, polls up to
+   * 20 times (1s apart) for all LIR states to be DOWN with no leader, reopens the proxies and asserts the LIR states.
+   */
+  void putNonLeadersIntoLIR(String collectionName, String shard, ZkController zkController, Replica leader, List<Replica> notLeaders) throws Exception {
+    SocketProxy[] nonLeaderProxies = new SocketProxy[notLeaders.size()];
+    for (int i = 0; i < notLeaders.size(); i++)
+      nonLeaderProxies[i] = getProxyForReplica(notLeaders.get(i));
+
+    sendDoc(1);
+
+    // ok, now introduce a network partition between the leader and the non-leader replicas
+    log.info("Closing proxies for the non-leader replicas...");
+    for (SocketProxy proxy : nonLeaderProxies)
+      proxy.close();
+
+    // indexing during a partition
+    log.info("Sending a doc during the network partition...");
+    sendDoc(2);
+
+    // Wait a little
+    Thread.sleep(2000);
+
+    // Kill the leader
+    log.info("Killing leader for " + shard + " of " + collectionName + " on node " + leader.getNodeName());
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+    getProxyForReplica(leader).close();
+    leaderJetty.stop();
+
+    // Wait for a steady state, till LIR flags have been set and the shard is leaderless
+    log.info("Sleep and periodically wake up to check for state...");
+    for (int i = 0; i < 20; i++) {
+      Thread.sleep(1000);
+      // Non-short-circuit '&=' so every replica's LIR flag is still read after one is found not DOWN
+      boolean allDown = true;
+      for (Replica notLeader : notLeaders)
+        allDown &= Replica.State.DOWN.equals(zkController.getLeaderInitiatedRecoveryState(collectionName, shard, notLeader.getName()));
+
+      zkController.getZkStateReader().updateClusterState();
+      ClusterState clusterState = zkController.getZkStateReader().getClusterState();
+      if (allDown && clusterState.getSlice(collectionName, shard).getLeader() == null) {
+        break;
+      }
+      log.warn("Attempt " + i + ", waiting on for 1 sec to settle down in the steady state. State: " +
+          printClusterStateInfo(collectionName));
+      log.warn("LIR state: " + getLIRState(zkController, collectionName, shard));
+    }
+    log.info("Waking up...");
+
+    // remove the network partition
+    log.info("Reopening the proxies for the non-leader replicas...");
+    for (SocketProxy proxy : nonLeaderProxies)
+      proxy.reopen();
+
+    log.info("LIR state: " + getLIRState(zkController, collectionName, shard));
+
+    // The polling loop above may have run out of attempts, so verify the LIR flags explicitly
+    for (Replica notLeader : notLeaders)
+      assertEquals("Expected that the LIR state would've been down by now", Replica.State.DOWN,
+          zkController.getLeaderInitiatedRecoveryState(collectionName, shard, notLeader.getName()));
+  }
+
+  /** Restarts the stopped leader, waits for recoveries, expects every SHARD1 replica active, then indexes and verifies {@code docid}. */
+  protected void bringBackOldLeaderAndSendDoc(String collection, Replica leader, List<Replica> notLeaders, int docid) throws Exception {
+    log.info("Bringing back originally killed leader...");
+    getJettyOnPort(getReplicaPort(leader)).start();
+    waitForRecoveriesToFinish(collection, cloudClient.getZkStateReader(), true);
+    // Refresh the local view of the cluster before counting active replicas
+    cloudClient.getZkStateReader().updateClusterState();
+    ClusterState refreshedState = cloudClient.getZkStateReader().getClusterState();
+    log.info("After bringing back leader: " + refreshedState.getSlice(collection, SHARD1));
+    int expectedActive = notLeaders.size() + 1;
+    assertEquals(expectedActive, getNumberOfActiveReplicas(refreshedState, collection, SHARD1));
+    log.info("Sending doc "+docid+"...");
+    sendDoc(docid);
+    log.info("Committing...");
+    cloudClient.commit();
+    log.info("Doc "+docid+" sent and commit issued");
+    assertDocsExistInAllReplicas(notLeaders, collection, docid, docid);
+    assertDocsExistInAllReplicas(Collections.singletonList(leader), collection, docid, docid);
+  }
+
+  /** Returns what printLayout writes for the shard's LIR znode path, or null when the path is null or the znode is missing. */
+  protected String getLIRState(ZkController zkController, String collection, String shard) throws KeeperException, InterruptedException {
+    String lirPath = zkController.getLeaderInitiatedRecoveryZnodePath(collection, shard);
+    if (lirPath == null) return null;
+    StringBuilder layout = new StringBuilder();
+    try {
+      zkController.getZkClient().printLayout(lirPath, 4, layout);
+      return layout.toString();
+    } catch (NoNodeException ex) {
+      return null; // the LIR znode does not exist
+    }
+  }
+
+  /** Indexes a document with the id field and an "a_t" field for {@code docId} via sendDocsWithRetry(docs, 1, 5, 1). */
+  @Override
+  protected int sendDoc(int docId) throws Exception {
+    SolrInputDocument document = new SolrInputDocument();
+    document.addField(id, Integer.toString(docId));
+    document.addField("a_t", "hello" + docId);
+    return sendDocsWithRetry(Collections.singletonList(document), 1, 5, 1);
+  }
+
+  private void doForceLeader(SolrClient client, String collectionName, String shard) throws IOException, SolrServerException {
+    CollectionAdminRequest.ForceLeader request = new CollectionAdminRequest.ForceLeader(); // FORCELEADER for one shard
+    request.setCollectionName(collectionName);
+    request.setShardName(shard);
+    client.request(request); // the response is not inspected
+  }
+
+  /** Returns how many replicas of the given slice report {@code State.ACTIVE}. */
+  protected int getNumberOfActiveReplicas(ClusterState clusterState, String collection, String sliceId) {
+    int activeCount = 0;
+    // Tally only; this helper asserts nothing itself
+    for (Replica replica : clusterState.getSlice(collection, sliceId).getReplicas()) {
+      boolean active = replica.getState().equals(State.ACTIVE);
+      if (active) activeCount++;
+    }
+    return activeCount;
+  }
+}
+

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java Thu Nov  5 19:25:15 2015
@@ -434,6 +434,29 @@ public abstract class CollectionAdminReq
       return this;
     }
   }
+  
+  // FORCELEADER request
+  public static class ForceLeader extends CollectionShardAdminRequest<ForceLeader> {
+    /** Async request id; nothing in this class assigns it, so it stays null unless set from outside. */
+    protected String asyncId;
+
+    public ForceLeader() {
+      action = CollectionAction.FORCELEADER;
+    }
+
+    @Override
+    protected ForceLeader getThis() { return this; }
+
+    /** Returns the common params, plus the async id under CommonAdminParams.ASYNC when one is present. */
+    @Override
+    public SolrParams getParams() {
+      ModifiableSolrParams requestParams = getCommonParams();
+      if (asyncId != null) {
+        requestParams.set(CommonAdminParams.ASYNC, asyncId);
+      }
+      return requestParams;
+    }
+  }
 
   // REQUESTSTATUS request
   public static class RequestStatus extends CollectionAdminRequest<RequestStatus> {
@@ -447,7 +470,7 @@ public abstract class CollectionAdminReq
       this.requestId = requestId;
       return this;
     }
-    
+
     public String getRequestId() {
       return this.requestId;
     }
@@ -464,7 +487,7 @@ public abstract class CollectionAdminReq
       return this;
     }
   }
-  
+
   // CREATEALIAS request
   public static class CreateAlias extends CollectionAdminRequest<CreateAlias> {
     protected String aliasName;

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java Thu Nov  5 19:25:15 2015
@@ -350,6 +350,34 @@ public class CoreAdminRequest extends So
     }
   }
   
+  public static class OverrideLastPublished extends CoreAdminRequest {
+    protected String state;
+
+    public OverrideLastPublished() {
+      action = CoreAdminAction.FORCEPREPAREFORLEADERSHIP;
+    }
+
+    @Override
+    public SolrParams getParams() {
+      if( action == null ) {
+        throw new RuntimeException( "no action specified!" );
+      }
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CoreAdminParams.ACTION, action.toString());
+      params.set(CoreAdminParams.CORE, core);
+      params.set(ZkStateReader.STATE_PROP, state);
+      return params;
+    }
+
+    public String getState() {
+      return state;
+    }
+
+    public void setState(String state) {
+      this.state = state;
+    }
+  }
+
   public static class MergeIndexes extends CoreAdminRequest {
     protected List<String> indexDirs;
     protected List<String> srcCores;

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java Thu Nov  5 19:25:15 2015
@@ -19,8 +19,6 @@ package org.apache.solr.common.params;
 
 import java.util.Locale;
 
-import org.apache.solr.common.SolrException;
-
 public interface CollectionParams 
 {
   /** What action **/
@@ -40,6 +38,7 @@ public interface CollectionParams
     DELETESHARD(true),
     CREATESHARD(true),
     DELETEREPLICA(true),
+    FORCELEADER(true),
     MIGRATE(true),
     ADDROLE(true),
     REMOVEROLE(true),

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java Thu Nov  5 19:25:15 2015
@@ -142,6 +142,8 @@ public abstract class CoreAdminParams
     OVERSEEROP,
     REQUESTSTATUS,
     REJOINLEADERELECTION,
+    //internal API used by force shard leader election
+    FORCEPREPAREFORLEADERSHIP,
     INVOKE;
 
     public static CoreAdminAction get( String p )