Posted to commits@lucene.apache.org by no...@apache.org on 2016/09/21 18:27:35 UTC

lucene-solr:branch_6x: SOLR-9446: Leader failure after creating a freshly replicated index can send nodes into recovery even if index was not changed

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_6x 56f269734 -> 8502995e3


SOLR-9446: Leader failure after creating a freshly replicated index can send nodes into recovery even if index was not changed


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/8502995e
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/8502995e
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/8502995e

Branch: refs/heads/branch_6x
Commit: 8502995e3b1ce66db49be26b23a3fa3c169345a8
Parents: 56f2697
Author: Noble Paul <no...@apache.org>
Authored: Wed Sep 21 23:55:59 2016 +0530
Committer: Noble Paul <no...@apache.org>
Committed: Wed Sep 21 23:57:12 2016 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 +
 .../handler/component/RealTimeGetComponent.java |  17 ++
 .../java/org/apache/solr/update/PeerSync.java   |  50 ++++
 .../cloud/LeaderFailureAfterFreshStartTest.java | 295 +++++++++++++++++++
 .../solr/cloud/PeerSyncReplicationTest.java     |   9 +-
 5 files changed, 371 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8502995e/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index b9400a7..c317349 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -92,6 +92,9 @@ Optimizations
 
 * SOLR-9142: JSON Facet API: new method=dvhash can be chosen for fields with high cardinality. (David Smiley)
 
+* SOLR-9446: Leader failure after creating a freshly replicated index can send nodes into recovery even if
+  index was not changed (Pushkar Raste, noble)
+
 Other Changes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8502995e/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index d90b741..1a76f52 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -112,6 +112,12 @@ public class RealTimeGetComponent extends SearchComponent
       return;
     }
     
+    val = params.get("getFingerprint");
+    if(val != null) {
+      processGetFingerprint(rb);
+      return;
+    }
+    
     val = params.get("getVersions");
     if (val != null) {
       processGetVersions(rb);
@@ -597,6 +603,17 @@ public class RealTimeGetComponent extends SearchComponent
     return null;
   }
 
+  
+  
+  public void processGetFingerprint(ResponseBuilder rb) throws IOException {
+    SolrQueryRequest req = rb.req;
+    SolrParams params = req.getParams();
+    
+    long maxVersion = params.getLong("getFingerprint", Long.MAX_VALUE);
+    IndexFingerprint fingerprint = IndexFingerprint.getFingerprint(req.getCore(), Math.abs(maxVersion));
+    rb.rsp.add("fingerprint", fingerprint);
+  }
+  
 
   ///////////////////////////////////////////////////////////////////////////////////
   // Returns last versions added to index

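The new getFingerprint parameter is handled by the existing /get (real-time get) handler, so any single core can be asked for its index fingerprint directly. A minimal SolrJ sketch of such a request follows; the class name and core URL are hypothetical, and the parameter values simply mirror the ones PeerSync sends in the next file.

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.ModifiableSolrParams;

public class FingerprintProbe {
  public static void main(String[] args) throws Exception {
    // Hypothetical core URL; any core running a build with this patch should answer.
    try (SolrClient client = new HttpSolrClient.Builder("http://localhost:8983/solr/collection1").build()) {
      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set("qt", "/get");                                     // real-time get handler
      params.set("distrib", "false");                               // ask this core only, as PeerSync does
      params.set("getFingerprint", String.valueOf(Long.MAX_VALUE)); // max version to fold into the fingerprint
      QueryResponse rsp = new QueryRequest(params).process(client);
      System.out.println(rsp.getResponse().get("fingerprint"));     // prints the fingerprint structure returned by the core
    }
  }
}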
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8502995e/solr/core/src/java/org/apache/solr/update/PeerSync.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index 1eb69a5..dfacd4b 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -188,6 +188,11 @@ public class PeerSync  {
           log.debug(msg() + "startingVersions=" + startingVersions.size() + " " + startingVersions);
         }
       }
+      // check if we are already in sync to begin with
+      if(doFingerprint && alreadyInSync()) {
+        return true;
+      }
+      
       
       // Fire off the requests before getting our own recent updates (for better concurrency)
       // This also allows us to avoid getting updates we don't need... if we got our updates and then got their updates,
@@ -277,6 +282,51 @@ public class PeerSync  {
       MDCLoggingContext.clear();
     }
   }
+
+  /**
+   * Check if we are already in sync. Simple fingerprint comparison should do
+   */
+  private boolean alreadyInSync() {
+    for (String replica : replicas) {
+      requestFingerprint(replica);
+    }
+    
+    for (;;) {
+      ShardResponse srsp = shardHandler.takeCompletedOrError();
+      if (srsp == null) break;
+      
+      try {
+        IndexFingerprint otherFingerprint = IndexFingerprint.fromObject(srsp.getSolrResponse().getResponse().get("fingerprint"));
+        IndexFingerprint ourFingerprint = IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
+        if(IndexFingerprint.compare(otherFingerprint, ourFingerprint) == 0) {
+          log.info("We are already in sync. No need to do a PeerSync ");
+          return true;
+        }
+      } catch(IOException e) {
+        log.warn("Could not cofirm if we are already in sync. Continue with PeerSync");
+      }
+    }
+    
+    return false;
+  }
+  
+  
+  private void requestFingerprint(String replica) {
+    SyncShardRequest sreq = new SyncShardRequest();
+    requests.add(sreq);
+
+    sreq.shards = new String[]{replica};
+    sreq.actualShards = sreq.shards;
+    sreq.params = new ModifiableSolrParams();
+    // ask the replica for its index fingerprint via the /get handler
+    sreq.params.set("qt","/get");
+    sreq.params.set("distrib",false);
+    sreq.params.set("getFingerprint", String.valueOf(Long.MAX_VALUE));
+    
+    shardHandler.submit(sreq, replica, sreq.params);
+  }
+  
+  
   
   private void requestVersions(String replica) {
     SyncShardRequest sreq = new SyncShardRequest();

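The decision that alreadyInSync() makes for each shard response reduces to a single fingerprint comparison. The following compact sketch extracts that check, assuming (as in the patch) that core is the local SolrCore and rsp is the NamedList a peer returned for a getFingerprint request:

import java.io.IOException;

import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.IndexFingerprint;

class FingerprintCheck {
  /** Sketch of the check added above: true when the peer's index fingerprint matches the local one. */
  static boolean sameIndex(SolrCore core, NamedList<Object> rsp) throws IOException {
    IndexFingerprint theirs = IndexFingerprint.fromObject(rsp.get("fingerprint"));
    IndexFingerprint ours = IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
    return IndexFingerprint.compare(theirs, ours) == 0;  // 0 means the fingerprints are equal
  }
}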
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8502995e/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java
new file mode 100644
index 0000000..348532c
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderFailureAfterFreshStartTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.ZkTestServer.LimitViolationAction;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.Slice.State;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.core.Diagnostics;
+import org.apache.solr.handler.ReplicationHandler;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * 
+ * Test for SOLR-9446
+ *
+ * This test is modeled after SyncSliceTest
+ */
+@Slow
+public class LeaderFailureAfterFreshStartTest extends AbstractFullDistribZkTestBase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private boolean success = false;
+  int docId = 0;
+
+  List<CloudJettyRunner> nodesDown = new ArrayList<>();
+
+  @Override
+  public void distribTearDown() throws Exception {
+    if (!success) {
+      printLayoutOnTearDown = true;
+    }
+    System.clearProperty("solr.directoryFactory");
+    System.clearProperty("solr.ulog.numRecordsToKeep");
+    System.clearProperty("tests.zk.violationReportAction");
+    super.distribTearDown();
+  }
+
+  public LeaderFailureAfterFreshStartTest() {
+    super();
+    sliceCount = 1;
+    fixShardCount(3);
+  }
+
+  protected String getCloudSolrConfig() {
+    return "solrconfig-tlog.xml";
+  }
+
+  @Override
+  public void distribSetUp() throws Exception {
+    // tlog gets deleted after node restarts if we use CachingDirectoryFactory.
+    // make sure that tlog stays intact after we restart a node
+    System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
+    System.setProperty("solr.ulog.numRecordsToKeep", "1000");
+    System.setProperty("tests.zk.violationReportAction", LimitViolationAction.IGNORE.toString());
+    super.distribSetUp();
+  }
+
+  @Test
+  public void test() throws Exception {
+    handle.clear();
+    handle.put("timestamp", SKIPVAL);
+
+    try {
+      CloudJettyRunner initialLeaderJetty = shardToLeaderJetty.get("shard1");
+      List<CloudJettyRunner> otherJetties = getOtherAvailableJetties(initialLeaderJetty);
+      
+      log.info("Leader node_name: {},  url: {}", initialLeaderJetty.coreNodeName, initialLeaderJetty.url);
+      for (CloudJettyRunner cloudJettyRunner : otherJetties) {
+        log.info("Nonleader node_name: {},  url: {}", cloudJettyRunner.coreNodeName, cloudJettyRunner.url);
+      }
+      
+      CloudJettyRunner secondNode = otherJetties.get(0);
+      CloudJettyRunner freshNode = otherJetties.get(1);
+      
+      // shutdown a node to simulate fresh start
+      otherJetties.remove(freshNode);
+      forceNodeFailures(singletonList(freshNode));
+
+      del("*:*");
+      waitForThingsToLevelOut(30);
+
+      checkShardConsistency(false, true);
+
+      // index a few docs and commit
+      for (int i = 0; i < 100; i++) {
+        indexDoc(id, docId, i1, 50, tlong, 50, t1,
+            "document number " + docId++);
+      }
+      commit();
+      waitForThingsToLevelOut(30);
+
+      checkShardConsistency(false, true);
+
+      // start the freshNode 
+      ChaosMonkey.start(freshNode.jetty);
+      nodesDown.remove(freshNode);
+
+      waitTillNodesActive();
+      waitForThingsToLevelOut(30);
+      
+      // TODO: check how to see whether the fresh node went into recovery (maybe check the request count on the new leader's replication handler)
+      
+      long numRequestsBefore = (Long) secondNode.jetty
+          .getCoreContainer()
+          .getCores()
+          .iterator()
+          .next()
+          .getRequestHandler(ReplicationHandler.PATH)
+          .getStatistics().get("requests");
+      
+      // shutdown the original leader
+      log.info("Now shutting down initial leader");
+      forceNodeFailures(singletonList(initialLeaderJetty));
+      waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, 15);
+      log.info("Updating mappings from zk");
+      updateMappingsFromZk(jettys, clients, true);
+      
+      long numRequestsAfter = (Long) secondNode.jetty
+          .getCoreContainer()
+          .getCores()
+          .iterator()
+          .next()
+          .getRequestHandler(ReplicationHandler.PATH)
+          .getStatistics().get("requests");
+
+      assertEquals("Node went into replication", numRequestsBefore, numRequestsAfter);
+      
+      success = true;
+    } finally {
+      System.clearProperty("solr.disableFingerprint");
+    }
+  }
+
+
+  private void forceNodeFailures(List<CloudJettyRunner> replicasToShutDown) throws Exception {
+    for (CloudJettyRunner replicaToShutDown : replicasToShutDown) {
+      chaosMonkey.killJetty(replicaToShutDown);
+      waitForNoShardInconsistency();
+    }
+
+    int totalDown = 0;
+
+    Set<CloudJettyRunner> jetties = new HashSet<>();
+    jetties.addAll(shardToJetty.get("shard1"));
+
+    if (replicasToShutDown != null) {
+      jetties.removeAll(replicasToShutDown);
+      totalDown += replicasToShutDown.size();
+    }
+
+    jetties.removeAll(nodesDown);
+    totalDown += nodesDown.size();
+
+    assertEquals(getShardCount() - totalDown, jetties.size());
+
+    nodesDown.addAll(replicasToShutDown);
+  }
+
+  
+  static void waitForNewLeader(CloudSolrClient cloudClient, String shardName, Replica oldLeader, int maxWaitInSecs)
+      throws Exception {
+    log.info("Will wait for a node to become leader for {} secs", maxWaitInSecs);
+    boolean waitForLeader = true;
+    int i = 0;
+    ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+    zkStateReader.forceUpdateCollection(DEFAULT_COLLECTION);
+    
+    while(waitForLeader) {
+      ClusterState clusterState = zkStateReader.getClusterState();
+      DocCollection coll = clusterState.getCollection("collection1");
+      Slice slice = coll.getSlice(shardName);
+      if(slice.getLeader() != oldLeader && slice.getState() == State.ACTIVE) {
+        log.info("New leader got elected in {} secs", i);
+        break;
+      }
+      
+      if(i == maxWaitInSecs) {
+        Diagnostics.logThreadDumps("Could not find new leader in specified timeout");
+        zkStateReader.getZkClient().printLayoutToStdOut();
+        fail("Could not find new leader even after waiting for " + maxWaitInSecs + "secs");
+      }
+      
+      i++;
+      Thread.sleep(1000);
+    }
+  }
+    
+
+
+  private void waitTillNodesActive() throws Exception {
+    for (int i = 0; i < 60; i++) {
+      Thread.sleep(3000);
+      ZkStateReader zkStateReader = cloudClient.getZkStateReader();
+      ClusterState clusterState = zkStateReader.getClusterState();
+      DocCollection collection1 = clusterState.getCollection("collection1");
+      Slice slice = collection1.getSlice("shard1");
+      Collection<Replica> replicas = slice.getReplicas();
+      boolean allActive = true;
+
+      Collection<Replica> replicasToCheck = null;
+      replicasToCheck = replicas.stream().filter(r -> nodesDown.contains(r.getName()))
+          .collect(Collectors.toList());
+
+      for (Replica replica : replicasToCheck) {
+        if (!clusterState.liveNodesContain(replica.getNodeName()) || replica.getState() != Replica.State.ACTIVE) {
+          allActive = false;
+          break;
+        }
+      }
+      if (allActive) {
+        return;
+      }
+    }
+    printLayout();
+    fail("timeout waiting to see all nodes active");
+  }
+
+  
+  private List<CloudJettyRunner> getOtherAvailableJetties(CloudJettyRunner leader) {
+    List<CloudJettyRunner> candidates = new ArrayList<>();
+    candidates.addAll(shardToJetty.get("shard1"));
+
+    if (leader != null) {
+      candidates.remove(leader);
+    }
+
+    candidates.removeAll(nodesDown);
+
+    return candidates;
+  }
+
+  protected void indexDoc(Object... fields) throws IOException,
+      SolrServerException {
+    SolrInputDocument doc = new SolrInputDocument();
+
+    addFields(doc, fields);
+    addFields(doc, "rnd_s", RandomStringUtils.random(random().nextInt(100) + 100));
+
+    UpdateRequest ureq = new UpdateRequest();
+    ureq.add(doc);
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    ureq.setParams(params);
+    ureq.process(cloudClient);
+  }
+
+  // skip the randoms - they can deadlock...
+  @Override
+  protected void indexr(Object... fields) throws Exception {
+    SolrInputDocument doc = new SolrInputDocument();
+    addFields(doc, fields);
+    addFields(doc, "rnd_b", true);
+    indexDoc(doc);
+  }
+
+}

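For local verification, the new test can be run on its own via the standard lucene-solr ant build (assuming the usual 6.x setup), e.g. ant test -Dtestcase=LeaderFailureAfterFreshStartTest from solr/core; it asserts that the ReplicationHandler request count on the surviving replica does not change after the original leader is killed.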
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8502995e/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
index f85b7f1..3ded7d2 100644
--- a/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/PeerSyncReplicationTest.java
@@ -45,6 +45,9 @@ import org.apache.solr.handler.ReplicationHandler;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.singletonList;
+
 /**
  * Test sync peer sync when a node restarts and documents are indexed when node was down.
  *
@@ -120,7 +123,7 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
       otherJetties.remove(neverLeader) ;
 
       // first shutdown a node that will never be a leader
-      forceNodeFailures(Arrays.asList(neverLeader));
+      forceNodeFailures(singletonList(neverLeader));
 
       // node failure and recovery via PeerSync
       log.info("Forcing PeerSync");
@@ -144,9 +147,9 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
 
       // now shutdown the original leader
       log.info("Now shutting down initial leader");
-      forceNodeFailures(Arrays.asList(initialLeaderJetty));
+      forceNodeFailures(singletonList(initialLeaderJetty));
       log.info("Updating mappings from zk");
-      Thread.sleep(15000); // sleep for a while for leader to change ...
+      LeaderFailureAfterFreshStartTest.waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, 15);
       updateMappingsFromZk(jettys, clients, true);
       assertEquals("PeerSynced node did not become leader", nodePeerSynced, shardToLeaderJetty.get("shard1"));