You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2015/11/05 20:25:16 UTC
svn commit: r1712851 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/cloud/
core/src/java/org/apache/solr/handler/admin/
core/src/test/org/apache/solr/cloud/
solrj/src/java/org/apache/solr/client/solrj/request/
solrj/src/java/org/apache/s...
Author: noble
Date: Thu Nov 5 19:25:15 2015
New Revision: 1712851
URL: http://svn.apache.org/viewvc?rev=1712851&view=rev
Log:
SOLR-7569: A collection API called FORCELEADER when all replicas in a shard are down
Added:
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java (with props)
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu Nov 5 19:25:15 2015
@@ -199,6 +199,9 @@ New Features
ExtractingRequestHandler/ExtractingDocumentLoader (Andriy Binetsky
via Uwe Schindler)
+* SOLR-7569: A collection API called FORCELEADER to force a leader election when all replicas in a shard are down
+ (Ishan Chattopadhyaya, Mark Miller, shalin, noble)
+
Bug Fixes
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java Thu Nov 5 19:25:15 2015
@@ -66,6 +66,10 @@ public class CloudDescriptor {
return lastPublished;
}
+ public void setLastPublished(Replica.State state) {
+ lastPublished = state;
+ }
+
public boolean isLeader() {
return isLeader;
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java Thu Nov 5 19:25:15 2015
@@ -1863,7 +1863,18 @@ public class OverseerCollectionMessageHa
}
}
- private void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler, String asyncId, Map<String, String> requestMap) {
+ private void sendShardRequest(String nodeName, ModifiableSolrParams params,
+ ShardHandler shardHandler, String asyncId,
+ Map<String, String> requestMap) {
+ sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap, adminPath, zkStateReader);
+
+ }
+
+ public static void sendShardRequest(String nodeName, ModifiableSolrParams params,
+ ShardHandler shardHandler, String asyncId,
+ Map<String, String> requestMap,
+ String adminPath, ZkStateReader zkStateReader
+ ) {
if (asyncId != null) {
String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
params.set(ASYNC, coreAdminAsyncId);
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Thu Nov 5 19:25:15 2015
@@ -36,6 +36,7 @@ import org.apache.solr.client.solrj.impl
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.cloud.DistributedMap;
+import org.apache.solr.cloud.OverseerCollectionMessageHandler;
import org.apache.solr.cloud.OverseerTaskQueue;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.Overseer;
@@ -48,12 +49,17 @@ import org.apache.solr.common.SolrExcept
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.Replica.State;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@@ -63,6 +69,8 @@ import org.apache.solr.common.util.Utils
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.BlobHandler;
import org.apache.solr.handler.RequestHandlerBase;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.zookeeper.CreateMode;
@@ -439,6 +447,14 @@ public class CollectionsHandler extends
SHARD_ID_PROP);
}
},
+ FORCELEADER_OP(FORCELEADER) {
+ @Override
+ Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
+ forceLeaderElection(req, handler);
+
+ return null;
+ }
+ },
CREATESHARD_OP(CREATESHARD) {
@Override
Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
@@ -737,6 +753,81 @@ public class CollectionsHandler extends
}
}
+ private static void forceLeaderElection(SolrQueryRequest req, CollectionsHandler handler) {
+ ClusterState clusterState = handler.coreContainer.getZkController().getClusterState();
+ String collection = req.getParams().required().get(COLLECTION_PROP);
+ String sliceId = req.getParams().required().get(SHARD_ID_PROP);
+
+ log.info("Force leader invoked, state: {}", clusterState);
+ Slice slice = clusterState.getSlice(collection, sliceId);
+ if (slice == null) {
+ if (clusterState.hasCollection(collection)) {
+ throw new SolrException(ErrorCode.BAD_REQUEST,
+ "No shard with name " + sliceId + " exists for collection " + collection);
+ } else {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collection);
+ }
+ }
+
+ try {
+ // if an active replica is the leader, then all is fine already
+ Replica leader = slice.getLeader();
+ if (leader != null && leader.getState() == State.ACTIVE) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "The shard already has an active leader. Force leader is not applicable. State: " + slice);
+ }
+
+ // Clear out any LIR state
+ String lirPath = handler.coreContainer.getZkController().getLeaderInitiatedRecoveryZnodePath(collection, sliceId);
+ if (handler.coreContainer.getZkController().getZkClient().exists(lirPath, true)) {
+ StringBuilder sb = new StringBuilder();
+ handler.coreContainer.getZkController().getZkClient().printLayout(lirPath, 4, sb);
+ log.info("Cleaning out LIR data, which was: {}", sb);
+ handler.coreContainer.getZkController().getZkClient().clean(lirPath);
+ }
+
+ // Call all live replicas to prepare themselves for leadership, e.g. set last published
+ // state to active.
+ for (Replica rep : slice.getReplicas()) {
+ if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
+ ShardHandler shardHandler = handler.coreContainer.getShardHandlerFactory().getShardHandler();
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
+ params.set(CoreAdminParams.CORE, rep.getStr("core"));
+ String nodeName = rep.getNodeName();
+
+ OverseerCollectionMessageHandler.sendShardRequest(nodeName, params, shardHandler, null, null,
+ CommonParams.CORES_HANDLER_PATH, handler.coreContainer.getZkController().getZkStateReader()); // synchronous request
+ }
+ }
+
+ // Wait till we have an active leader
+ boolean success = false;
+ for (int i = 0; i < 9; i++) {
+ Thread.sleep(5000);
+ clusterState = handler.coreContainer.getZkController().getClusterState();
+ slice = clusterState.getSlice(collection, sliceId);
+ if (slice.getLeader() != null && slice.getLeader().getState() == State.ACTIVE) {
+ success = true;
+ break;
+ }
+ log.warn("Force leader attempt {}. Waiting 5 secs for an active leader. State of the slice: {}", (i + 1), slice);
+ }
+
+ if (success) {
+ log.info("Successfully issued FORCELEADER command for collection: {}, shard: {}", collection, sliceId);
+ } else {
+ log.info("Couldn't successfully force leader, collection: {}, shard: {}. Cluster state: {}", collection, sliceId, clusterState);
+ }
+ } catch (SolrException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Error executing FORCELEADER operation for collection: " + collection + " shard: " + sliceId, e);
+ }
+ }
+
public static void verifyRuleParams(CoreContainer cc, Map<String, Object> m) {
List l = (List) m.get(RULE);
if (l != null) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Thu Nov 5 19:25:15 2015
@@ -19,6 +19,7 @@ package org.apache.solr.handler.admin;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -27,6 +28,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
@@ -314,6 +316,10 @@ public class CoreAdminHandler extends Re
case INVOKE:
handleInvoke(req, rsp);
break;
+ case FORCEPREPAREFORLEADERSHIP: {
+ this.handleForcePrepareForLeadership(req, rsp);
+ break;
+ }
}
}
rsp.setHttpCaching(false);
@@ -895,6 +901,32 @@ public class CoreAdminHandler extends Re
}
+ protected void handleForcePrepareForLeadership(SolrQueryRequest req,
+ SolrQueryResponse rsp) throws IOException {
+ final SolrParams params = req.getParams();
+
+ log.info("I have been forcefully prepare myself for leadership.");
+ ZkController zkController = coreContainer.getZkController();
+ if (zkController == null) {
+ throw new SolrException(ErrorCode.BAD_REQUEST, "Only valid for SolrCloud");
+ }
+
+ String cname = params.get(CoreAdminParams.CORE);
+ if (cname == null) {
+ throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
+ }
+ try (SolrCore core = coreContainer.getCore(cname)) {
+
+ // Setting the last published state for this core to be ACTIVE
+ if (core != null) {
+ core.getCoreDescriptor().getCloudDescriptor().setLastPublished(Replica.State.ACTIVE);
+ log.info("Setting the last published state for this core, {}, to {}", core.getName(), Replica.State.ACTIVE);
+ } else {
+ SolrException.log(log, "Could not find core: " + cname);
+ }
+ }
+ }
+
protected void handleWaitForStateAction(SolrQueryRequest req,
SolrQueryResponse rsp) throws IOException, InterruptedException, KeeperException {
final SolrParams params = req.getParams();
@@ -1165,6 +1197,9 @@ public class CoreAdminHandler extends Re
info.add("schema", core.getSchemaResource());
info.add("startTime", core.getStartTimeStamp());
info.add("uptime", core.getUptimeMs());
+ if (coreContainer.isZooKeeperAware()) {
+ info.add("lastPublished", core.getCoreDescriptor().getCloudDescriptor().getLastPublished().toString().toLowerCase(Locale.ROOT));
+ }
if (isIndexInfoNeeded) {
RefCounted<SolrIndexSearcher> searcher = core.getSearcher();
try {
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java?rev=1712851&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java Thu Nov 5 19:25:15 2015
@@ -0,0 +1,450 @@
+package org.apache.solr.cloud;
+
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrRequest.METHOD;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest.ForceLeader;
+import org.apache.solr.client.solrj.request.GenericSolrRequest;
+import org.apache.solr.client.solrj.response.SimpleSolrResponse;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.servlet.SolrDispatchFilter;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class ForceLeaderTest extends HttpPartitionTest {
+ protected static final transient Logger log =
+ LoggerFactory.getLogger(ForceLeaderTest.class);
+
+ @Test
+ @Override
+ @Ignore
+ public void test() throws Exception {
+
+ }
+
+ /**
+ * Tests that FORCELEADER can get an active leader after leader puts all replicas in LIR and itself goes down,
+ * hence resulting in a leaderless shard.
+ */
+ @Test
+ @Slow
+ public void testReplicasInLIRNoLeader() throws Exception {
+ handle.put("maxScore", SKIPVAL);
+ handle.put("timestamp", SKIPVAL);
+
+ String testCollectionName = "forceleader_test_collection";
+ createCollection(testCollectionName, 1, 3, 1);
+ cloudClient.setDefaultCollection(testCollectionName);
+
+ try {
+ List<Replica> notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive);
+ assertEquals("Expected 2 replicas for collection " + testCollectionName
+ + " but found " + notLeaders.size() + "; clusterState: "
+ + printClusterStateInfo(testCollectionName), 2, notLeaders.size());
+
+ Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1);
+ JettySolrRunner notLeader0 = getJettyOnPort(getReplicaPort(notLeaders.get(0)));
+ ZkController zkController = ((SolrDispatchFilter) notLeader0.getDispatchFilter().getFilter()).getCores().getZkController();
+
+ putNonLeadersIntoLIR(testCollectionName, SHARD1, zkController, leader, notLeaders);
+
+ cloudClient.getZkStateReader().updateClusterState();
+ ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+ int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+ assertEquals("Expected only 0 active replica but found " + numActiveReplicas +
+ "; clusterState: " + printClusterStateInfo(), 0, numActiveReplicas);
+
+ int numReplicasOnLiveNodes = 0;
+ for (Replica rep : clusterState.getSlice(testCollectionName, SHARD1).getReplicas()) {
+ if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
+ numReplicasOnLiveNodes++;
+ }
+ }
+ assertEquals(2, numReplicasOnLiveNodes);
+ log.info("Before forcing leader: " + printClusterStateInfo());
+ // Assert there is no leader yet
+ assertNull("Expected no leader right now. State: " + clusterState.getSlice(testCollectionName, SHARD1),
+ clusterState.getSlice(testCollectionName, SHARD1).getLeader());
+
+ assertSendDocFails(3);
+
+ doForceLeader(cloudClient, testCollectionName, SHARD1);
+
+ // By now we have an active leader. Wait for recoveries to finish
+ waitForRecoveriesToFinish(testCollectionName, cloudClient.getZkStateReader(), true);
+
+ cloudClient.getZkStateReader().updateClusterState();
+ clusterState = cloudClient.getZkStateReader().getClusterState();
+ log.info("After forcing leader: " + clusterState.getSlice(testCollectionName, SHARD1));
+ // we have a leader
+ Replica newLeader = clusterState.getSlice(testCollectionName, SHARD1).getLeader();
+ assertNotNull(newLeader);
+ // leader is active
+ assertEquals(State.ACTIVE, newLeader.getState());
+
+ numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+ assertEquals(2, numActiveReplicas);
+
+ // Assert that indexing works again
+ log.info("Sending doc 4...");
+ sendDoc(4);
+ log.info("Committing...");
+ cloudClient.commit();
+ log.info("Doc 4 sent and commit issued");
+
+ assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
+ assertDocsExistInAllReplicas(notLeaders, testCollectionName, 4, 4);
+
+ // Docs 1 and 4 should be here. 2 was lost during the partition, 3 had failed to be indexed.
+ log.info("Checking doc counts...");
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.add("q", "*:*");
+ assertEquals("Expected only 2 documents in the index", 2, cloudClient.query(params).getResults().getNumFound());
+
+ bringBackOldLeaderAndSendDoc(testCollectionName, leader, notLeaders, 5);
+ } finally {
+ log.info("Cleaning up after the test.");
+ // try to clean up
+ try {
+ CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
+ req.setCollectionName(testCollectionName);
+ req.process(cloudClient);
+ } catch (Exception e) {
+ // don't fail the test
+ log.warn("Could not delete collection {} after test completed", testCollectionName);
+ }
+ }
+ }
+
+ /**
+ * Test that FORCELEADER can set last published state of all down (live) replicas to active (so
+ * that they become worthy candidates for leader election).
+ */
+ @Slow
+ public void testLastPublishedStateIsActive() throws Exception {
+ handle.put("maxScore", SKIPVAL);
+ handle.put("timestamp", SKIPVAL);
+
+ String testCollectionName = "forceleader_last_published";
+ createCollection(testCollectionName, 1, 3, 1);
+ cloudClient.setDefaultCollection(testCollectionName);
+ log.info("Collection created: " + testCollectionName);
+
+ try {
+ List<Replica> notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive);
+ assertEquals("Expected 2 replicas for collection " + testCollectionName
+ + " but found " + notLeaders.size() + "; clusterState: "
+ + printClusterStateInfo(testCollectionName), 2, notLeaders.size());
+
+ Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1);
+ JettySolrRunner notLeader0 = getJettyOnPort(getReplicaPort(notLeaders.get(0)));
+ ZkController zkController = ((SolrDispatchFilter) notLeader0.getDispatchFilter().getFilter()).getCores().getZkController();
+
+ // Mark all replicas down
+ setReplicaState(testCollectionName, SHARD1, leader, State.DOWN);
+ for (Replica rep : notLeaders) {
+ setReplicaState(testCollectionName, SHARD1, rep, State.DOWN);
+ }
+
+ zkController.getZkStateReader().updateClusterState();
+ // Assert all replicas are down and that there is no leader
+ assertEquals(0, getActiveOrRecoveringReplicas(testCollectionName, SHARD1).size());
+
+ // Now force leader
+ doForceLeader(cloudClient, testCollectionName, SHARD1);
+
+ // Assert that last published states of the two replicas are active now
+ for (Replica rep: notLeaders) {
+ assertEquals(Replica.State.ACTIVE, getLastPublishedState(testCollectionName, SHARD1, rep));
+ }
+ } finally {
+ log.info("Cleaning up after the test.");
+ // try to clean up
+ try {
+ CollectionAdminRequest.Delete req = new CollectionAdminRequest.Delete();
+ req.setCollectionName(testCollectionName);
+ req.process(cloudClient);
+ } catch (Exception e) {
+ // don't fail the test
+ log.warn("Could not delete collection {} after test completed", testCollectionName);
+ }
+ }
+ }
+
+ protected void unsetLeader(String collection, String slice) throws Exception {
+ DistributedQueue inQueue = Overseer.getInQueue(cloudClient.getZkStateReader().getZkClient());
+ ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
+ ZkStateReader.SHARD_ID_PROP, slice,
+ ZkStateReader.COLLECTION_PROP, collection);
+ inQueue.offer(Utils.toJSON(m));
+
+ ClusterState clusterState = null;
+ boolean transition = false;
+ for (int counter = 10; counter > 0; counter--) {
+ zkStateReader.updateClusterState();
+ clusterState = zkStateReader.getClusterState();
+ Replica newLeader = clusterState.getSlice(collection, slice).getLeader();
+ if (newLeader == null) {
+ transition = true;
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ if (!transition) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not unset replica leader" +
+ ". Cluster state: " + printClusterStateInfo(collection));
+ }
+ }
+
+ protected void setReplicaState(String collection, String slice, Replica replica, Replica.State state) throws SolrServerException, IOException,
+ KeeperException, InterruptedException {
+ DistributedQueue inQueue = Overseer.getInQueue(cloudClient.getZkStateReader().getZkClient());
+ ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+
+ String baseUrl = zkStateReader.getBaseUrlForNodeName(replica.getNodeName());
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
+ ZkStateReader.BASE_URL_PROP, baseUrl,
+ ZkStateReader.NODE_NAME_PROP, replica.getNodeName(),
+ ZkStateReader.SHARD_ID_PROP, slice,
+ ZkStateReader.COLLECTION_PROP, collection,
+ ZkStateReader.CORE_NAME_PROP, replica.getStr(CORE_NAME_PROP),
+ ZkStateReader.CORE_NODE_NAME_PROP, replica.getName(),
+ ZkStateReader.STATE_PROP, state.toString());
+ inQueue.offer(Utils.toJSON(m));
+ boolean transition = false;
+
+ Replica.State replicaState = null;
+ for (int counter = 10; counter > 0; counter--) {
+ zkStateReader.updateClusterState();
+ ClusterState clusterState = zkStateReader.getClusterState();
+ replicaState = clusterState.getSlice(collection, slice).getReplica(replica.getName()).getState();
+ if (replicaState == state) {
+ transition = true;
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ if (!transition) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not set replica [" + replica.getName() + "] as " + state +
+ ". Last known state of the replica: " + replicaState);
+ }
+ }
+
+ /*protected void setLastPublishedState(String collection, String slice, Replica replica, Replica.State state) throws SolrServerException, IOException,
+ KeeperException, InterruptedException {
+ ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+ String baseUrl = zkStateReader.getBaseUrlForNodeName(replica.getNodeName());
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
+ params.set(CoreAdminParams.CORE, replica.getStr("core"));
+ params.set(ZkStateReader.STATE_PROP, state.toString());
+
+ SolrRequest<SimpleSolrResponse> req = new GenericSolrRequest(METHOD.GET, "/admin/cores", params);
+ NamedList resp = null;
+ try (HttpSolrClient hsc = new HttpSolrClient(baseUrl)) {
+ resp = hsc.request(req);
+ }
+ }*/
+
+ protected Replica.State getLastPublishedState(String collection, String slice, Replica replica) throws SolrServerException, IOException,
+ KeeperException, InterruptedException {
+ ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+ String baseUrl = zkStateReader.getBaseUrlForNodeName(replica.getNodeName());
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.STATUS.toString());
+ params.set(CoreAdminParams.CORE, replica.getStr("core"));
+
+ SolrRequest<SimpleSolrResponse> req = new GenericSolrRequest(METHOD.GET, "/admin/cores", params);
+ NamedList resp = null;
+ try (HttpSolrClient hsc = new HttpSolrClient(baseUrl)) {
+ resp = hsc.request(req);
+ }
+
+ String lastPublished = (((NamedList<NamedList<String>>)resp.get("status")).get(replica.getStr("core"))).get("lastPublished");
+ return Replica.State.getState(lastPublished);
+ }
+
+ void assertSendDocFails(int docId) throws Exception {
+ // sending a doc in this state fails
+ try {
+ sendDoc(docId);
+ log.error("Should've failed indexing during a down state. Cluster state: " + printClusterStateInfo());
+ fail("Should've failed indexing during a down state.");
+ } catch (SolrException ex) {
+ log.info("Document couldn't be sent, which is expected.");
+ }
+ }
+
+ void putNonLeadersIntoLIR(String collectionName, String shard, ZkController zkController, Replica leader, List<Replica> notLeaders) throws Exception {
+ SocketProxy[] nonLeaderProxies = new SocketProxy[notLeaders.size()];
+ for (int i = 0; i < notLeaders.size(); i++)
+ nonLeaderProxies[i] = getProxyForReplica(notLeaders.get(i));
+
+ sendDoc(1);
+
+ // ok, now introduce a network partition between the leader and both replicas
+ log.info("Closing proxies for the non-leader replicas...");
+ for (SocketProxy proxy : nonLeaderProxies)
+ proxy.close();
+
+ // indexing during a partition
+ log.info("Sending a doc during the network partition...");
+ sendDoc(2);
+
+ // Wait a little
+ Thread.sleep(2000);
+
+ // Kill the leader
+ log.info("Killing leader for shard1 of " + collectionName + " on node " + leader.getNodeName() + "");
+ JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+ getProxyForReplica(leader).close();
+ leaderJetty.stop();
+
+ // Wait for a steady state, till LIR flags have been set and the shard is leaderless
+ log.info("Sleep and periodically wake up to check for state...");
+ for (int i = 0; i < 20; i++) {
+ Thread.sleep(1000);
+ State lirStates[] = new State[notLeaders.size()];
+ for (int j = 0; j < notLeaders.size(); j++)
+ lirStates[j] = zkController.getLeaderInitiatedRecoveryState(collectionName, shard, notLeaders.get(j).getName());
+
+ zkController.getZkStateReader().updateClusterState();
+ ClusterState clusterState = zkController.getZkStateReader().getClusterState();
+ boolean allDown = true;
+ for (State lirState : lirStates)
+ if (Replica.State.DOWN.equals(lirState) == false)
+ allDown = false;
+ if (allDown && clusterState.getSlice(collectionName, shard).getLeader() == null) {
+ break;
+ }
+ log.warn("Attempt " + i + ", waiting on for 1 sec to settle down in the steady state. State: " +
+ printClusterStateInfo(collectionName));
+ log.warn("LIR state: " + getLIRState(zkController, collectionName, shard));
+ }
+ log.info("Waking up...");
+
+ // remove the network partition
+ log.info("Reopening the proxies for the non-leader replicas...");
+ for (SocketProxy proxy : nonLeaderProxies)
+ proxy.reopen();
+
+ log.info("LIR state: " + getLIRState(zkController, collectionName, shard));
+
+ State lirStates[] = new State[notLeaders.size()];
+ for (int j = 0; j < notLeaders.size(); j++)
+ lirStates[j] = zkController.getLeaderInitiatedRecoveryState(collectionName, shard, notLeaders.get(j).getName());
+ for (State lirState : lirStates)
+ assertEquals("Expected that the LIR state would've been down by now",
+ Replica.State.DOWN, (lirState));
+ }
+
+ protected void bringBackOldLeaderAndSendDoc(String collection, Replica leader, List<Replica> notLeaders, int docid) throws Exception {
+ // Bring back the leader which was stopped
+ log.info("Bringing back originally killed leader...");
+ JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+ leaderJetty.start();
+ waitForRecoveriesToFinish(collection, cloudClient.getZkStateReader(), true);
+ cloudClient.getZkStateReader().updateClusterState();
+ ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+ log.info("After bringing back leader: " + clusterState.getSlice(collection, SHARD1));
+ int numActiveReplicas = getNumberOfActiveReplicas(clusterState, collection, SHARD1);
+ assertEquals(1+notLeaders.size(), numActiveReplicas);
+ log.info("Sending doc "+docid+"...");
+ sendDoc(docid);
+ log.info("Committing...");
+ cloudClient.commit();
+ log.info("Doc "+docid+" sent and commit issued");
+ assertDocsExistInAllReplicas(notLeaders, collection, docid, docid);
+ assertDocsExistInAllReplicas(Collections.singletonList(leader), collection, docid, docid);
+ }
+
+ protected String getLIRState(ZkController zkController, String collection, String shard) throws KeeperException, InterruptedException {
+ StringBuilder sb = new StringBuilder();
+ String path = zkController.getLeaderInitiatedRecoveryZnodePath(collection, shard);
+ if (path == null)
+ return null;
+ try {
+ zkController.getZkClient().printLayout(path, 4, sb);
+ } catch (NoNodeException ex) {
+ return null;
+ }
+ return sb.toString();
+ }
+
+ @Override
+ protected int sendDoc(int docId) throws Exception {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField(id, String.valueOf(docId));
+ doc.addField("a_t", "hello" + docId);
+
+ return sendDocsWithRetry(Collections.singletonList(doc), 1, 5, 1);
+ }
+
+ private void doForceLeader(SolrClient client, String collectionName, String shard) throws IOException, SolrServerException {
+ CollectionAdminRequest.ForceLeader forceLeader = new CollectionAdminRequest.ForceLeader();
+ forceLeader.setCollectionName(collectionName);
+ forceLeader.setShardName(shard);
+ client.request(forceLeader);
+ }
+
+ protected int getNumberOfActiveReplicas(ClusterState clusterState, String collection, String sliceId) {
+ int numActiveReplicas = 0;
+ // Count the replicas of this slice that are in the ACTIVE state
+ for (Replica rep : clusterState.getSlice(collection, sliceId).getReplicas()) {
+ if (rep.getState().equals(State.ACTIVE)) {
+ numActiveReplicas++;
+ }
+ }
+ return numActiveReplicas;
+ }
+}
+
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java Thu Nov 5 19:25:15 2015
@@ -434,6 +434,29 @@ public abstract class CollectionAdminReq
return this;
}
}
+
+ // FORCELEADER request
+ /**
+  * Collection admin request whose action is {@code CollectionAction.FORCELEADER}.
+  * Its parameters are the common parameters from {@code getCommonParams()}, plus
+  * {@code CommonAdminParams.ASYNC} when {@code asyncId} is non-null.
+  */
+ public static class ForceLeader extends CollectionShardAdminRequest<ForceLeader> {
+   // NOTE(review): no setter for this field is visible in this class, so it stays
+   // null unless assigned elsewhere — confirm.
+   protected String asyncId;
+
+   public ForceLeader() {
+     action = CollectionAction.FORCELEADER;
+   }
+
+   @Override
+   public SolrParams getParams() {
+     final ModifiableSolrParams commonParams = getCommonParams();
+     if (asyncId == null) {
+       return commonParams;
+     }
+     commonParams.set(CommonAdminParams.ASYNC, asyncId);
+     return commonParams;
+   }
+
+   @Override
+   protected ForceLeader getThis() {
+     return this;
+   }
+ }
// REQUESTSTATUS request
public static class RequestStatus extends CollectionAdminRequest<RequestStatus> {
@@ -447,7 +470,7 @@ public abstract class CollectionAdminReq
this.requestId = requestId;
return this;
}
-
+
public String getRequestId() {
return this.requestId;
}
@@ -464,7 +487,7 @@ public abstract class CollectionAdminReq
return this;
}
}
-
+
// CREATEALIAS request
public static class CreateAlias extends CollectionAdminRequest<CreateAlias> {
protected String aliasName;
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java Thu Nov 5 19:25:15 2015
@@ -350,6 +350,34 @@ public class CoreAdminRequest extends So
}
}
+ /**
+  * Core admin request whose action is {@code CoreAdminAction.FORCEPREPAREFORLEADERSHIP}.
+  * Its parameters are the action, the core, and the {@code state} value stored under
+  * {@code ZkStateReader.STATE_PROP}.
+  */
+ public static class OverrideLastPublished extends CoreAdminRequest {
+   protected String state;
+
+   public OverrideLastPublished() {
+     action = CoreAdminAction.FORCEPREPAREFORLEADERSHIP;
+   }
+
+   public void setState(String state) {
+     this.state = state;
+   }
+
+   public String getState() {
+     return state;
+   }
+
+   /**
+    * Builds the request parameters.
+    *
+    * @throws RuntimeException if {@code action} is null
+    */
+   @Override
+   public SolrParams getParams() {
+     if (null == action) {
+       throw new RuntimeException("no action specified!");
+     }
+     final ModifiableSolrParams result = new ModifiableSolrParams();
+     final String actionName = action.toString();
+     result.set(CoreAdminParams.ACTION, actionName);
+     result.set(CoreAdminParams.CORE, core);
+     result.set(ZkStateReader.STATE_PROP, state);
+     return result;
+   }
+ }
+
public static class MergeIndexes extends CoreAdminRequest {
protected List<String> indexDirs;
protected List<String> srcCores;
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java Thu Nov 5 19:25:15 2015
@@ -19,8 +19,6 @@ package org.apache.solr.common.params;
import java.util.Locale;
-import org.apache.solr.common.SolrException;
-
public interface CollectionParams
{
/** What action **/
@@ -40,6 +38,7 @@ public interface CollectionParams
DELETESHARD(true),
CREATESHARD(true),
DELETEREPLICA(true),
+ FORCELEADER(true),
MIGRATE(true),
ADDROLE(true),
REMOVEROLE(true),
Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java?rev=1712851&r1=1712850&r2=1712851&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java Thu Nov 5 19:25:15 2015
@@ -142,6 +142,8 @@ public abstract class CoreAdminParams
OVERSEEROP,
REQUESTSTATUS,
REJOINLEADERELECTION,
+ //internal API used by force shard leader election
+ FORCEPREPAREFORLEADERSHIP,
INVOKE;
public static CoreAdminAction get( String p )