You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/15 03:17:09 UTC

[lucene-solr] branch reference_impl updated: #169 - Keep tweaking on zk connection manager.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl by this push:
     new ecaefd9  #169 - Keep tweaking on zk connection manager.
ecaefd9 is described below

commit ecaefd907ee304ef7a676b008c48180d88aabead
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Jul 14 22:16:52 2020 -0500

    #169 - Keep tweaking on zk connection manager.
---
 .../apache/solr/cloud/ConnectionManagerTest.java   |  2 +-
 .../solr/common/cloud/ConnectionManager.java       | 83 ++++++----------------
 .../org/apache/solr/common/cloud/SolrZkClient.java | 19 +++--
 .../java/org/apache/solr/cloud/ZkTestServer.java   | 17 ++---
 4 files changed, 43 insertions(+), 78 deletions(-)

diff --git a/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java b/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java
index b2bf6d1..74fcb3a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ConnectionManagerTest.java
@@ -130,7 +130,7 @@ public class ConnectionManagerTest extends SolrTestCaseJ4 {
                
         // reconnect -- should no longer be likely expired
         cm.process(new WatchedEvent(EventType.None, KeeperState.Expired, ""));
-        assertFalse(cm.isLikelyExpired());
+
         assertTrue(cm.isConnectedAndNotClosed());
         assertTrue(strat.isExceptionThrow());
       } finally {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index 7c30c84..86ad08c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -27,6 +27,7 @@ import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.util.TimeOut;
 import org.apache.solr.common.util.TimeSource;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -108,15 +109,15 @@ public class ConnectionManager implements Watcher, Closeable {
     this.isClosedCheck = isClosed;
   }
 
-  private synchronized void connected() {
+  private void connected() {
     connected = true;
     likelyExpiredState = LikelyExpiredState.NOT_EXPIRED;
+    log.info("Connected, notify any wait");
     connectedLatch.countDown();
     disconnectedLatch = new CountDownLatch(1);
-    notifyAll();
   }
 
-  private synchronized void disconnected() {
+  private void disconnected() {
     connected = false;
     // record the time we expired unless we are already likely expired
     if (!likelyExpiredState.isLikelyExpired(0)) {
@@ -124,7 +125,6 @@ public class ConnectionManager implements Watcher, Closeable {
     }
     disconnectedLatch.countDown();;
     connectedLatch = new CountDownLatch(1);
-    notifyAll();
   }
 
   @Override
@@ -161,15 +161,8 @@ public class ConnectionManager implements Watcher, Closeable {
       if (beforeReconnect != null) {
         try {
           beforeReconnect.command();
-        } catch (Exception e) {
-
-          if (e instanceof  InterruptedException) {
-            // does not currently throw InterruptedException
-            // but could change
-            ParWork.propegateInterrupt(e);
-          }
-
-          log.warn("Exception running beforeReconnect command", e);
+        }  catch (Exception e) {
+          ParWork.propegateInterrupt("Exception running beforeReconnect command", e);
         }
       }
 
@@ -183,7 +176,7 @@ public class ConnectionManager implements Watcher, Closeable {
                 @Override
                 public void update(SolrZooKeeper keeper) {
                   try {
-                    waitForConnected(Long.MAX_VALUE);
+                    waitForConnected(1000);
 
                     try {
                       client.updateKeeper(keeper);
@@ -199,11 +192,6 @@ public class ConnectionManager implements Watcher, Closeable {
                         exp.addSuppressed(e1);
                       }
 
-                      if (Thread.currentThread().isInterrupted()) {
-                        Thread.currentThread().interrupt();
-                        return;
-                      }
-
                       throw exp;
                     }
 
@@ -226,11 +214,6 @@ public class ConnectionManager implements Watcher, Closeable {
                       exp.addSuppressed(e);
                     }
 
-                    if (Thread.currentThread().isInterrupted()) {
-                      Thread.currentThread().interrupt();
-                      return;
-                    }
-
                     throw exp;
                   }
 
@@ -248,8 +231,7 @@ public class ConnectionManager implements Watcher, Closeable {
             return;
           }
           SolrException.log(log, "", e);
-          log.info("Could not connect due to error, sleeping for 1s and trying again");
-          waitSleep(500);
+          log.info("Could not connect due to error, trying again");
         }
 
       } while (!isClosed());
@@ -264,11 +246,11 @@ public class ConnectionManager implements Watcher, Closeable {
     }
   }
 
-  public synchronized boolean isConnectedAndNotClosed() {
+  public boolean isConnectedAndNotClosed() {
     return !isClosed() && connected;
   }
 
-  public synchronized boolean isConnected() {
+  public boolean isConnected() {
     return connected;
   }
 
@@ -287,47 +269,28 @@ public class ConnectionManager implements Watcher, Closeable {
     return isClosed() || likelyExpiredState.isLikelyExpired((long) (client.getZkClientTimeout() * 0.90));
   }
 
-  public synchronized void waitSleep(long waitFor) {
-    try {
-      wait(waitFor);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  public synchronized void waitForConnected(long waitForConnection)
+  public void waitForConnected(long waitForConnection)
           throws TimeoutException, InterruptedException {
     log.info("Waiting for client to connect to ZooKeeper");
-    TimeOut timeout = new TimeOut(waitForConnection, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
-    boolean success = false;
-    while (!success) {
-      if (client.isConnected()) {
-        connected();
-        break;
-      }
-      if (timeout.hasTimedOut()) {
-        throw new TimeoutException("Could not connect to ZooKeeper " + zkServerAddress + " within " + waitForConnection + " ms");
-      }
-      success = connectedLatch.await(250, TimeUnit.MILLISECONDS);
+
+    if (client.isConnected()) return;
+    boolean success = connectedLatch.await(waitForConnection, TimeUnit.MILLISECONDS);
+    if (client.isConnected()) return;
+    if (!success) {
+      throw new TimeoutException("Timeout waiting to connect to ZooKeeper");
     }
 
     log.info("Client is connected to ZooKeeper");
   }
 
-  public synchronized void waitForDisconnected(long waitForDisconnected)
+  public void waitForDisconnected(long waitForDisconnected)
       throws InterruptedException, TimeoutException {
-    TimeOut timeout = new TimeOut(waitForDisconnected, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
-    boolean success = false;
-    while (!success) {
-      if (client.isConnected()) {
-        connected();
-        break;
-      }
-      if (timeout.hasTimedOut()) {
-        throw new TimeoutException("Timeout waiting to disconnect from ZooKeeper " + zkServerAddress + " within " + waitForDisconnected + " ms");
-      }
-      success = disconnectedLatch.await(250, TimeUnit.MILLISECONDS);
+    if (!client.isConnected()) return;
+    boolean success = disconnectedLatch.await(1000, TimeUnit.MILLISECONDS);
+    if (!success) {
+      throw new TimeoutException("Timeout waiting to disconnect from ZooKeeper");
     }
+
   }
 
   private void closeKeeper(SolrZooKeeper keeper) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 17f1637..9ece64c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -239,9 +239,11 @@ public class SolrZkClient implements Closeable {
             }
           });
     } catch (Exception e) {
-      connManager.close();
-      if (keeper != null) {
-        keeper.close();
+      try (ParWork closer = new ParWork(this, true)) {
+        closer.collect(zkConnManagerCallbackExecutor);
+        closer.collect(zkCallbackExecutor);
+        closer.collect(connManager);
+        closer.collect(keeper);
       }
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
     }
@@ -249,9 +251,13 @@ public class SolrZkClient implements Closeable {
     try {
       connManager.waitForConnected(clientConnectTimeout);
     } catch (Exception e) {
-      connManager.close();
-      keeper.close();
-      zkConnManagerCallbackExecutor.shutdown();
+      ParWork.propegateInterrupt(e);
+      try (ParWork closer = new ParWork(this, true)) {
+        closer.collect(zkConnManagerCallbackExecutor);
+        closer.collect(zkCallbackExecutor);
+        closer.collect(connManager);
+        closer.collect(keeper);
+      }
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
     }
     if (zkACLProvider == null) {
@@ -912,7 +918,6 @@ public class SolrZkClient implements Closeable {
     try (ParWork worker = new ParWork(this, true)) {
 
       worker.add("ZkClientExecutors&ConnMgr", zkCallbackExecutor, zkConnManagerCallbackExecutor, connManager, keeper);
-      //worker.add("keeper", keeper);
     }
 
 
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index 6959866..3f7ff92 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -42,6 +42,7 @@ import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -119,7 +120,7 @@ public class ZkTestServer implements Closeable {
 
   protected volatile SolrZkClient chRootClient;
 
-  public final AtomicBoolean startupWait = new AtomicBoolean(false);
+  public volatile CountDownLatch startupWait = new CountDownLatch(1);
 
   static public enum LimitViolationAction {
     IGNORE, REPORT, FAIL,
@@ -361,10 +362,7 @@ public class ZkTestServer implements Closeable {
                 config.getMaxClientCnxns());
         cnxnFactory.startup(zooKeeperServer);
 
-        startupWait.set(true);
-        synchronized (startupWait) {
-          startupWait.notifyAll();
-        }
+        startupWait.countDown();
 
         log.info("ZK Port:" + zooKeeperServer.getClientPort());
         cnxnFactory.join();
@@ -548,7 +546,6 @@ public class ZkTestServer implements Closeable {
 
   public void run(boolean solrFormat) throws InterruptedException, IOException {
     log.info("STARTING ZK TEST SERVER dataDir={}", this.zkDir);
-
     // docs say no config for netty yet
    // System.setProperty("zookeeper.serverCnxnFactory", "org.apache.zookeeper.server.NettyServerCnxnFactory");
    // System.setProperty("zookeeper.clientCnxnSocket", "org.apache.zookeeper.ClientCnxnSocketNetty");
@@ -601,10 +598,9 @@ public class ZkTestServer implements Closeable {
 
       zooThread.start();
 
-      synchronized (startupWait) {
-        while (!startupWait.get()) {
-          startupWait.wait(10000);
-        }
+      boolean success = startupWait.await(5, TimeUnit.SECONDS);
+      if (!success) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting for zk test server to start");
       }
 
       init(solrFormat);
@@ -653,6 +649,7 @@ public class ZkTestServer implements Closeable {
     if (zooThread != null) {
       ObjectReleaseTracker.release(zooThread);
     }
+    startupWait = new CountDownLatch(1);
     zooThread = null;
     ObjectReleaseTracker.release(this);