You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/09 06:12:43 UTC
[lucene-solr] branch reference_impl_dev updated: @1106 Harden and polish work.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new b27a9ad @1106 Harden and polish work.
b27a9ad is described below
commit b27a9ada64f667abe210296a3cbce70f21ba4275
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Nov 9 00:12:21 2020 -0600
@1106 Harden and polish work.
---
.../java/org/apache/solr/cloud/LeaderElector.java | 2 +
.../cloud/api/collections/DeleteReplicaCmd.java | 119 ++++++++++++---------
.../apache/solr/search/facet/TestJsonFacets.java | 11 +-
.../AbstractAtomicUpdatesMultivalueTestBase.java | 6 ++
.../processor/DistributedUpdateProcessorTest.java | 6 +-
.../solrj/io/stream/SelectWithEvaluatorsTest.java | 29 ++---
6 files changed, 95 insertions(+), 78 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index e6b89ae..d4e7660 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -405,6 +405,8 @@ public class LeaderElector implements Closeable {
if (zk != null) {
try {
zk.removeWatches(context.leaderSeqPath, this, WatcherType.Any, true);
+ } catch (KeeperException.NoWatcherException e) {
+ // okay
} catch (InterruptedException e) {
log.info("Interrupted removing leader watch");
} catch (KeeperException e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index dc6c122..e2b342a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -90,11 +90,20 @@ public class DeleteReplicaCmd implements Cmd {
//If a count is specified the strategy needs be different
if (message.getStr(COUNT_PROP) != null) {
- AddReplicaCmd.Response resp = deleteReplicaBasedOnCount(clusterState, message, results);
+ ShardHandler shardHandler = null;
+ ShardRequestTracker shardRequestTracker = null;
+ if (!onlyUpdateState) {
+ String asyncId = message.getStr(ASYNC);
+ shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
+ shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ }
+ AddReplicaCmd.Response resp = deleteReplicaBasedOnCount(clusterState, message, results, shardHandler, shardRequestTracker);
clusterState = resp.clusterState;
AddReplicaCmd.Response response = new AddReplicaCmd.Response();
if (results.get("failure") == null && results.get("exception") == null) {
+ ShardRequestTracker finalShardRequestTracker = shardRequestTracker;
+ ShardHandler finalShardHandler = shardHandler;
response.asyncFinalRunner = new OverseerCollectionMessageHandler.Finalize() {
@Override
public AddReplicaCmd.Response call() {
@@ -105,6 +114,14 @@ public class DeleteReplicaCmd implements Cmd {
log.error("Exception running delete replica finalizers", e);
}
}
+
+ if (finalShardRequestTracker != null) {
+ try {
+ finalShardRequestTracker.processResponses(results, finalShardHandler, false, null);
+ } catch (Exception e) {
+ log.error("Exception waiting for delete replica response");
+ }
+ }
// try {
// waitForCoreNodeGone(collectionName, shard, replicaName, 30000);
// } catch (Exception e) {
@@ -138,8 +155,16 @@ public class DeleteReplicaCmd implements Cmd {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Invalid shard name : " + shard + " in collection : " + collectionName);
}
+ ShardHandler shardHandler = null;
+ ShardRequestTracker shardRequestTracker = null;
+ if (!onlyUpdateState) {
+ String asyncId = message.getStr(ASYNC);
+ shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
+ shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
+ }
- AddReplicaCmd.Response resp = deleteCore(clusterState, slice, collectionName, replicaName, message, shard, results);
+ AddReplicaCmd.Response resp = deleteCore(clusterState, slice, collectionName, replicaName, message,
+ shard, results, shardRequestTracker, shardHandler);
clusterState = resp.clusterState;
if (clusterState.getCollectionOrNull(collectionName).getReplica(replicaName) != null) {
@@ -148,28 +173,30 @@ public class DeleteReplicaCmd implements Cmd {
AddReplicaCmd.Response response = new AddReplicaCmd.Response();
- if (results.get("failure") == null && results.get("exception") == null) {
- response.asyncFinalRunner = new OverseerCollectionMessageHandler.Finalize() {
- @Override
- public AddReplicaCmd.Response call() {
- if (resp.asyncFinalRunner != null) {
- try {
- resp.asyncFinalRunner.call();
- } catch (Exception e) {
- log.error("", e);
- }
- }
-
- try {
- waitForCoreNodeGone(collectionName, shard, replicaName, 30000);
- } catch (Exception e) {
- log.error("", e);
- }
- AddReplicaCmd.Response response = new AddReplicaCmd.Response();
- return response;
- }
- };
- }
+ if (!onlyUpdateState) {
+ ShardRequestTracker finalShardRequestTracker = shardRequestTracker;
+ ShardHandler finalShardHandler = shardHandler;
+ response.asyncFinalRunner = new OverseerCollectionMessageHandler.Finalize() {
+ @Override
+ public AddReplicaCmd.Response call() {
+ if (finalShardRequestTracker != null) {
+ try {
+ finalShardRequestTracker.processResponses(results, finalShardHandler, false, null);
+ } catch (Exception e) {
+ log.error("Exception waiting for delete replica response");
+ }
+ }
+
+// try {
+// waitForCoreNodeGone(collectionName, shard, replicaName, 30000);
+// } catch (Exception e) {
+// log.error("", e);
+// }
+ AddReplicaCmd.Response response = new AddReplicaCmd.Response();
+ return response;
+ }
+ };
+ }
response.clusterState = clusterState;
return response;
}
@@ -181,9 +208,8 @@ public class DeleteReplicaCmd implements Cmd {
* @return
*/
@SuppressWarnings({"unchecked"})
- AddReplicaCmd.Response deleteReplicaBasedOnCount(ClusterState clusterState,
- ZkNodeProps message,
- @SuppressWarnings({"rawtypes"})NamedList results)
+ AddReplicaCmd.Response deleteReplicaBasedOnCount(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"}) NamedList results, ShardHandler shardHandler,
+ ShardRequestTracker shardRequestTracker)
throws KeeperException, InterruptedException {
ocmh.checkRequired(message, COLLECTION_PROP, COUNT_PROP);
int count = Integer.parseInt(message.getStr(COUNT_PROP));
@@ -222,7 +248,7 @@ public class DeleteReplicaCmd implements Cmd {
if (log.isDebugEnabled()) log.debug("Deleting replica {} for shard {} based on count {}", replica, shardId, count);
// nocommit - DONT DO THIS ONE AT TIME
- AddReplicaCmd.Response resp = deleteCore(clusterState, shardSlice, collectionName, replica, message, shard, results);
+ AddReplicaCmd.Response resp = deleteCore(clusterState, shardSlice, collectionName, replica, message, shard, results, shardRequestTracker, shardHandler);
clusterState = resp.clusterState;
if (resp.asyncFinalRunner != null) {
finalizers.add(resp.asyncFinalRunner);
@@ -299,7 +325,8 @@ public class DeleteReplicaCmd implements Cmd {
}
@SuppressWarnings({"unchecked"})
- AddReplicaCmd.Response deleteCore(ClusterState clusterState, Slice slice, String collectionName, String replicaName, ZkNodeProps message, String shard, @SuppressWarnings({"rawtypes"})NamedList results) throws KeeperException, InterruptedException {
+ AddReplicaCmd.Response deleteCore(ClusterState clusterState, Slice slice, String collectionName, String replicaName,
+ ZkNodeProps message, String shard, @SuppressWarnings({"rawtypes"})NamedList results, ShardRequestTracker shardRequestTracker, ShardHandler shardHandler) throws KeeperException, InterruptedException {
log.info("delete core {}", replicaName);
Replica replica = slice.getReplica(replicaName);
if (replica == null) {
@@ -326,9 +353,10 @@ public class DeleteReplicaCmd implements Cmd {
log.info("Before slice remove replica {} {}", rep, clusterState);
clusterState = new SliceMutator(ocmh.cloudManager).removeReplica(clusterState, rep);
log.info("After slice remove replica {} {}", rep, clusterState);
+ boolean isLive = false;
if (!onlyUpdateState) {
- ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
+
String asyncId = message.getStr(ASYNC);
ModifiableSolrParams params = new ModifiableSolrParams();
@@ -340,7 +368,14 @@ public class DeleteReplicaCmd implements Cmd {
params.set(CoreAdminParams.DELETE_DATA_DIR, message.getBool(CoreAdminParams.DELETE_DATA_DIR, true));
params.set(CoreAdminParams.DELETE_METRICS_HISTORY, message.getBool(CoreAdminParams.DELETE_METRICS_HISTORY, true));
- boolean isLive = ocmh.zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName());
+ isLive = ocmh.zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName());
+
+
+ if (isLive) {
+
+ shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler);
+ }
+
// try {
// ocmh.deleteCoreNode(collectionName, replicaName, replica, core);
@@ -349,31 +384,11 @@ public class DeleteReplicaCmd implements Cmd {
// results.add("failure", "Could not complete delete " + e.getMessage());
// }
- final ShardRequestTracker shardRequestTracker = ocmh.asyncRequestTracker(asyncId, message.getStr("operation"));
- if (isLive) {
- response.asyncFinalRunner = () -> {
- shardRequestTracker.sendShardRequest(replica.getNodeName(), params, shardHandler);
- return new AddReplicaCmd.Response();
- };
-
- }
- try {
- try {
- if (isLive) {
- shardRequestTracker.processResponses(results, shardHandler, false, null);
- // try and ensure core info is removed from cluster state
- }
- } catch (Exception e) {
- ParWork.propagateInterrupt(e);
- results.add("failure", "Could not complete delete " + e.getMessage());
- }
- } catch (Exception ex) {
- throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Error waiting for corenode gone", ex);
- }
}
response.clusterState = clusterState;
+
return response;
}
diff --git a/solr/core/src/test/org/apache/solr/search/facet/TestJsonFacets.java b/solr/core/src/test/org/apache/solr/search/facet/TestJsonFacets.java
index 2b8d675..e3cf515 100644
--- a/solr/core/src/test/org/apache/solr/search/facet/TestJsonFacets.java
+++ b/solr/core/src/test/org/apache/solr/search/facet/TestJsonFacets.java
@@ -52,6 +52,7 @@ import org.junit.Test;
// TestJsonRangeFacets for range facet tests
@LuceneTestCase.SuppressCodecs({"Lucene3x","Lucene40","Lucene41","Lucene42","Lucene45","Appending"})
+@LuceneTestCase.Nightly // nocommit - figure out why this test can sometimes take 20 seconds - it's facet executor use?
public class TestJsonFacets extends SolrTestCaseHS {
private static SolrInstances servers; // for distributed testing
@@ -2546,11 +2547,13 @@ public class TestJsonFacets extends SolrTestCaseHS {
public void testPrelimSortingSingleNodeExtraStat() throws Exception {
doTestPrelimSortingSingleNode(true, false);
}
-
+
+ @Nightly
public void testPrelimSortingSingleNodeExtraFacet() throws Exception {
doTestPrelimSortingSingleNode(false, true);
}
-
+
+ @Nightly
public void testPrelimSortingSingleNodeExtraStatAndFacet() throws Exception {
doTestPrelimSortingSingleNode(true, true);
}
@@ -2568,7 +2571,8 @@ public class TestJsonFacets extends SolrTestCaseHS {
nodes.stop();
}
}
-
+
+ @Nightly
public void testPrelimSortingDistrib() throws Exception {
doTestPrelimSortingDistrib(false, false);
}
@@ -3435,6 +3439,7 @@ public class TestJsonFacets extends SolrTestCaseHS {
}
@Test
+ @Nightly
public void testFacetValueTypesDistrib() throws Exception {
initServers();
Client client = servers.getClient(random().nextInt());
diff --git a/solr/core/src/test/org/apache/solr/update/processor/AbstractAtomicUpdatesMultivalueTestBase.java b/solr/core/src/test/org/apache/solr/update/processor/AbstractAtomicUpdatesMultivalueTestBase.java
index 1bce422..40f7b9f 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/AbstractAtomicUpdatesMultivalueTestBase.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/AbstractAtomicUpdatesMultivalueTestBase.java
@@ -293,11 +293,13 @@ public abstract class AbstractAtomicUpdatesMultivalueTestBase extends EmbeddedSo
}
@Test
+ @Nightly
public void testMultivalueCollationField() throws SolrServerException, IOException {
runTestForFieldWithQuery("collationRemove", new String[] {"cf1", "cf2", "cf3", "cf4"});
}
@Test
+ @Nightly
public void testMultivalueDatePointField() throws SolrServerException, IOException {
final String s1 = "1980-01-01T00:00:00Z";
@@ -314,6 +316,7 @@ public abstract class AbstractAtomicUpdatesMultivalueTestBase extends EmbeddedSo
}
@Test
+ @Nightly
public void testMultivalueDateRangeField() throws SolrServerException, IOException {
final String s1 = "1980-01-01T00:00:00Z";
@@ -354,6 +357,7 @@ public abstract class AbstractAtomicUpdatesMultivalueTestBase extends EmbeddedSo
}
@Test
+ @Nightly
public void testMultivalueFloatPointField() throws SolrServerException, IOException {
runTestForFieldWithQuery("floatPointRemove", new Float[] {1.0f, 2.0f, 3.0f, 4.0f});
}
@@ -390,6 +394,7 @@ public abstract class AbstractAtomicUpdatesMultivalueTestBase extends EmbeddedSo
}
@Test
+ @Nightly
public void testMultivalueRandomSortField() throws SolrServerException, IOException {
runTestForFieldWithQuery("randomSortRemove", new String[] {"rsf1", "rsf2", "rsf3", "rsf4"});
}
@@ -418,6 +423,7 @@ public abstract class AbstractAtomicUpdatesMultivalueTestBase extends EmbeddedSo
}
@Test
+ @Nightly
public void testMultivalueUUIDField() throws SolrServerException, IOException {
final String[] values = new String[] {UUID.randomUUID().toString(), UUID.randomUUID().toString(),
UUID.randomUUID().toString(), UUID.randomUUID().toString()};
diff --git a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
index 220e34c..c3f9399 100644
--- a/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java
@@ -102,7 +102,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
throw new RuntimeException(e);
}
};
- int succeeded = runCommands(threads, 500, req, versionAddFunc);
+ int succeeded = runCommands(threads, 50, req, versionAddFunc);
// only one should succeed
assertThat(succeeded, is(1));
@@ -126,7 +126,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
}
};
- int succeeded = runCommands(threads, 500, req, versionDeleteFunc);
+ int succeeded = runCommands(threads, 50, req, versionDeleteFunc);
// only one should succeed
assertThat(succeeded, is(1));
@@ -159,7 +159,7 @@ public class DistributedUpdateProcessorTest extends SolrTestCaseJ4 {
locked = lock.tryLock(versionBucketLockTimeoutMs, TimeUnit.MILLISECONDS);
if (locked) {
- Thread.sleep(1000);
+ Thread.sleep(100);
return function.apply();
} else {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
index 75c8bbf..e5d9976 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/SelectWithEvaluatorsTest.java
@@ -54,7 +54,7 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
@BeforeClass
public static void setupCluster() throws Exception {
- configureCluster(4)
+ configureCluster(4).formatZk(true)
.addConfig("conf", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
.addConfig("ml", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("ml").resolve("conf"))
.configure();
@@ -92,24 +92,15 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
TupleStream stream;
List<Tuple> tuples;
StreamContext streamContext = new StreamContext();
- SolrClientCache solrClientCache = new SolrClientCache(cluster.getSolrClient().getZkStateReader());
- streamContext.setSolrClientCache(solrClientCache);
-
- StreamFactory factory = new StreamFactory()
- .withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress())
- .withFunctionName("search", CloudSolrStream.class)
- .withFunctionName("select", SelectStream.class)
- .withFunctionName("add", AddEvaluator.class)
- .withFunctionName("if", IfThenElseEvaluator.class)
- .withFunctionName("gt", GreaterThanEvaluator.class)
- ;
- try {
+ try (SolrClientCache solrClientCache = new SolrClientCache(cluster.getSolrClient().getZkStateReader())) {
+ streamContext.setSolrClientCache(solrClientCache);
+
+ StreamFactory factory = new StreamFactory().withCollectionZkHost("collection1", cluster.getZkServer().getZkAddress()).withFunctionName("search", CloudSolrStream.class)
+ .withFunctionName("select", SelectStream.class).withFunctionName("add", AddEvaluator.class).withFunctionName("if", IfThenElseEvaluator.class)
+ .withFunctionName("gt", GreaterThanEvaluator.class);
+
// Basic test
- clause = "select("
- + "id,"
- + "add(b_i,c_d) as result,"
- + "search(collection1, q=*:*, fl=\"id,a_s,b_i,c_d,d_b\", sort=\"id asc\")"
- + ")";
+ clause = "select(" + "id," + "add(b_i,c_d) as result," + "search(collection1, q=*:*, fl=\"id,a_s,b_i,c_d,d_b\", sort=\"id asc\")" + ")";
stream = factory.constructStream(clause);
stream.setStreamContext(streamContext);
tuples = getTuples(stream);
@@ -118,8 +109,6 @@ public class SelectWithEvaluatorsTest extends SolrCloudTestCase {
assertEquals(1, tuples.size());
assertDouble(tuples.get(0), "result", 4.3);
assertEquals(4.3, tuples.get(0).get("result"));
- } finally {
- solrClientCache.close();
}
}