You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/28 21:02:55 UTC

[lucene-solr] 02/03: @1058 Harden.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit e33c44705e8cf9b459adee7620f48cfd515a1971
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Oct 28 15:29:05 2020 -0500

    @1058 Harden.
---
 .../src/java/org/apache/solr/cloud/Overseer.java   | 24 +++++++++++++++++-----
 .../apache/solr/cloud/OverseerElectionContext.java | 22 +++++++-------------
 .../solr/cloud/overseer/CollectionMutator.java     |  8 ++++----
 .../apache/solr/cloud/CollectionsAPISolrJTest.java |  5 ++++-
 .../src/java/org/apache/solr/common/ParWork.java   |  2 +-
 .../solr/common/cloud/ConnectionManager.java       | 20 ++++++++++++------
 6 files changed, 49 insertions(+), 32 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index d3af42f..7dd0116 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -237,7 +237,7 @@ public class Overseer implements SolrCloseable {
         // we do not sure which message is bad message, therefore we will re-process node one by one
         int fallbackQueueSize = Integer.MAX_VALUE;
         ZkDistributedQueue fallbackQueue = workQueue;
-        while (!isClosed()) {
+        while (!checkClosed()) {
           if (zkStateWriter == null) {
             try {
               zkStateWriter = new ZkStateWriter(reader, stats);
@@ -282,7 +282,6 @@ public class Overseer implements SolrCloseable {
               }
               // force flush at the end of the loop, if there are no pending updates, this is a no op call
               clusterState = zkStateWriter.writePendingUpdates(clusterState);
-              assert clusterState != null;
               // the workQueue is empty now, use stateUpdateQueue as fallback queue
               fallbackQueue = stateUpdateQueue;
               fallbackQueueSize = 0;
@@ -315,6 +314,10 @@ public class Overseer implements SolrCloseable {
 //            }
             queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, (x) -> true));
           } catch (AlreadyClosedException e) {
+            if (isClosed()) {
+              log.info("Overseer closed (AlreadyClosedException), exiting loop");
+              return;
+            }
             return;
           } catch (KeeperException.SessionExpiredException e) {
             log.warn("Solr cannot talk to ZK, exiting Overseer work queue loop", e);
@@ -350,7 +353,10 @@ public class Overseer implements SolrCloseable {
                   processedNodes.clear();
                 });
               }
-              if (isClosed()) return;
+              if (isClosed()) {
+                log.info("Overseer closed, exiting loop");
+                return;
+              }
               // if an event comes in the next *ms batch it together
               int wait = 0;
 //              if (zkStateWriter.getUpdatesToWrite().isEmpty()) {
@@ -359,7 +365,7 @@ public class Overseer implements SolrCloseable {
 //                wait = 0;
 //              }
               queue = new LinkedList<>(stateUpdateQueue.peekElements(100, wait, node -> !processedNodes.contains(node)));
-              if (loopCnt >= 1) {
+              if (loopCnt >= 3) {
                 break;
               }
               loopCnt++;
@@ -399,6 +405,14 @@ public class Overseer implements SolrCloseable {
       }
     }
 
+    // Loop guard: returns the result of isClosed() and, when that is true,
+    // logs at info level that the loop will not continue.
+    private boolean checkClosed() {
+      if (!isClosed()) return false;
+      log.info("Overseer is closed, will not continue loop ...");
+      return true;
+    }
+
     // Return true whenever the exception thrown by ZkStateWriter is correspond
     // to a invalid state or 'bad' message (in this case, we should remove that message from queue)
     private boolean isBadMessage(Exception e) {
@@ -673,11 +687,11 @@ public class Overseer implements SolrCloseable {
   }
 
   public synchronized void start(String id, ElectionContext context) throws KeeperException {
+    doClose();
     if (getCoreContainer().isShutDown() || closeAndDone) {
       if (log.isDebugEnabled()) log.debug("Already closed, exiting");
       return;
     }
-    doClose();
     closed = false;
 
     MDCLoggingContext.setNode(zkController == null ?
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
index 2c811c6..6913b48 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
@@ -76,22 +76,14 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
 
     log.info("Registered as Overseer leader, starting Overseer ...");
 
-    synchronized (this) {
-      if (isClosed()) {
-        log.info("Bailing on becoming leader, we are closed");
-        return;
-      }
-      if (!isClosed() && !overseer.getZkController().getCoreContainer().isShutDown() && !overseer.isDone() && (overseer.getUpdaterThread() == null || !overseer.getUpdaterThread().isAlive())) {
-
-        try {
-          overseer.start(id, context);
-        } finally {
-          if (isClosed()) {
-            overseer.close();
-          }
-        }
-      }
+    if (isClosed()) {
+      log.info("Bailing on becoming leader, we are closed");
+      return;
     }
+    if (!isClosed() && !overseer.getZkController().getCoreContainer().isShutDown() && !overseer.isDone() && (overseer.getUpdaterThread() == null || !overseer.getUpdaterThread().isAlive())) {
+      overseer.start(id, context);
+    }
+
   }
 
   private void clearQueue(ZkDistributedQueue queue)
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index a64cee0..83c2331 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -91,11 +91,11 @@ public class CollectionMutator {
       // TODO - fix, no makePath (ensure every path part exists), async, single node
       try {
         stateManager.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName
-            + "/leader_elect/" + shardId);
+            + "/leader_elect/" + shardId, null, CreateMode.PERSISTENT, false);
         stateManager.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName
-            + "/leader_elect/" + shardId + LeaderElector.ELECTION_NODE);
+            + "/leader_elect/" + shardId + LeaderElector.ELECTION_NODE, null, CreateMode.PERSISTENT, false);
         stateManager.makePath(ZkStateReader.COLLECTIONS_ZKNODE+ "/" + collectionName + "/" + shardId
-            + ZkStateReader.SHARD_LEADERS_ZKNODE);
+            + ZkStateReader.SHARD_LEADERS_ZKNODE, null, CreateMode.PERSISTENT, false);
 
         stateManager.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName + "/" +  shardId, null, CreateMode.PERSISTENT, false);
         stateManager.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName + "/leader_elect/" +  shardId, null, CreateMode.PERSISTENT, false);
@@ -104,7 +104,7 @@ public class CollectionMutator {
         stateManager.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName + "/terms/" +  shardId, ZkStateReader.emptyJson, CreateMode.PERSISTENT, false);
         stateManager.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName + "/schema_lock", null, CreateMode.PERSISTENT, false);
       } catch (AlreadyExistsException e) {
-        throw new AlreadyClosedException();
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       } catch (IOException e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       } catch (KeeperException e) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index 5c09ae8..f4c851b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -351,6 +351,8 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
       return true;
     });
 
+    cluster.waitForActiveCollection(collectionName, 3, 3);
+
     // Test splitting using split.key
     response = CollectionAdminRequest.splitShard(collectionName)
         .setSplitKey("b!")
@@ -359,7 +361,8 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
     assertEquals(0, response.getStatus());
     assertTrue(response.isSuccess());
 
-    waitForState("Expected 5 slices to be active", collectionName, (n, c) -> c.getActiveSlices().size() == 5);
+    // wait for 5 active shards
+    cluster.waitForActiveCollection(collectionName, 5, 5);
 
   }
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 07e35a2..69125c8 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -640,7 +640,7 @@ public class ParWork implements Closeable {
       Thread.currentThread().interrupt();
     } else {
       if (infoLogMsg) {
-        log.info(t.getMessage());
+        log.info(t.getClass().getName() + " " + t.getMessage());
       } else {
         log.warn("Solr ran into an unexpected exception", t);
       }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
index b2bd431..55254e2 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ConnectionManager.java
@@ -22,6 +22,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
@@ -60,7 +61,7 @@ public class ConnectionManager implements Watcher, Closeable {
 
   private volatile boolean isClosed = false;
 
-  private final Object keeperLock = new Object();
+  private final ReentrantLock keeperLock = new ReentrantLock(true);
 
   private volatile CountDownLatch connectedLatch = new CountDownLatch(1);
   private volatile CountDownLatch disconnectedLatch = new CountDownLatch(1);
@@ -82,12 +83,15 @@ public class ConnectionManager implements Watcher, Closeable {
   }
 
   public ZooKeeper getKeeper() {
-    synchronized (keeperLock) {
+    keeperLock.lock();
+    try {
       SolrZooKeeper rKeeper = keeper;
       if (rKeeper == null) {
         throw new AlreadyClosedException();
       }
       return rKeeper;
+    } finally {
+      keeperLock.unlock();
     }
   }
 
@@ -189,14 +193,16 @@ public class ConnectionManager implements Watcher, Closeable {
   }
 
   private void updatezk() throws IOException {
-    synchronized (keeperLock) {
+    keeperLock.lock();
+    try {
       if (isClosed()) return;
       if (keeper != null) {
         ParWork.close(keeper);
-        keeper = null;
       }
       SolrZooKeeper zk = createSolrZooKeeper(zkServerAddress, zkTimeout, this);
       keeper = zk;
+    } finally {
+      keeperLock.unlock();
     }
   }
 
@@ -265,7 +271,8 @@ public class ConnectionManager implements Watcher, Closeable {
       }
     }
 
-    synchronized (keeperLock) {
+    keeperLock.lock();
+    try {
       if (isClosed()) return;
       if (keeper != null) {
         // if there was a problem creating the new SolrZooKeeper
@@ -281,6 +288,8 @@ public class ConnectionManager implements Watcher, Closeable {
         }
       }
 
+    } finally {
+      keeperLock.unlock();
     }
 
     do {
@@ -351,7 +360,6 @@ public class ConnectionManager implements Watcher, Closeable {
     if (keeper != null) {
       keeper.close();
     }
-    keeper = null;
 
     ExecutorUtil.awaitTermination(client.zkCallbackSerialExecutor);
     ExecutorUtil.awaitTermination(client.zkCallbackExecutor);