You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2019/06/03 03:57:08 UTC

[lucene-solr] branch master updated: SOLR-13489: Stop the leader from trying to rejoin the election on session expiration and harden our zk reconnect code path.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new b09d462  SOLR-13489: Stop the leader from trying to rejoin the election on session expiration and harden our zk reconnect code path.
b09d462 is described below

commit b09d462ee4fa26642546a90f55f0b14d5c8b0124
Author: markrmiller <ma...@apache.org>
AuthorDate: Sun Jun 2 21:31:12 2019 -0500

    SOLR-13489: Stop the leader from trying to rejoin the election on session expiration and harden our zk reconnect code path.
---
 solr/CHANGES.txt                                   |  3 ++
 .../org/apache/solr/cloud/ElectionContext.java     | 14 +++++-
 .../java/org/apache/solr/cloud/LeaderElector.java  |  1 +
 .../java/org/apache/solr/cloud/ZkController.java   | 52 +++++++++++++++-------
 .../apache/solr/cloud/ConnectionManagerTest.java   |  4 +-
 .../test/org/apache/solr/cloud/OverseerTest.java   | 15 ++++---
 .../impl/ConcurrentUpdateHttp2SolrClient.java      |  2 +-
 .../solr/common/cloud/ConnectionManager.java       | 38 ++++++++--------
 .../common/cloud/DefaultConnectionStrategy.java    |  6 +--
 .../org/apache/solr/common/cloud/OnReconnect.java  |  4 +-
 10 files changed, 88 insertions(+), 51 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 59497e1..e07cb74 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -113,6 +113,9 @@ Bug Fixes
 
 * SOLR-13491: SolrZkClient's watch wrapper no longer allows zookeeper to hold the same watch object multiple times.
 
+* SOLR-13489: Stop the leader from trying to rejoin the election on session expiration and harden our zk reconnect code path.
+  (Mark Miller, Anshum Gupta)
+
 Other Changes
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index a67ce57..456daee 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -52,6 +52,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.Op;
 import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.OpResult.SetDataResult;
@@ -489,6 +490,9 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
           // we made it as leader - send any recovery requests we need to
           syncStrategy.requestRecoveries();
 
+        } catch (SessionExpiredException e) {
+          throw new SolrException(ErrorCode.SERVER_ERROR,
+              "ZK session expired - cancelling election for " + collection + " " + shardId);
         } catch (Exception e) {
           isLeader = false;
           SolrException.log(log, "There was a problem trying to register as the leader", e);
@@ -503,7 +507,12 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
             core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
             
             // we could not publish ourselves as leader - try and rejoin election
-            rejoinLeaderElection(core);
+            try {
+              rejoinLeaderElection(core);
+            } catch (SessionExpiredException exc) {
+              throw new SolrException(ErrorCode.SERVER_ERROR,
+                  "ZK session expired - cancelling election for " + collection + " " + shardId);
+            }
           }
         }
       } else {
@@ -719,6 +728,9 @@ final class OverseerElectionContext extends ElectionContext {
   @Override
   void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs) throws KeeperException,
       InterruptedException {
+    if (isClosed) {
+      return;
+    }
     log.info("I am going to be the leader {}", id);
     final String id = leaderSeqPath
         .substring(leaderSeqPath.lastIndexOf("/") + 1);
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index 0cc8cac..80ce82f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -133,6 +133,7 @@ public  class LeaderElector {
     if (leaderSeqNodeName.equals(seqs.get(0))) {
       // I am the leader
       try {
+        if (zkClient.isClosed()) return; // but our zkClient is already closed
         runIamLeaderProcess(context, replacement);
       } catch (KeeperException.NodeExistsException e) {
         log.error("node exists",e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index f9fa06a..8e90dd8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -335,10 +335,11 @@ public class ZkController implements Closeable {
         new OnReconnect() {
 
           @Override
-          public void command() {
+          public void command() throws SessionExpiredException {
             log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
             clearZkCollectionTerms();
             try {
+              // recreate our watchers first so that they exist even if anything below fails
               zkStateReader.createClusterStateWatchersAndUpdate();
 
               // this is troublesome - we dont want to kill anything the old
@@ -368,8 +369,17 @@ public class ZkController implements Closeable {
               }
 
               cc.cancelCoreRecoveries();
-
-              registerAllCoresAsDown(registerOnReconnect, false);
+              
+              try {
+                registerAllCoresAsDown(registerOnReconnect, false);
+              } catch (SessionExpiredException e) {
+                // zk has to reconnect and this will all be tried again
+                throw e;
+              } catch (Exception e) {
+                // this is strictly best effort - races are possible, and we may now need to become the leader ourselves;
+                // if anything fails here, log it and continue with the reconnect
+                log.warn("Exception while trying to register all cores as DOWN", e);
+              } 
 
               // we have to register as live first to pick up docs in the buffer
               createEphemeralLiveNode();
@@ -422,6 +432,8 @@ public class ZkController implements Closeable {
               Thread.currentThread().interrupt();
               throw new ZooKeeperException(
                   SolrException.ErrorCode.SERVER_ERROR, "", e);
+            } catch (SessionExpiredException e) {
+              throw e;
             } catch (Exception e) {
               SolrException.log(log, "", e);
               throw new ZooKeeperException(
@@ -478,7 +490,7 @@ public class ZkController implements Closeable {
   }
 
   private void registerAllCoresAsDown(
-      final CurrentCoreDescriptorProvider registerOnReconnect, boolean updateLastPublished) {
+      final CurrentCoreDescriptorProvider registerOnReconnect, boolean updateLastPublished) throws SessionExpiredException {
     List<CoreDescriptor> descriptors = registerOnReconnect
         .getCurrentDescriptors();
     if (isClosed) return;
@@ -509,6 +521,9 @@ public class ZkController implements Closeable {
           continue;
         } catch (InterruptedException e2) {
           Thread.currentThread().interrupt();
+        } catch (SessionExpiredException e) {
+          // zk has to reconnect
+          throw e;
         } catch (KeeperException e) {
           log.warn("", e);
           Thread.currentThread().interrupt();
@@ -519,7 +534,7 @@ public class ZkController implements Closeable {
           log.debug("calling waitForLeaderToSeeDownState for coreZkNodeName={} collection={} shard={}", new Object[]{coreZkNodeName, collection, slice});
           waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
         } catch (Exception e) {
-          SolrException.log(log, "", e);
+          log.warn("There was a problem while making a best effort to ensure the leader has seen us as down, this is not unexpected as Zookeeper has just reconnected after a session expiration", e);
           if (isClosed) {
             return;
           }
@@ -1384,19 +1399,21 @@ public class ZkController implements Closeable {
 
   /**
    * Get leader props directly from zk nodes.
+   * @throws SessionExpiredException on zk session expiration.
    */
   public ZkCoreNodeProps getLeaderProps(final String collection,
-                                        final String slice, int timeoutms) throws InterruptedException {
-    return getLeaderProps(collection, slice, timeoutms, false);
+                                        final String slice, int timeoutms) throws InterruptedException, SessionExpiredException {
+    return getLeaderProps(collection, slice, timeoutms, true);
   }
 
   /**
    * Get leader props directly from zk nodes.
    *
    * @return leader props
+   * @throws SessionExpiredException on zk session expiration.
    */
   public ZkCoreNodeProps getLeaderProps(final String collection,
-                                        final String slice, int timeoutms, boolean failImmediatelyOnExpiration) throws InterruptedException {
+                                        final String slice, int timeoutms, boolean failImmediatelyOnExpiration) throws InterruptedException, SessionExpiredException {
     int iterCount = timeoutms / 1000;
     Exception exp = null;
     while (iterCount-- > 0) {
@@ -1411,7 +1428,7 @@ public class ZkController implements Closeable {
         throw e;
       } catch (SessionExpiredException e) {
         if (failImmediatelyOnExpiration) {
-          throw new RuntimeException("Session has expired - could not get leader props", exp);
+          throw e;
         }
         exp = e;
         Thread.sleep(1000);
@@ -1856,7 +1873,7 @@ public class ZkController implements Closeable {
   }
 
   private ZkCoreNodeProps waitForLeaderToSeeDownState(
-      CoreDescriptor descriptor, final String coreZkNodeName) {
+      CoreDescriptor descriptor, final String coreZkNodeName) throws SessionExpiredException {
     // try not to wait too long here - if we are waiting too long, we should probably
     // move along and join the election
 
@@ -1876,8 +1893,10 @@ public class ZkController implements Closeable {
         // go straight to zk, not the cloud state - we want current info
         leaderProps = getLeaderProps(collection, shard, 5000);
         break;
+      } catch (SessionExpiredException e) {
+        throw e;
       } catch (Exception e) {
-        SolrException.log(log, "There was a problem finding the leader in zk", e);
+        log.info("Did not find the leader in Zookeeper", e);
         try {
           Thread.sleep(2000);
         } catch (InterruptedException e1) {
@@ -1903,11 +1922,11 @@ public class ZkController implements Closeable {
             " is already less than leader, so not waiting for leader to see down state.");
       } else {
 
-        log.info("Replica need to wait for leader to see down state.");
+        log.info("replica={} is making a best effort attempt to wait for leader={} to see it's DOWN state.", myCoreNodeName, leaderProps.getCoreUrl());
 
         try (HttpSolrClient client = new Builder(leaderBaseUrl)
-            .withConnectionTimeout(15000)
-            .withSocketTimeout(120000)
+            .withConnectionTimeout(8000) // short timeouts, we may be in a storm and this is best effort and maybe we should be the leader now
+            .withSocketTimeout(30000)
             .build()) {
           WaitForState prepCmd = new WaitForState();
           prepCmd.setCoreName(leaderCoreName);
@@ -1915,9 +1934,8 @@ public class ZkController implements Closeable {
           prepCmd.setCoreNodeName(coreZkNodeName);
           prepCmd.setState(Replica.State.DOWN);
 
-          // let's retry a couple times - perhaps the leader just went down,
-          // or perhaps he is just not quite ready for us yet
-          retries = 2;
+          // let's give it another chance, but without taking too long
+          retries = 3;
           for (int i = 0; i < retries; i++) {
             if (isClosed) {
               throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
diff --git a/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java b/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java
index 6684d34..3fa2108 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java
@@ -19,6 +19,8 @@ package org.apache.solr.cloud;
 import java.io.IOException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.cloud.ConnectionManager;
@@ -150,7 +152,7 @@ public class ConnectionManagerTest extends SolrTestCaseJ4 {
     
     @Override
     public void reconnect(final String serverAddress, final int zkClientTimeout,
-        final Watcher watcher, final ZkUpdate updater) throws IOException {
+        final Watcher watcher, final ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
       
       if(called++ < 1) {
         exceptionThrown = true;
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 6b5c72b..73670f0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -83,6 +83,7 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher.Event;
 import org.apache.zookeeper.data.Stat;
@@ -914,7 +915,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
         mockController = new MockZKController(server.getZkAddress(), "node1", overseers);
 
-        TimeOut timeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+        TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
         while (!timeout.hasTimedOut()) {
           try {
             mockController.createCollection(COLLECTION, 1);
@@ -924,7 +925,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
           }
         }
 
-        timeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+        timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
         while (!timeout.hasTimedOut()) {
           try {
             mockController.publishState(COLLECTION, "core1", "node1", "shard1", Replica.State.ACTIVE,
@@ -955,7 +956,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
         mockController2 = new MockZKController(server.getZkAddress(), "node2", overseers);
 
-       timeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+       timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
         while (!timeout.hasTimedOut()) {
           try {
             mockController.publishState(COLLECTION, "core1", "node1", "shard1", Replica.State.ACTIVE,
@@ -969,7 +970,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
         verifyShardLeader(reader, COLLECTION, "shard1", "core1");
 
 
-        timeout = new TimeOut(5, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+        timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
         while (!timeout.hasTimedOut()) {
           try {
             mockController2.publishState(COLLECTION, "core4", "node2", "shard1", Replica.State.ACTIVE,
@@ -992,11 +993,13 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
           ZkCoreNodeProps leaderProps;
           try {
-            leaderProps = zkController.getLeaderProps(COLLECTION, "shard1", 1000);
+            leaderProps = zkController.getLeaderProps(COLLECTION, "shard1", 1000, false);
           } catch (SolrException e) {
             return false;
           } catch (InterruptedException e) {
             throw new RuntimeException(e);
+          } catch (SessionExpiredException e) {
+            return false;
           }
           if (leaderProps.getCoreName().equals("core4")) {
             return true;
@@ -1428,7 +1431,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
     return zkClient;
   }
 
-  private ZkController createMockZkController(String zkAddress, SolrZkClient zkClient, ZkStateReader reader) throws InterruptedException, NoSuchFieldException, SecurityException {
+  private ZkController createMockZkController(String zkAddress, SolrZkClient zkClient, ZkStateReader reader) throws InterruptedException, NoSuchFieldException, SecurityException, SessionExpiredException {
     ZkController zkController = mock(ZkController.class);
 
     if (zkClient == null) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
index ac7449b..fb2af94 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/ConcurrentUpdateHttp2SolrClient.java
@@ -94,7 +94,7 @@ public class ConcurrentUpdateHttp2SolrClient extends SolrClient {
     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
       boolean success = available.tryAcquire(timeout, unit);
       if (success) {
-        queue.offer(e);
+        queue.offer(e, timeout, unit);
       }
       return success;
     }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index 5f93e30..23871a7 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -155,27 +155,27 @@ public class ConnectionManager implements Watcher {
                 public void update(SolrZooKeeper keeper) {
                   try {
                     waitForConnected(Long.MAX_VALUE);
+
+                    try {
+                      client.updateKeeper(keeper);
+                    } catch (InterruptedException e) {
+                      closeKeeper(keeper);
+                      Thread.currentThread().interrupt();
+                      // we must have been asked to stop
+                      throw new RuntimeException(e);
+                    }
+                    
+                    if (onReconnect != null) {
+                      onReconnect.command();
+                    }
+                    
                   } catch (Exception e1) {
+                    // if there was a problem creating the new SolrZooKeeper
+                    // or if we cannot run our reconnect command, close the keeper
+                    // our retry loop will try to create one again
                     closeKeeper(keeper);
                     throw new RuntimeException(e1);
                   }
-  
-                  log.info("Connection with ZooKeeper reestablished.");
-                  try {
-                    client.updateKeeper(keeper);
-                  } catch (InterruptedException e) {
-                    closeKeeper(keeper);
-                    Thread.currentThread().interrupt();
-                    // we must have been asked to stop
-                    throw new RuntimeException(e);
-                  } catch (Exception t) {
-                    closeKeeper(keeper);
-                    throw new RuntimeException(t);
-                  }
-  
-                  if (onReconnect != null) {
-                    onReconnect.command();
-                  }
                 }
               });
           
@@ -231,7 +231,7 @@ public class ConnectionManager implements Watcher {
 
   public synchronized void waitForConnected(long waitForConnection)
       throws TimeoutException {
-    log.debug("Waiting for client to connect to ZooKeeper");
+    log.info("Waiting for client to connect to ZooKeeper");
     long expire = System.nanoTime() + TimeUnit.NANOSECONDS.convert(waitForConnection, TimeUnit.MILLISECONDS);
     long left = 1;
     while (!connected && left > 0) {
@@ -249,7 +249,7 @@ public class ConnectionManager implements Watcher {
     if (!connected) {
       throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms");
     }
-    log.debug("Client is connected to ZooKeeper");
+    log.info("Client is connected to ZooKeeper");
   }
 
   public synchronized void waitForDisconnected(long timeout)
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
index 2ed88e2..85c4b11 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DefaultConnectionStrategy.java
@@ -21,7 +21,6 @@ import java.lang.invoke.MethodHandles;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.solr.common.AlreadyClosedException;
-import org.apache.solr.common.SolrException;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +48,7 @@ public class DefaultConnectionStrategy extends ZkClientConnectionStrategy {
 
   @Override
   public void reconnect(final String serverAddress, final int zkClientTimeout,
-      final Watcher watcher, final ZkUpdate updater) throws IOException {
+      final Watcher watcher, final ZkUpdate updater) throws IOException, InterruptedException, TimeoutException {
     log.warn("Connection expired - starting a new one...");
     SolrZooKeeper zk = createSolrZooKeeper(serverAddress, zkClientTimeout, watcher);
     boolean success = false;
@@ -60,9 +59,6 @@ public class DefaultConnectionStrategy extends ZkClientConnectionStrategy {
       log.info("Reconnected to ZooKeeper");
     } catch (AlreadyClosedException e) {
 
-    } catch (Exception e) {
-      SolrException.log(log, "Reconnect to ZooKeeper failed", e);
-      log.warn("Reconnect to ZooKeeper failed");
     } finally {
       if (!success) {
         try {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java b/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java
index 46aed08..4c86374 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/OnReconnect.java
@@ -16,6 +16,8 @@
  */
 package org.apache.solr.common.cloud;
 
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+
 /**
  * Implementations are expected to implement a correct hashCode and equals
  * method needed to uniquely identify the listener as listeners are managed
@@ -24,5 +26,5 @@ package org.apache.solr.common.cloud;
  * when it no longer needs to be notified of ZK reconnection events.
  */
 public interface OnReconnect {
-  void command();
+  void command() throws SessionExpiredException;
 }