You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/09 06:12:43 UTC

[lucene-solr] branch reference_impl_dev updated: @1106 Harden and polish work.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new b27a9ad  @1106 Harden and polish work.
b27a9ad is described below

commit b27a9ada64f667abe210296a3cbce70f21ba4275
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Nov 9 00:12:21 2020 -0600

    @1106 Harden and polish work.
---
 .../java/org/apache/solr/cloud/LeaderElector.java  |   2 +
 .../cloud/api/collections/DeleteReplicaCmd.java    | 119 ++++++++++++---------
 .../apache/solr/search/facet/TestJsonFacets.java   |  11 +-
 .../AbstractAtomicUpdatesMultivalueTestBase.java   |   6 ++
 .../processor/DistributedUpdateProcessorTest.java  |   6 +-
 .../solrj/io/stream/SelectWithEvaluatorsTest.java  |  29 ++---
 6 files changed, 95 insertions(+), 78 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index e6b89ae..d4e7660 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -405,6 +405,8 @@ public class LeaderElector implements Closeable {
       if (zk != null) {
         try {
           zk.removeWatches(context.leaderSeqPath, this, WatcherType.Any, true);
+        } catch (KeeperException.NoWatcherException e) {
+          // okay
         } catch (InterruptedException e) {
           log.info("Interrupted removing leader watch");
         } catch (KeeperException e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index dc6c122..e2b342a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -90,11 +90,20 @@ public class DeleteReplicaCmd implements Cmd {
 
     //If a count is specified the strategy needs be different
     if (message.getStr(COUNT_PROP) != null) {
-      AddReplicaCmd.Response resp = deleteReplicaBasedOnCount(clusterState, message, results);
+      ShardHandler shardHandler = null;
+      ShardRequestTracker shardRequestTracker = null;
+      if (!onlyUpdateState) {
+        String asyncId = message.getStr(ASYNC);
+        shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
+        shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+      }
+      AddReplicaCmd.Response resp = deleteReplicaBasedOnCount(clusterState, message, results, shardHandler, shardRequestTracker);
       clusterState = resp.clusterState;
       AddReplicaCmd.Response response = new AddReplicaCmd.Response();
 
       if (results.get("failure") == null && results.get("exception") == null) {
+        ShardRequestTracker finalShardRequestTracker = shardRequestTracker;
+        ShardHandler finalShardHandler = shardHandler;
         response.asyncFinalRunner = new OverseerCollectionMessageHandler.Finalize() {
           @Override
           public AddReplicaCmd.Response call() {
@@ -105,6 +114,14 @@ public class DeleteReplicaCmd implements Cmd {
                 log.error("Exception running delete replica finalizers", e);
               }
             }
+
+            if (finalShardRequestTracker != null) {
+              try {
+                finalShardRequestTracker.processResponses(results, finalShardHandler, false, null);
+              } catch (Exception e) {
+                log.error("Exception waiting for delete replica response");
+              }
+            }
             //          try {
             //            waitForCoreNodeGone(collectionName, shard, replicaName, 30000);
             //          } catch (Exception e) {
@@ -138,8 +155,16 @@ public class DeleteReplicaCmd implements Cmd {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
               "Invalid shard name : " +  shard + " in collection : " +  collectionName);
     }
+    ShardHandler shardHandler = null;
+    ShardRequestTracker shardRequestTracker = null;
+    if (!onlyUpdateState) {
+      String asyncId = message.getStr(ASYNC);
+      shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
+      shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+    }
 
-    AddReplicaCmd.Response resp = deleteCore(clusterState, slice, collectionName, replicaName, message, shard, results);
+    AddReplicaCmd.Response resp = deleteCore(clusterState, slice, collectionName, replicaName, message,
+        shard, results, shardRequestTracker, shardHandler);
     clusterState = resp.clusterState;
 
     if (clusterState.getCollectionOrNull(collectionName).getReplica(replicaName) != null) {
@@ -148,28 +173,30 @@ public class DeleteReplicaCmd implements Cmd {
 
     AddReplicaCmd.Response response = new AddReplicaCmd.Response();
 
-    if (results.get("failure") == null && results.get("exception") == null) {
-      response.asyncFinalRunner = new OverseerCollectionMessageHandler.Finalize() {
-        @Override
-        public AddReplicaCmd.Response call() {
-          if (resp.asyncFinalRunner != null) {
-            try {
-              resp.asyncFinalRunner.call();
-            } catch (Exception e) {
-              log.error("", e);
-            }
-          }
-
-          try {
-            waitForCoreNodeGone(collectionName, shard, replicaName, 30000);
-          } catch (Exception e) {
-            log.error("", e);
-          }
-          AddReplicaCmd.Response response = new AddReplicaCmd.Response();
-          return response;
-        }
-      };
-    }
+   if (!onlyUpdateState) {
+     ShardRequestTracker finalShardRequestTracker = shardRequestTracker;
+     ShardHandler finalShardHandler = shardHandler;
+     response.asyncFinalRunner = new OverseerCollectionMessageHandler.Finalize() {
+       @Override
+       public AddReplicaCmd.Response call() {
+         if (finalShardRequestTracker != null) {
+           try {
+             finalShardRequestTracker.processResponses(results, finalShardHandler, false, null);
+           } catch (Exception e) {
+             log.error("Exception waiting for delete replica response");
+           }
+         }
+
+//         try {
+//           waitForCoreNodeGone(collectionName, shard, replicaName, 30000);
+//         } catch (Exception e) {
+//           log.error("", e);
+//         }
+         AddReplicaCmd.Response response = new AddReplicaCmd.Response();
+         return response;
+       }
+     };
+   }
     response.clusterState = clusterState;
     return response;
   }
@@ -181,9 +208,8 @@ public class DeleteReplicaCmd implements Cmd {
    * @return
    */
   @SuppressWarnings({"unchecked"})
-  AddReplicaCmd.Response deleteReplicaBasedOnCount(ClusterState clusterState,
-                                 ZkNodeProps message,
-                                 @SuppressWarnings({"rawtypes"})NamedList results)
+  AddReplicaCmd.Response deleteReplicaBasedOnCount(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"}) NamedList results, ShardHandler shardHandler,
+      ShardRequestTracker shardRequestTracker)
           throws KeeperException, InterruptedException {
     ocmh.checkRequired(message, COLLECTION_PROP, COUNT_PROP);
     int count = Integer.parseInt(message.getStr(COUNT_PROP));
@@ -222,7 +248,7 @@ public class DeleteReplicaCmd implements Cmd {
         if (log.isDebugEnabled()) log.debug("Deleting replica {}  for shard {} based on count {}", replica, shardId, count);
         // nocommit - DONT DO THIS ONE AT TIME
 
-        AddReplicaCmd.Response resp = deleteCore(clusterState, shardSlice, collectionName, replica, message, shard, results);
+        AddReplicaCmd.Response resp = deleteCore(clusterState, shardSlice, collectionName, replica, message, shard, results, shardRequestTracker, shardHandler);
         clusterState = resp.clusterState;
         if (resp.asyncFinalRunner != null) {
           finalizers.add(resp.asyncFinalRunner);
@@ -299,7 +325,8 @@ public class DeleteReplicaCmd implements Cmd {
   }
 
   @SuppressWarnings({"unchecked"})
-  AddReplicaCmd.Response deleteCore(ClusterState clusterState, Slice slice, String collectionName, String replicaName, ZkNodeProps message, String shard, @SuppressWarnings({"rawtypes"})NamedList results) throws KeeperException, InterruptedException {
+  AddReplicaCmd.Response deleteCore(ClusterState clusterState, Slice slice, String collectionName, String replicaName,
+      ZkNodeProps message, String shard, @SuppressWarnings({"rawtypes"})NamedList results, ShardRequestTracker shardRequestTracker, ShardHandler shardHandler) throws KeeperException, InterruptedException {
     log.info("delete core {}", replicaName);
     Replica replica = slice.getReplica(replicaName);
     if (replica == null) {
@@ -326,9 +353,10 @@ public class DeleteReplicaCmd implements Cmd {
     log.info("Before slice remove replica {} {}", rep, clusterState);
     clusterState = new SliceMutator(ocmh.cloudManager).removeReplica(clusterState, rep);
     log.info("After slice remove replica {} {}", rep, clusterState);
+    boolean isLive = false;
 
     if (!onlyUpdateState) {
-      ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
+
       String asyncId = message.getStr(ASYNC);
 
       ModifiableSolrParams params = new ModifiableSolrParams();
@@ -340,7 +368,14 @@ public class DeleteReplicaCmd implements Cmd {
       params.set(CoreAdminParams.DELETE_DATA_DIR, message.getBool(CoreAdminParams.DELETE_DATA_DIR, true));
       params.set(CoreAdminParams.DELETE_METRICS_HISTORY, message.getBool(CoreAdminParams.DELETE_METRICS_HISTORY, true));
 
-      boolean isLive = ocmh.zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName());
+      isLive = ocmh.zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName());
+
+
+      if (isLive) {
+
+          shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler);
+      }
+
 
       //    try {
       //      ocmh.deleteCoreNode(collectionName, replicaName, replica, core);
@@ -349,31 +384,11 @@ public class DeleteReplicaCmd implements Cmd {
       //      results.add("failure", "Could not complete delete " + e.getMessage());
       //    }
 
-      final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
-      if (isLive) {
-        response.asyncFinalRunner = () -> {
-          shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler);
-          return new AddReplicaCmd.Response();
-        };
-
-      }
 
-      try {
-        try {
-          if (isLive) {
-            shardRequestTracker.processResponses(results, shardHandler, false, null);
-            // try and ensure core info is removed from cluster state
-          }
 
-        } catch (Exception e) {
-          ParWork.propagateInterrupt(e);
-          results.add("failure", "Could not complete delete " + e.getMessage());
-        }
-      } catch (Exception ex) {
-        throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Error waiting for corenode gone", ex);
-      }
     }
     response.clusterState = clusterState;
+
     return response;
   }
 
diff --git a/solr/core/src/test/org/apache/solr/search/facet/TestJsonFacets.java b/solr/core/src/test/org/apache/solr/search/facet/TestJsonFacets.java
index 2b8d675..e3cf515 100644
--- a/solr/core/src/test/org/apache/solr/search/facet/TestJsonFacets.java
+++ b/solr/core/src/test/org/apache/solr/search/facet/TestJsonFacets.java
@@ -52,6 +52,7 @@ import org.junit.Test;
 //   TestJsonRangeFacets for range facet tests
 
 @LuceneTestCase.SuppressCodecs({"Lucene3x","Lucene40","Lucene41","Lucene42","Lucene45","Appending"})
+@LuceneTestCase.Nightly // nocommit - figure out why this test can sometimes take 20 seconds - it's facet executor use?
 public class TestJsonFacets extends SolrTestCaseHS {
   
   private static SolrInstances servers;  // for distributed testing
@@ -2546,11 +2547,13 @@ public class TestJsonFacets extends SolrTestCaseHS {
   public void testPrelimSortingSingleNodeExtraStat() throws Exception {
     doTestPrelimSortingSingleNode(true, false);
   }
-  
+
+  @Nightly
   public void testPrelimSortingSingleNodeExtraFacet() throws Exception {
     doTestPrelimSortingSingleNode(false, true);
   }
-  
+
+  @Nightly
   public void testPrelimSortingSingleNodeExtraStatAndFacet() throws Exception {
     doTestPrelimSortingSingleNode(true, true);
   }
@@ -2568,7 +2571,8 @@ public class TestJsonFacets extends SolrTestCaseHS {
       nodes.stop();
     }
   }
-  
+
+  @Nightly
   public void testPrelimSortingDistrib() throws Exception {
     doTestPrelimSortingDistrib(false, false);
   }
@@ -3435,6 +3439,7 @@ public class TestJsonFacets extends SolrTestCaseHS {
   }
 
   @Test
+  @Nightly
   public void testFacetValueTypesDistrib() throws Exception {
     initServers();
     Client client = servers.getClient(random().nextInt());
diff --git a/solr/core/src/test/org/apache/solr/update/processor/AbstractAtomicUpdatesMultivalueTestBase.java b/solr/core/src/test/org/apache/solr/update/processor/AbstractAtomicUpdatesMultivalueTestBase.java
index 1bce422..40f7b9f 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/AbstractAtomicUpdatesMultivalueTestBase.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/AbstractAtomicUpdatesMultivalueTestBase.java
@@ -293,11 +293,13 @@ public abstract class AbstractAtomicUpdatesMultivalueTestBase extends EmbeddedSo
   }
 
   @Test
+  @Nightly
   public void testMultivalueCollationField() throws SolrServerException, IOException {
     runTestForFieldWithQuery("collationRemove", new String[] {"cf1", "cf2", "cf3", "cf4"});
   }
 
   @Test
+  @Nightly
   public void testMultivalueDatePointField() throws SolrServerException, IOException {
 
     final String s1 = "1980-01-01T00:00:00Z";
@@ -314,6 +316,7 @@ public abstract class AbstractAtomicUpdatesMultivalueTestBase extends EmbeddedSo
   }
 
   @Test
+  @Nightly
   public void testMultivalueDateRangeField() throws SolrServerException, IOException {
 
     final String s1 = "1980-01-01T00:00:00Z";
@@ -354,6 +357,7 @@ public abstract class AbstractAtomicUpdatesMultivalueTestBase extends EmbeddedSo
   }
 
   @Test
+  @Nightly
   public void testMultivalueFloatPointField() throws SolrServerException, IOException {
     runTestForFieldWithQuery("floatPointRemove", new Float[] {1.0f, 2.0f, 3.0f, 4.0f});
   }
@@ -390,6 +394,7 @@ public abstract class AbstractAtomicUpdatesMultivalueTestBase extends EmbeddedSo
   }
 
   @Test
+  @Nightly
   public void testMultivalueRandomSortField() throws SolrServerException, IOException {
     runTestForFieldWithQuery("randomSortRemove", new String[] {"rsf1", "rsf2", "rsf3", "rsf4"});
   }
@@ -418,6 +423,7 @@ public abstract class AbstractAtomicUpdatesMultivalueTestBase extends EmbeddedSo
   }
 
   @Test
+  @Nightly
   public void testMultivalueUUIDField() throws SolrServerException, IOException {
     final String[] values = new String[] {UUID.randomUUID().toString(), UUID.randomUUID().toString(),
         UUID.randomUUID().toString(), UUID.randomUUID().toString()};
diff --git a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
index 220e34c..c3f9399 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
@@ -102,7 +102,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
         throw new RuntimeException(e);
       }
     };
-    int succeeded = runCommands(threads, 500, req, versionAddFunc);
+    int succeeded = runCommands(threads, 50, req, versionAddFunc);
     // only one should succeed
     assertThat(succeeded, is(1));
 
@@ -126,7 +126,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
       }
     };
 
-    int succeeded = runCommands(threads, 500, req, versionDeleteFunc);
+    int succeeded = runCommands(threads, 50, req, versionDeleteFunc);
     // only one should succeed
     assertThat(succeeded, is(1));
 
@@ -159,7 +159,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
               locked = lock.tryLock(versionBucketLockTimeoutMs, TimeUnit.MILLISECONDS);
               if (locked) {
 
-                Thread.sleep(1000);
+                Thread.sleep(100);
 
                 return function.apply();
               } else {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
index 75c8bbf..e5d9976 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
@@ -54,7 +54,7 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
 
   @BeforeClass
   public static void setupCluster() throws Exception {
-    configureCluster(4)
+    configureCluster(4).formatZk(true)
         .addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
         .addConfig("ml", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("ml").resolve("conf"))
         .configure();
@@ -92,24 +92,15 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
     TupleStream stream;
     List<Tuple> tuples;
     StreamContext streamContext = new StreamContext();
-    SolrClientCache solrClientCache = new SolrClientCache(cluster.getSolrClient().getZkStateReader());
-    streamContext.setSolrClientCache(solrClientCache);
-    
-    StreamFactory factory = new StreamFactory()
-      .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
-      .withFunctionName("search", CloudSolrStream.class)
-      .withFunctionName("select", SelectStream.class)
-      .withFunctionName("add", AddEvaluator.class)
-      .withFunctionName("if", IfThenElseEvaluator.class)
-      .withFunctionName("gt", GreaterThanEvaluator.class)
-      ;
-    try {
+    try (SolrClientCache solrClientCache = new SolrClientCache(cluster.getSolrClient().getZkStateReader())) {
+      streamContext.setSolrClientCache(solrClientCache);
+
+      StreamFactory factory = new StreamFactory().withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()).withFunctionName("search", CloudSolrStream.class)
+          .withFunctionName("select", SelectStream.class).withFunctionName("add", AddEvaluator.class).withFunctionName("if", IfThenElseEvaluator.class)
+          .withFunctionName("gt", GreaterThanEvaluator.class);
+
       // Basic test
-      clause = "select("
-          + "id,"
-          + "add(b_i,c_d) as result,"
-          + "search(collection1, q=*:*, fl=\"id,a_s,b_i,c_d,d_b\", sort=\"id asc\")"
-          + ")";
+      clause = "select(" + "id," + "add(b_i,c_d) as result," + "search(collection1, q=*:*, fl=\"id,a_s,b_i,c_d,d_b\", sort=\"id asc\")" + ")";
       stream = factory.constructStream(clause);
       stream.setStreamContext(streamContext);
       tuples = getTuples(stream);
@@ -118,8 +109,6 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
       assertEquals(1, tuples.size());
       assertDouble(tuples.get(0), "result", 4.3);
       assertEquals(4.3, tuples.get(0).get("result"));
-    } finally {
-      solrClientCache.close();
     }
   }