You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/30 16:00:40 UTC
[lucene-solr] 04/06: @470 More working out dist updates.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 5c5cfbf1e93795b1ef54b4b36f9a5f4a0bf7f6e2
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Jul 30 08:42:06 2020 -0500
@470 More working out dist updates.
---
.../org/apache/solr/update/SolrCmdDistributor.java | 5 +-
.../processor/DistributedUpdateProcessor.java | 56 ++++++++++------------
.../DistributedUpdateProcessorFactory.java | 5 +-
.../processor/DistributedZkUpdateProcessor.java | 27 ++++++-----
.../solr/cloud/FullSolrCloudDistribCmdsTest.java | 11 +++--
.../org/apache/solr/handler/JsonLoaderTest.java | 6 +--
.../src/java/org/apache/solr/common/ParWork.java | 14 +++++-
.../src/resources/logconf/log4j2-std-debug.xml | 4 +-
8 files changed, 68 insertions(+), 60 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index c7ed8fd..da12d4c 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -57,7 +57,7 @@ public class SolrCmdDistributor implements Closeable {
private static final int MAX_RETRIES_ON_FORWARD = 3;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private boolean finished = false; // see finish()
+ private volatile boolean finished = false; // see finish()
private int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD;
@@ -208,7 +208,8 @@ public class SolrCmdDistributor implements Closeable {
}
public void blockAndDoRetries() {
- phaser.arriveAndAwaitAdvance();
+ //phaser.arriveAndAwaitAdvance();
+ solrClient.waitForOutstandingRequests();
}
void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index f1e9a11..4401302 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -92,11 +92,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
* Requests from leader to its followers will be retried this number of times by default
*/
static final int MAX_RETRIES_TO_FOLLOWERS_DEFAULT = Integer.getInteger("solr.retries.to.followers", 3);
- private long versionOnUpdate;
- private VersionBucket bucket;
- private boolean isReplayOrPeersync;
- private boolean leaderLogic;
- private boolean forwardedFromCollection;
+ private volatile long versionOnUpdate;
+ private volatile VersionBucket bucket;
+ private volatile boolean isReplayOrPeersync;
+ private volatile boolean leaderLogic;
+ private volatile boolean forwardedFromCollection;
/**
* Values this processor supports for the <code>DISTRIB_UPDATE_PARAM</code>.
@@ -125,7 +125,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
public static final String LOG_REPLAY = "log_replay";
// used to assert we don't call finish more than once, see finish()
- private boolean finished = false;
+ private volatile boolean finished = false;
protected final SolrQueryRequest req;
protected final SolrQueryResponse rsp;
@@ -135,7 +135,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
@VisibleForTesting
VersionInfo vinfo;
private final boolean versionsStored;
- private boolean returnVersions;
+ private volatile boolean returnVersions;
private NamedList<Object> addsResponse = null;
private NamedList<Object> deleteResponse = null;
@@ -146,9 +146,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// these are setup at the start of each request processing
// method in this update processor
- protected boolean isLeader = true;
- protected boolean forwardToLeader = false;
- protected boolean isSubShardLeader = false;
+ protected volatile boolean isLeader = true;
+ protected volatile boolean forwardToLeader = false;
+ protected volatile boolean isSubShardLeader = false;
protected volatile boolean isIndexChanged = false;
/**
@@ -160,7 +160,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
*/
protected final int maxRetriesToFollowers = MAX_RETRIES_TO_FOLLOWERS_DEFAULT;
- protected UpdateCommand updateCommand; // the current command this processor is working on.
+ protected volatile UpdateCommand updateCommand; // the current command this processor is working on.
protected final Replica.Type replicaType;
@@ -233,23 +233,14 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (dropCmd) {
// TODO: do we need to add anything to the response?
+ log.info("Dropping update {}", cmd.getPrintableId());
return;
}
SolrInputDocument clonedDoc = shouldCloneCmdDoc() ? cmd.solrDoc.deepCopy(): null;
- AddUpdateCommand cmd2 = null;
+
if (clonedDoc != null) {
- cmd2 = new AddUpdateCommand(cmd.getReq());
- cmd2.commitWithin = cmd.commitWithin;
- cmd2.isNested = cmd.isNested;
- cmd2.overwrite = cmd.overwrite;
- cmd2.prevVersion = cmd.prevVersion;
- cmd2.updateTerm = cmd.updateTerm;
- cmd2.isLastDocInBatch = cmd.isLastDocInBatch;
- cmd2.solrDoc = clonedDoc;
- cmd2.setVersion((long) cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD));
- cmd2.setFlags(cmd.getFlags());
- cmd2.setRoute(cmd.getRoute());
+ cmd.solrDoc = clonedDoc;
}
try (ParWork worker = new ParWork(this)) {
if (!forwardToLeader) {
@@ -278,17 +269,17 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
});
}
- if (req.getCore().getCoreContainer().isZooKeeperAware()) {
- AddUpdateCommand finalCmd;
- if (cmd2 == null) {
- finalCmd = cmd;
- } else {
- finalCmd = cmd2;
- }
+ boolean zkAware = req.getCore().getCoreContainer().isZooKeeperAware();
+ log.info("Is zk aware {}", zkAware);
+ if (zkAware) {
+
+ log.info("Collect distrib add");
worker.collect(() -> {
+ log.info("Run distrib add collection");
try {
- doDistribAdd(finalCmd);
- } catch (Exception e) {
+ DistributedUpdateProcessor.this.doDistribAdd(cmd);
+ log.info("after distrib add collection");
+ } catch (Throwable e) {
ParWork.propegateInterrupt(e);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
@@ -319,6 +310,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
protected void doDistribAdd(AddUpdateCommand cmd) throws IOException {
// no-op for derived classes to implement
+ log.info("in dist add");
}
// must be synchronized by bucket
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
index 93c1bf2..19093a4 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
@@ -57,8 +57,9 @@ public class DistributedUpdateProcessorFactory
new DistributedZkUpdateProcessor(req, rsp, next) :
new DistributedUpdateProcessor(req, rsp, next);
// note: will sometimes return DURP (no overhead) instead of wrapping
- return RoutedAliasUpdateProcessor.wrap(req,
- distribUpdateProcessor);
+ UpdateRequestProcessor proc = RoutedAliasUpdateProcessor
+ .wrap(req, distribUpdateProcessor);
+ return proc;
}
}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 52277a8..c68390b 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -78,7 +78,6 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
-
private final CloudDescriptor cloudDesc;
private final ZkController zkController;
private final SolrCmdDistributor cmdDistrib;
@@ -227,15 +226,18 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
- List<SolrCmdDistributor.Node> finalUseNodes = useNodes;
- ParWork.getExecutor().submit(() -> cmdDistrib.distribCommit(cmd, finalUseNodes, params));
-
+ cmdDistrib.distribCommit(cmd, useNodes, params);
}
}
if (isLeader) {
log.info("Do a local commit on NRT endpoint for leader");
- doLocalCommit(cmd);
+ try {
+ doLocalCommit(cmd);
+ } catch (Exception e) {
+ log.error("Error on local commit");
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
@@ -251,9 +253,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
zkController.getBaseUrl(), req.getCore().getName()));
-
- List<SolrCmdDistributor.Node> finalUseNodes1 = useNodes;
- cmdDistrib.distribCommit(cmd, finalUseNodes1, params);
+ cmdDistrib.distribCommit(cmd, useNodes, params);
}
@@ -287,6 +287,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
protected void doDistribAdd(AddUpdateCommand cmd) throws IOException {
+ log.info("in zk dist add");
log.info("Distribute add cmd {} to {} {}", cmd, nodes, isLeader);
if (isLeader && !isSubShardLeader) {
DocCollection coll = clusterState.getCollection(collection);
@@ -318,8 +319,10 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
return;
}
+ } else {
+ log.info("Not a shard or sub shard leader");
}
-
+ log.info("Using nodes {}", nodes);
if (nodes != null) {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM,
@@ -351,9 +354,9 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
} else {
- if (!isLeader && params.get(DISTRIB_UPDATE_PARAM).equals(DistribPhase.FROMLEADER.toString())) {
- throw new IllegalStateException();
- }
+// if (!isLeader && params.get(DISTRIB_UPDATE_PARAM).equals(DistribPhase.FROMLEADER.toString())) {
+// throw new IllegalStateException();
+// }
try {
cmdDistrib
.distribAdd(cmd, nodes, params, false, rollupReplicationTracker,
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
index a1ff80c..5c929f4 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
@@ -466,22 +466,23 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
final String collectionName = createAndSetNewDefaultCollection();
- final int numDocs = 3;//atLeast(50);
+ final int numDocs = TEST_NIGHTLY ? atLeast(150) : 55;
final JettySolrRunner nodeToUpdate = cluster.getRandomJetty(random());
try (ConcurrentUpdateSolrClient indexClient
= getConcurrentUpdateSolrClient(nodeToUpdate.getBaseUrl() + "/" + collectionName, 10, 2)) {
for (int i = 0; i < numDocs; i++) {
+ log.info("add doc {}", i);
indexClient.add(sdoc("id", i, "text_t",
TestUtil.randomRealisticUnicodeString(random(), 200)));
}
indexClient.blockUntilFinished();
-
assertEquals(0, indexClient.commit().getStatus());
- assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
-
- checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
+ indexClient.blockUntilFinished();
}
+ assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+ checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
}
/**
diff --git a/solr/core/src/test/org/apache/solr/handler/JsonLoaderTest.java b/solr/core/src/test/org/apache/solr/handler/JsonLoaderTest.java
index 49bf463..ccd832c 100644
--- a/solr/core/src/test/org/apache/solr/handler/JsonLoaderTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/JsonLoaderTest.java
@@ -647,7 +647,7 @@ public class JsonLoaderTest extends SolrTestCaseJ4 {
ignoreException("big_integer_t");
- SolrException ex = expectThrows(SolrException.class, () -> {
+ Exception ex = expectThrows(Exception.class, () -> {
updateJ(json( "[{'id':'1','big_integer_tl':12345678901234567890}]" ), null);
});
// nocommit
@@ -655,10 +655,10 @@ public class JsonLoaderTest extends SolrTestCaseJ4 {
// Adding a BigInteger to an integer field should fail
// BigInteger.intValue() returns only the low-order 32 bits.
- ex = expectThrows(SolrException.class, () -> {
+ ex = expectThrows(Exception.class, () -> {
updateJ(json( "[{'id':'1','big_integer_ti':12345678901234567890}]" ), null);
});
- assertTrue(ex.getCause() instanceof NumberFormatException);
+ assertTrue(ex.getCause().getCause() instanceof NumberFormatException);
unIgnoreException("big_integer_t");
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 39c4535..88bf51c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -530,7 +530,15 @@ public class ParWork implements Closeable {
continue;
closeCalls.add(() -> {
- handleObject(workUnit.label, exception, workUnitTracker, object);
+ try {
+ handleObject(workUnit.label, exception, workUnitTracker,
+ object);
+ } catch (Throwable t) {
+ log.error(RAN_INTO_AN_ERROR_WHILE_DOING_WORK, t);
+ if (exception.get() == null) {
+ exception.set(t);
+ }
+ }
return object;
});
@@ -569,7 +577,9 @@ public class ParWork implements Closeable {
} catch (Throwable t) {
log.error(RAN_INTO_AN_ERROR_WHILE_DOING_WORK, t);
- exception.set(t);
+ if (exception.get() == null) {
+ exception.set(t);
+ }
} finally {
tracker.doneClose();
diff --git a/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml b/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml
index 53c117b..ac820b2 100644
--- a/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml
+++ b/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml
@@ -37,7 +37,6 @@
<AsyncLogger name="org.apache.solr.handler.admin.CollectionsHandler" level="INFO"/>
<AsyncLogger name="org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler" level="INFO"/>
<AsyncLogger name="org.apache.solr.cloud.api.collections.CreateCollectionCmd" level="INFO"/>
- <!-- <AsyncLogger name="org.apache.solr.common.patterns.DW" level="DEBUG"/> -->
<AsyncLogger name="org.apache.solr.cloud.overseer.ZkStateWriter" level="INFO"/>
<AsyncLogger name="org.apache.solr.cloud.Overseer" level="INFO"/>
<AsyncLogger name="org.apache.solr.cloud.OverseerTaskProcessor" level="INFO"/>
@@ -47,7 +46,8 @@
<AsyncLogger name="org.apache.solr.client.solrj.impl.LBSolrClient" level="INFO"/>
<AsyncLogger name="org.apache.solr.cloud.ZkController" level="INFO"/>
<AsyncLogger name="org.apache.solr.common.cloud.ZkMaintenanceUtils" level="INFO"/>
- <AsyncLogger name="org.apache.solr.update.processor.DistributedZkUpdateProcessor" level="INFO"/>
+ <AsyncLogger name="org.apache.solr.update.processor.DistributedZkUpdateProcessor" level="DEBUG"/>
+ <AsyncLogger name="org.apache.solr.update.processor.DistributedUpdateProcessor" level="DEBUG"/>
<AsyncLogger name="org.apache.solr.update.SolrCmdDistributor" level="DEBUG"/>
<AsyncLogger name="org.apache.solr.update.processor.LogUpdateProcessorFactory" level="DEBUG"/>