You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/29 04:18:33 UTC

[lucene-solr] 01/02: @1064 Harden.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 07243eb28bd271d9c3aab643a1e011d1eaff54a5
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Oct 28 22:21:33 2020 -0500

    @1064 Harden.
---
 .../src/java/org/apache/solr/cloud/Overseer.java   | 18 ++++-----------
 .../apache/solr/cloud/OverseerTaskProcessor.java   |  2 +-
 .../org/apache/solr/cloud/ZkDistributedQueue.java  | 11 ++++++---
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |  3 ++-
 .../java/org/apache/solr/update/UpdateHandler.java | 13 +++++++++++
 .../src/java/org/apache/solr/update/UpdateLog.java |  4 ++--
 .../java/org/apache/solr/update/VersionInfo.java   | 27 +++++++++++-----------
 .../org/apache/solr/cloud/TestCloudRecovery2.java  | 27 ++++++++++------------
 .../org/apache/solr/cloud/TestPrepRecovery.java    |  9 ++------
 .../collections/CollectionTooManyReplicasTest.java | 15 ++++--------
 .../apache/solr/common/cloud/ZkStateReader.java    |  8 -------
 .../solrj/impl/CloudHttp2SolrClientTest.java       |  2 +-
 .../apache/solr/cloud/MiniSolrCloudCluster.java    | 14 ++++++++---
 13 files changed, 73 insertions(+), 80 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index e13265f..287cd6b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -252,7 +252,7 @@ public class Overseer implements SolrCloseable {
                 // force flush to ZK after each message because there is no fallback if workQueue items
                 // are removed from workQueue but fail to be written to ZK
                 try {
-                  processQueueItem(message, clusterState, zkStateWriter, false, null);
+                  processQueueItem(message, reader.getClusterState(), zkStateWriter, false, null);
                 } catch (InterruptedException | AlreadyClosedException e) {
                   ParWork.propagateInterrupt(e);
                   return;
@@ -306,12 +306,7 @@ public class Overseer implements SolrCloseable {
           try {
             // We do not need to filter any nodes here cause all processed nodes are removed once we flush clusterstate
 
-            long wait = 10000;
-//            if (zkStateWriter.getUpdatesToWrite().isEmpty()) {
-//              wait = 100;
-//            } else {
-//              wait = 0;
-//            }
+            long wait = 5000;
             queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, (x) -> true));
           } catch (AlreadyClosedException e) {
             if (isClosed()) {
@@ -358,14 +353,9 @@ public class Overseer implements SolrCloseable {
                 return;
               }
               // if an event comes in the next *ms batch it together
-              int wait = 0;
-//              if (zkStateWriter.getUpdatesToWrite().isEmpty()) {
-//                wait = 10000;
-//              } else {
-//                wait = 0;
-//              }
+              int wait = 10;
               queue = new LinkedList<>(stateUpdateQueue.peekElements(100, wait, node -> !processedNodes.contains(node)));
-              if (loopCnt >= 3) {
+              if (loopCnt >= 1) {
                 break;
               }
               loopCnt++;
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 8be9b40..066d3aa 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -320,7 +320,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
     Set<Map.Entry<String, QueueEvent>> entrySet = completedTasks.entrySet();
     AtomicBoolean sessionExpired = new AtomicBoolean();
     AtomicBoolean interrupted = new AtomicBoolean();
-    try (ParWork work = new ParWork(this)) {
+    try (ParWork work = new ParWork(this, true, true)) {
       for (Map.Entry<String, QueueEvent> entry : entrySet) {
         work.collect("cleanWorkQueue", ()->{
           try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index 33cf8bf..c8da9c4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -478,9 +478,7 @@ public class ZkDistributedQueue implements DistributedQueue {
             break;
           }
 
-          TreeSet<String> existingChildren = knownChildren;
-
-          while (existingChildren == knownChildren && existingChildren.size() == 0) {
+          while (foundChildren.size() == 0) {
             try {
               changed.await(250, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
@@ -490,6 +488,13 @@ public class ZkDistributedQueue implements DistributedQueue {
             if (timeout.hasTimedOut() || zookeeper.isClosed() || !zookeeper.isConnected()) {
               return Collections.emptyList();
             }
+
+            for (String child : knownChildren) {
+              if (acceptFilter.test(child)) {
+                foundChildren.add(child);
+              }
+            }
+
           }
         } finally {
           if (updateLock.isHeldByCurrentThread()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 403d0cd..1233de6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -297,7 +297,7 @@ public class ZkStateWriter {
                 //              }
 
                 if (col != null && col.getZNodeVersion() > finalPrevVersion) {
-                  if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion());
+                  if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion() + 1);
                   // System.out.println("found the version");
                   return true;
                 }
@@ -315,6 +315,7 @@ public class ZkStateWriter {
 
 
       if (failedUpdates.size() > 0) {
+        log.warn("Some collection updates failed {} logging last exception", failedUpdates, lastFailedException);
         failedUpdates.clear();
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, lastFailedException);
       }
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
index bc9002d..bc41035 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateHandler.java
@@ -160,6 +160,19 @@ UpdateHandler implements SolrInfoBean, Closeable {
         }
         ourUpdateLog.init(ulogPluginInfo);
         ourUpdateLog.init(this, core);
+      } else if (updateLog == null && !skipUpdateLog && core.getCoreContainer().isZooKeeperAware()) {
+        DirectoryFactory dirFactory = core.getDirectoryFactory();
+
+        ourUpdateLog = new UpdateLog();
+
+        if (!core.isReloaded() && !dirFactory.isPersistent()) {
+          ourUpdateLog.clearLog(core, ulogPluginInfo);
+        }
+
+        if (log.isInfoEnabled()) {
+          log.info("Using UpdateLog implementation: {}", ourUpdateLog.getClass().getName());
+        }
+        ourUpdateLog.init(this, core);
       } else {
         ourUpdateLog = updateLog;
       }
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index fb737f4..4f126be 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -203,8 +203,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
   protected final int numDeletesToKeep = 1000;
   protected final int numDeletesByQueryToKeep = 100;
   protected int numRecordsToKeep;
-  protected int maxNumLogsToKeep;
-  protected int numVersionBuckets; // This should only be used to initialize VersionInfo... the actual number of buckets may be rounded up to a power of two.
+  protected volatile int maxNumLogsToKeep;
+  protected volatile int numVersionBuckets = 65536; // This should only be used to initialize VersionInfo... the actual number of buckets may be rounded up to a power of two.
   protected Long maxVersionFromIndex = null;
   protected boolean existOldBufferLog = false;
 
diff --git a/solr/core/src/java/org/apache/solr/update/VersionInfo.java b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
index efa3676..cbc9a4e 100644
--- a/solr/core/src/java/org/apache/solr/update/VersionInfo.java
+++ b/solr/core/src/java/org/apache/solr/update/VersionInfo.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Map;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.lucene.index.LeafReader;
@@ -199,35 +200,33 @@ public class VersionInfo {
     // int h = hash + (hash >>> 8) + (hash >>> 16) + (hash >>> 24);
     // Assume good hash codes for now.
     int slot;
-    buckUpdateLock.readLock().lock();
-    VersionBucket bucket;
+    buckUpdateLock.writeLock().lock();
     try {
+      VersionBucket bucket;
+
       slot = hash & (buckets.length - 1);
       bucket = buckets[slot];
-    } finally {
-      buckUpdateLock.readLock().unlock();
-    }
 
-    if (bucket == null) {
-      buckUpdateLock.writeLock().lock();
-      try {
+      if (bucket == null) {
+
         bucket = buckets[slot];
         if (bucket == null) {
 
           if (versionBucketLockTimeoutMs > 0) {
-            bucket= new TimedVersionBucket();
+            bucket = new TimedVersionBucket();
           } else {
-            bucket= new VersionBucket();
+            bucket = new VersionBucket();
           }
           bucket.updateHighest(highestVersion);
           buckets[slot] = bucket;
         }
-      } finally {
-        buckUpdateLock.writeLock().unlock();
+
       }
-    }
 
-    return bucket;
+      return bucket;
+    } finally {
+      buckUpdateLock.writeLock().unlock();
+    }
   }
 
   public Long lookupVersion(BytesRef idBytes) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
index 3aa4c43..b355d63 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
@@ -48,7 +48,7 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
 
     CollectionAdminRequest
         .createCollection(COLLECTION, "config", 1,2)
-        .setMaxShardsPerNode(2)
+        .setMaxShardsPerNode(100)
         .process(cluster.getSolrClient());
   }
 
@@ -59,8 +59,6 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
     try (Http2SolrClient client1 = SolrTestCaseJ4.getHttpSolrClient(node1.getBaseUrl().toString())) {
 
       node2.stop();
-      cluster.waitForJettyToStop(node2);
-      waitForState("", COLLECTION, (liveNodes, collectionState) -> liveNodes.size() == 1);
 
       UpdateRequest req = new UpdateRequest();
       for (int i = 0; i < 100; i++) {
@@ -69,8 +67,8 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
       req.commit(client1, COLLECTION);
 
       node2.start();
-      cluster.waitForNode(node2, 10);
-      waitForState("", COLLECTION, clusterShape(1, 2));
+
+      cluster.waitForActiveCollection(COLLECTION, 1, 2);
 
       try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl().toString())) {
         long numFound = client.query(COLLECTION, new SolrQuery("q","*:*", "distrib", "false")).getResults().getNumFound();
@@ -91,8 +89,7 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
 
       //
       node2.stop();
-      cluster.waitForJettyToStop(node2);
-      waitForState("", COLLECTION, (liveNodes, collectionState) -> liveNodes.size() == 1);
+
 
       new UpdateRequest().add("id", "1", "num", "20")
           .commit(client1, COLLECTION);
@@ -100,7 +97,9 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
       assertEquals("20", v.toString());
 
       node2.start();
-      cluster.waitForNode(node2, 10);
+
+      cluster.waitForActiveCollection(COLLECTION, 1, 2);
+
       waitForState("", COLLECTION, clusterShape(1, 2));
       try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl().toString())) {
         v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
@@ -108,8 +107,6 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
       }
 
       node2.stop();
-      cluster.waitForJettyToStop(node2);
-      waitForState("", COLLECTION, (liveNodes, collectionState) -> liveNodes.size() == 1);
 
       new UpdateRequest().add("id", "1", "num", "30")
           .commit(client1, COLLECTION);
@@ -117,8 +114,8 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
       SolrTestCaseJ4.      assertEquals("30", v.toString());
 
       node2.start();
-      cluster.waitForNode(node2, 10);
-      waitForState("", COLLECTION, clusterShape(1, 2));
+
+      cluster.waitForActiveCollection(COLLECTION, 1, 2);
 
       try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node2.getBaseUrl().toString())) {
         v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
@@ -129,15 +126,15 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
     }
 
     node1.stop();
-    cluster.waitForJettyToStop(node1);
     waitForState("", COLLECTION, (liveNodes, collectionState) -> {
       Replica leader = collectionState.getLeader("shard1");
       return leader != null && leader.getNodeName().equals(node2.getNodeName());
     });
 
     node1.start();
-    cluster.waitForNode(node1, 10);
-    waitForState("", COLLECTION, clusterShape(1, 2));
+
+    cluster.waitForActiveCollection(COLLECTION, 1, 2);
+
     try (Http2SolrClient client = SolrTestCaseJ4.getHttpSolrClient(node1.getBaseUrl().toString())) {
       Object v = client.query(COLLECTION, new SolrQuery("q","id:1", "distrib", "false")).getResults().get(0).get("num");
       assertEquals("30", v.toString());
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java b/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java
index f6fae16..8f90b77 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPrepRecovery.java
@@ -17,17 +17,13 @@
 
 package org.apache.solr.cloud;
 
-import java.util.concurrent.TimeUnit;
-
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.util.TestInjection;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -62,11 +58,9 @@ public class TestPrepRecovery extends SolrCloudTestCase {
 
     String collectionName = "testLeaderUnloaded";
     CollectionAdminRequest.createCollection(collectionName, 1, 2)
+        .setMaxShardsPerNode(100)
         .process(solrClient);
 
-    waitForState("Expected collection: testLeaderUnloaded to be live with 1 shard and 2 replicas",
-        collectionName, clusterShape(1, 2));
-
     JettySolrRunner newNode = cluster.startJettySolrRunner();
 
     String newNodeName = newNode.getNodeName();
@@ -97,6 +91,7 @@ public class TestPrepRecovery extends SolrCloudTestCase {
 
     String collectionName = "testLeaderNotResponding";
     CollectionAdminRequest.createCollection(collectionName, 1, 1)
+        .setMaxShardsPerNode(100)
         .process(solrClient);
 
     TestInjection.prepRecoveryOpPauseForever = "true:100";
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java
index 4352bf8..99fa3d2 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionTooManyReplicasTest.java
@@ -16,10 +16,6 @@
  */
 package org.apache.solr.cloud.api.collections;
 
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -28,11 +24,13 @@ import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.zookeeper.KeeperException;
-import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 @Slow
 public class CollectionTooManyReplicasTest extends SolrCloudTestCase {
 
@@ -43,11 +41,6 @@ public class CollectionTooManyReplicasTest extends SolrCloudTestCase {
         .configure();
   }
 
-  @Before
-  public void deleteCollections() throws Exception {
-    cluster.deleteAllCollections();
-  }
-
   @Test
   public void testAddShard() throws Exception {
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index f7a011a..3437007 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -838,14 +838,6 @@ public class ZkStateReader implements SolrCloseable {
       } catch (NullPointerException e) {
         // okay
       }
-      if (notifications != null) {
-        try {
-          boolean success = notifications.awaitTermination(1, TimeUnit.SECONDS);
-          if (!success) notifications.shutdownNow();
-        } catch (InterruptedException e) {
-          ParWork.propagateInterrupt(e);
-        }
-      }
 
     } finally {
       assert ObjectReleaseTracker.release(this);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
index 95c0c16..f6001ee 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java
@@ -914,7 +914,7 @@ public class CloudHttp2SolrClientTest extends SolrCloudTestCase {
                    // NOTE: don't use our stale_client for this -- don't tip it off of a collection change
                    .process(cluster.getSolrClient()).getStatus());
 
-      cluster.waitForActiveCollection(COL, 1, 1);
+      cluster.waitForActiveCollection(COL, 1, 1, true);
       
       // attempt a (direct) update that should succeed in spite of cached cluster state
       // pointing solely to a node that's no longer part of our collection...
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index 23d9394..db32fa2 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -920,10 +920,14 @@ public class MiniSolrCloudCluster {
     }
     throw new SolrException(ErrorCode.NOT_FOUND, "No open Overseer found");
   }
-  
+
   public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas) {
+    waitForActiveCollection(collection, wait, unit, shards, totalReplicas, false);
+  }
+
+  public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas, boolean exact) {
     log.info("waitForActiveCollection: {}", collection);
-    CollectionStatePredicate predicate = BaseCloudSolrClient.expectedShardsAndActiveReplicas(shards, totalReplicas, false);
+    CollectionStatePredicate predicate = BaseCloudSolrClient.expectedShardsAndActiveReplicas(shards, totalReplicas, exact);
 
     AtomicReference<DocCollection> state = new AtomicReference<>();
     AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>();
@@ -944,7 +948,11 @@ public class MiniSolrCloudCluster {
   public void waitForActiveCollection(String collection, int shards, int totalReplicas) {
     waitForActiveCollection(collection,  10, TimeUnit.SECONDS, shards, totalReplicas);
   }
-  
+
+  public void waitForActiveCollection(String collection, int shards, int totalReplicas, boolean exact) {
+    waitForActiveCollection(collection,  10, TimeUnit.SECONDS, shards, totalReplicas, exact);
+  }
+
   public void waitForRemovedCollection(String collection) {
     try {
       getSolrClient().waitForState(collection, 10, TimeUnit.SECONDS, (n, c) -> {