You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/02/03 20:13:54 UTC
[lucene-solr] 01/02: @1326 Taking a closer look at delete by query
logic.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit b779739be3aad08fbd21b3ef02fad3c3d14a69c2
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Feb 3 13:43:27 2021 -0600
@1326 Taking a closer look at delete by query logic.
---
.../processor/DistributedUpdateProcessor.java | 47 +++++++++-------------
.../processor/DistributedZkUpdateProcessor.java | 29 +++----------
.../org/apache/solr/common/util/StopWatch.java | 16 +++++---
3 files changed, 35 insertions(+), 57 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index c3a7ec2..673ecbe 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -906,7 +906,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// at this point, there is an update we need to try and apply.
// we may or may not be the leader.
- boolean drop = versionDeleteByQuery(cmd);
+ boolean drop = false;
+ if (!forwardToLeader) {
+ drop = versionDeleteByQuery(cmd);
+ }
if (drop) {
return;
@@ -922,6 +925,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
return null;
};
+
if (!forwardToLeader) {
distFuture = ParWork.getRootSharedExecutor().submit(distCall);
} else {
@@ -936,36 +940,25 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
if (!forwardToLeader) {
- distFuture = ParWork.getRootSharedExecutor().submit(distCall);
- } else {
try {
- distCall.call();
+ doLocalDelete(cmd);
} catch (Exception e) {
- if (e instanceof RuntimeException) {
- throw (RuntimeException) e;
+ log.error("Exception on local deleteByQuery", e);
+ Throwable t;
+ if (e instanceof ExecutionException) {
+ t = e.getCause();
+ } else {
+ t = e;
}
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
- }
-
- try {
- doLocalDelete(cmd);
- } catch (Exception e) {
- log.error("Exception on local deleteByQuery", e);
- Throwable t;
- if (e instanceof ExecutionException) {
- t = e.getCause();
- } else {
- t = e;
- }
- if (distFuture != null && isLeader && !forwardToLeader) {
- distFuture.cancel(false);
- cancelCmds.add(cmd);
- }
- if (t instanceof SolrException) {
- throw (SolrException) t;
+ if (distFuture != null && isLeader && !forwardToLeader) {
+ distFuture.cancel(false);
+ cancelCmds.add(cmd);
+ }
+ if (t instanceof SolrException) {
+ throw (SolrException) t;
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR, t);
}
- throw new SolrException(ErrorCode.SERVER_ERROR, t);
}
if (distFuture != null) {
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index f762ccf..1a0f37a 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -99,11 +99,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
// 3) in general, not controlling carefully enough exactly when our view of clusterState is updated
protected volatile ClusterState clusterState;
- // should we clone the document before sending it to replicas?
- // this is set to true in the constructor if the next processors in the chain
- // are custom and may modify the SolrInputDocument racing with its serialization for replication
- private final boolean cloneRequiredOnLeader;
-
//used for keeping track of replicas that have processed an add/update from the leader
private volatile RollupRequestReplicationTracker rollupReplicationTracker = null;
private volatile LeaderRequestReplicationTracker leaderReplicationTracker = null;
@@ -119,7 +114,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
zkController = cc.getZkController();
cmdDistrib = new SolrCmdDistributor(zkController.getZkStateReader(), cc.getUpdateShardHandler(), new IsCCClosed(req));
try {
- cloneRequiredOnLeader = isCloneRequiredOnLeader(next);
collection = cloudDesc.getCollectionName();
clusterState = zkController.getClusterState();
DocCollection coll = clusterState.getCollectionOrNull(collection, true);
@@ -286,8 +280,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
- clusterState = zkController.getClusterState();
-
try {
if (isReadOnly()) {
@@ -388,7 +380,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (isReadOnly()) {
throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
}
- clusterState = zkController.getClusterState();
super.processDelete(cmd);
}
@@ -450,7 +441,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
protected void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
zkCheck();
-
+ clusterState = zkController.getClusterState();
// NONE: we are the first to receive this deleteByQuery
// - it must be forwarded to the leader of every shard
// TO: we are a leader receiving a forwarded deleteByQuery... we must:
@@ -482,19 +473,12 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
List<SolrCmdDistributor.Node> leaders = new ArrayList<>(slices.size());
for (Slice slice : slices) {
String sliceName = slice.getName();
- Replica leader;
- try {
- leader = zkController.getZkStateReader().getLeaderRetry(collection, sliceName);
- } catch (Exception e) {
- log.error("Exception finding leader for shard " + sliceName, e);
- continue;
- }
+ Replica leader = slice.getLeader();
// TODO: What if leaders changed in the meantime?
// should we send out slice-at-a-time and if a node returns "hey, I'm not a leader" (or we get an error because it went down) then look up the new leader?
// Am I the leader for this slice?
-
String leaderCoreNodeName = leader.getName();
String coreName = desc.getName();
isLeader = coreName.equals(leaderCoreNodeName);
@@ -819,8 +803,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
protected boolean shouldCloneCmdDoc() {
- boolean willDistrib = isLeader && nodes != null && nodes.size() > 0;
- return willDistrib & cloneRequiredOnLeader;
+ return true;
}
// helper method, processAdd was getting a bit large.
@@ -862,10 +845,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (onlyLeaders) {
Replica replica = docCollection.getLeader(slice.getName());
if (replica != null) {
- Replica nodeProps = replica;
- nodeProps.getProperties().put(ZkStateReader.CORE_NAME_PROP, replica.getName());
- if (zkController.getZkStateReader().isNodeLive(nodeProps.getNodeName())) {
- urls.add(new SolrCmdDistributor.StdNode(zkController.getZkStateReader(), nodeProps, collection, slice.getName()));
+ if (zkController.getZkStateReader().isNodeLive(replica.getNodeName())) {
+ urls.add(new SolrCmdDistributor.StdNode(zkController.getZkStateReader(), replica, collection, slice.getName()));
}
}
continue;
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/StopWatch.java b/solr/solrj/src/java/org/apache/solr/common/util/StopWatch.java
index 1f744ca..040a939 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/StopWatch.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/StopWatch.java
@@ -25,16 +25,20 @@ import java.util.concurrent.TimeUnit;
public class StopWatch {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final long start;
- private final String name;
+ private long start;
+ private String name;
public StopWatch(String name) {
- this.name = "StopWatch-" + name;
- start = System.nanoTime();
+ if (log.isDebugEnabled()) {
+ this.name = "StopWatch-" + name;
+ start = System.nanoTime();
+ }
}
public void done() {
- long time = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
- log.info("Time taken for {}={}ms", name, time);
+ if (log.isDebugEnabled()) {
+ long time = TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS);
+ log.debug("Time taken for {}={}ms", name, time);
+ }
}
}