You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@lucene.apache.org by GitBox <gi...@apache.org> on 2021/03/01 15:31:57 UTC

[GitHub] [lucene-solr] sigram commented on a change in pull request #2403: SOLR-15164: Implement Task Management Interface

sigram commented on a change in pull request #2403:
URL: https://github.com/apache/lucene-solr/pull/2403#discussion_r582707248



##########
File path: lucene/core/src/java/org/apache/lucene/search/CancellableCollector.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.lucene.index.LeafReaderContext;
+
+/** Allows a query to be cancelled */
+public class CancellableCollector implements Collector, CancellableTask {
+
+  /** Thrown when a query gets cancelled */
+  public static class QueryCancelledException extends RuntimeException {}
+
+  private Collector collector;
+  private AtomicBoolean isQueryCancelled;
+
+  public CancellableCollector(Collector collector) {
+    if (collector == null) {
+      throw new IllegalStateException(
+          "Internal collector not provided but wrapper collector accessed");
+    }
+
+    this.collector = collector;
+    this.isQueryCancelled = new AtomicBoolean();
+  }
+
+  @Override
+  public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
+
+    if (isQueryCancelled.compareAndSet(true, false)) {
+      throw new QueryCancelledException();
+    }
+
+    return new FilterLeafCollector(collector.getLeafCollector(context)) {
+
+      @Override
+      public void collect(int doc) throws IOException {
+        if (isQueryCancelled.compareAndSet(true, false)) {
+          throw new QueryCancelledException();
+        }
+

Review comment:
       Pretty sure some of the whitespace in this method could be removed ;)

##########
File path: solr/core/src/java/org/apache/solr/core/SolrCore.java
##########
@@ -3245,6 +3252,75 @@ public void postClose(SolrCore core) {
     return blobRef;
   }
 
+  /** Generates a UUID for the given query or if the user provided a UUID
+   * for this query, uses that.
+   */
+  public String generateQueryID(SolrQueryRequest req) {
+    String queryID;
+    String customQueryUUID = req.getParams().get(CUSTOM_QUERY_UUID, null);
+
+    if (customQueryUUID != null) {
+      queryID = customQueryUUID;
+    } else {
+      queryID = UUID.randomUUID().toString();
+    }
+
+    if (activeQueriesGenerated.containsKey(queryID)) {
+      if (customQueryUUID != null) {
+        throw new IllegalArgumentException("Duplicate query UUID given");
+      } else {
+        while (activeQueriesGenerated.get(queryID) != null) {
+          queryID = UUID.randomUUID().toString();
+        }
+      }
+    }
+
+    activeQueriesGenerated.put(queryID, req.getHttpSolrCall().getReq().getQueryString());
+
+    return queryID;
+  }
+
+  public void releaseQueryID(String inputQueryID) {
+    if (inputQueryID == null) {
+      return;
+    }
+
+    activeQueriesGenerated.remove(inputQueryID);
+  }
+
+  public boolean isQueryIdActive(String queryID) {
+    return activeQueriesGenerated.containsKey(queryID);
+  }
+
+  public void addShardLevelActiveQuery(String queryID, CancellableCollector collector) {
+    if (queryID == null) {
+      return;
+    }
+
+    activeCancellableQueries.put(queryID, collector);
+  }
+
+  public CancellableTask getCancellableTask(String queryID) {
+    if (queryID == null) {
+      throw new IllegalArgumentException("Input queryID is null");
+    }
+
+    return activeCancellableQueries.get(queryID);
+  }
+
+  public void removeCancellableQuery(String queryID) {
+    if (queryID == null) {
+      // Some components, such as CaffeineCache, use the searcher to fire internal queries which are not tracked
+      return;
+    }
+
+    activeCancellableQueries.remove(queryID);
+  }
+
+  public Iterator<Map.Entry<String, String>> getActiveQueriesGenerated() {
+    return activeQueriesGenerated.entrySet().iterator();
+  }
+

Review comment:
       All these methods add a significant API surface to the already bloated SolrCore. I suggest putting all of the methods and the tracking maps into a utility class, say `CancellableQueryTracker`.

##########
File path: solr/solrj/src/java/org/apache/solr/client/solrj/response/QueryResponse.java
##########
@@ -184,6 +187,15 @@ else if ( "terms".equals( n ) ) {
       else if ( "moreLikeThis".equals( n ) ) {
         _moreLikeThisInfo = (NamedList<SolrDocumentList>) res.getVal( i );
       }
+      else if ("taskList".equals( n )) {

Review comment:
       Do we need three separate sections for these? I don't think it's possible to return all three in the same call - it's either one or the other - so a single section, e.g. `taskInfo`, should be enough.

##########
File path: solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java
##########
@@ -160,6 +160,26 @@
    */
   String TIME_ALLOWED = "timeAllowed";
 
+  /**
+   * Is the query cancellable?
+   */
+  String IS_QUERY_CANCELLABLE = "canCancel";
+
+  /**
+   * Custom query UUID if provided.
+   */
+  String CUSTOM_QUERY_UUID = "queryUUID";

Review comment:
       We can drop the CUSTOM prefix here.

##########
File path: solr/core/src/java/org/apache/solr/handler/component/TaskManagementHandler.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.RequestHandlerBase;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.security.PermissionNameProvider;
+import org.apache.solr.util.plugin.SolrCoreAware;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.solr.common.params.CommonParams.DISTRIB;
+import static org.apache.solr.common.params.CommonParams.PATH;
+
+/**
+ * Abstract class which serves as the root of all task managing handlers
+ */
+public abstract class TaskManagementHandler extends RequestHandlerBase implements SolrCoreAware, PermissionNameProvider {
+    private ShardHandlerFactory shardHandlerFactory;
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public abstract void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception;

Review comment:
       There's no need to override this abstract method here.

##########
File path: solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java
##########
@@ -160,6 +160,26 @@
    */
   String TIME_ALLOWED = "timeAllowed";
 
+  /**
+   * Is the query cancellable?
+   */
+  String IS_QUERY_CANCELLABLE = "canCancel";
+
+  /**
+   * Custom query UUID if provided.
+   */
+  String CUSTOM_QUERY_UUID = "queryUUID";
+
+  /**
+   * UUID for query to be cancelled
+   */
+  String QUERY_CANCELLATION_UUID = "cancelUUID";

Review comment:
       Hmm. Why do we need to use a separate parameter name? We either submit a new query (and it's handled by one handler) or cancel a query (and it's handled by a completely separate handler) so there's no danger of confusion. I think we should just use `queryUUID` in all places.

##########
File path: solr/core/src/java/org/apache/solr/handler/component/ResponseBuilder.java
##########
@@ -65,12 +65,20 @@
   public boolean doAnalytics;
   public MergeStrategy mergeFieldHandler;
 
+  public String queryID;
+
   private boolean needDocList = false;
   private boolean needDocSet = false;
   private int fieldFlags = 0;
   //private boolean debug = false;
   private boolean debugTimings, debugQuery, debugResults, debugTrack;
 
+  private boolean isCancellation;
+  private String cancellationUUID;
+
+  private boolean isTaskListRequest;
+  private String taskStatusCheckUUID;

Review comment:
       These could probably be limited to just the presence / absence of the UUID, without the additional boolean flag - just check that the corresponding UUID exists, and this will imply that the flag is turned on.

##########
File path: solr/solrj/src/java/org/apache/solr/common/params/ShardParams.java
##########
@@ -28,6 +28,9 @@
 public interface ShardParams {
   /** the shards to use (distributed configuration) */
   String SHARDS = "shards";
+
+  /** UUID of the query */
+  String QUERY_ID = "queryID";

Review comment:
       Why not `queryUUID` (and reference the same constant as in other places)?

##########
File path: solr/core/src/java/org/apache/solr/search/SolrIndexSearcher.java
##########
@@ -224,6 +232,11 @@ private Collector buildAndRunCollectorChain(QueryResult qr, Query query, Collect
     if (collector instanceof DelegatingCollector) {
       ((DelegatingCollector) collector).finish();
     }
+
+    if (cmd.isQueryCancellable()) {

Review comment:
       This code should go into a `finally` block to make sure we always properly handle de-registration of queries, even if they throw exceptions.

##########
File path: solr/solrj/src/java/org/apache/solr/common/params/CommonParams.java
##########
@@ -160,6 +160,26 @@
    */
   String TIME_ALLOWED = "timeAllowed";
 
+  /**
+   * Is the query cancellable?
+   */
+  String IS_QUERY_CANCELLABLE = "canCancel";
+
+  /**
+   * Custom query UUID if provided.
+   */
+  String CUSTOM_QUERY_UUID = "queryUUID";
+
+  /**
+   * UUID for query to be cancelled
+   */
+  String QUERY_CANCELLATION_UUID = "cancelUUID";
+
+  /**
+   * UUID of the task whose status is to be checked
+   */
+  String TASK_CHECK_UUID = "taskUUID";

Review comment:
       We may leave this as is (although I would be tempted to use `queryUUID` here too) if you think we will have other types of cancellable tasks in the future.

##########
File path: solr/core/src/java/org/apache/solr/handler/component/ActiveTasksListComponent.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.solr.common.util.NamedList;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+/** List the active tasks that can be cancelled */
+public class ActiveTasksListComponent extends SearchComponent {
+    public static final String COMPONENT_NAME = "activetaskslistcomponent";
+
+    private boolean shouldProcess;
+
+    @Override
+    public void prepare(ResponseBuilder rb) throws IOException {
+        if (rb.isTaskListRequest()) {
+            shouldProcess = true;
+        }
+    }
+
+    @Override
+    public void process(ResponseBuilder rb) {
+        if (!shouldProcess) {
+            return;
+        }
+
+        if (rb.getTaskStatusCheckUUID() != null) {
+            boolean isActiveOnThisShard = rb.req.getCore().isQueryIdActive(rb.getTaskStatusCheckUUID());
+
+            rb.rsp.add("taskStatus", isActiveOnThisShard);
+            return;
+        }
+
+        NamedList<String> temp = new NamedList<>();
+
+        Iterator<Map.Entry<String, String>> iterator = rb.req.getCore().getActiveQueriesGenerated();
+
+        while (iterator.hasNext()) {
+            Map.Entry<String, String> entry = iterator.next();
+            temp.add(entry.getKey(), entry.getValue());
+        }
+
+        rb.rsp.add("taskList", temp);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void handleResponses(ResponseBuilder rb, ShardRequest sreq) {
+        if (!shouldProcess) {
+            return;
+        }
+
+        NamedList<String> resultList = new NamedList<>();
+
+        for (ShardResponse r : sreq.responses) {
+
+            if (rb.getTaskStatusCheckUUID() != null) {
+                boolean isTaskActiveOnShard = (boolean) r.getSolrResponse().getResponse().get("taskStatus");
+
+                if (isTaskActiveOnShard == true) {
+                    rb.rsp.getValues().add("taskStatus", rb.getTaskStatusCheckUUID() + ":" + isTaskActiveOnShard);
+                    return;
+                } else {
+                    continue;
+                }
+            }
+
+            NamedList<String> result = (NamedList<String>) r.getSolrResponse()
+                    .getResponse().get("taskList");
+
+            Iterator<Map.Entry<String, String>> iterator = result.iterator();
+
+            while (iterator.hasNext()) {
+                Map.Entry<String, String> entry = iterator.next();
+
+                resultList.add(entry.getKey(), entry.getValue());
+            }
+        }
+
+        if (rb.getTaskStatusCheckUUID() != null) {
+            // We got here with the specific taskID check being specified -- this means that the taskID was not
+            // found in active tasks on any shard
+            rb.rsp.getValues().add("taskStatus", rb.getTaskStatusCheckUUID() + ":" + false);
+            return;
+        }
+
+        rb.rsp.getValues().add("taskList", resultList);
+    }
+
+    @Override
+    public String getDescription() {
+        return "activetaskslist";

Review comment:
       CamelCase? Or just use a human-readable description.

##########
File path: solr/core/src/java/org/apache/solr/handler/component/TaskManagementHandler.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.RequestHandlerBase;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.security.PermissionNameProvider;
+import org.apache.solr.util.plugin.SolrCoreAware;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.solr.common.params.CommonParams.DISTRIB;
+import static org.apache.solr.common.params.CommonParams.PATH;
+
+/**
+ * Abstract class which serves as the root of all task managing handlers
+ */
+public abstract class TaskManagementHandler extends RequestHandlerBase implements SolrCoreAware, PermissionNameProvider {
+    private ShardHandlerFactory shardHandlerFactory;
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public abstract void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception;
+
+    @Override
+    public void inform(SolrCore core) {
+        this.shardHandlerFactory = core.getCoreContainer().getShardHandlerFactory();
+    }
+
+    /**
+     * Process the actual request.
+     * extraParams is required for allowing sub handlers to pass in custom parameters to be put in the
+     * outgoing shard request
+     */
+    protected void processRequest(SolrQueryRequest req, ResponseBuilder rb, Map<String, String> extraParams) throws IOException {
+        ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+        List<SearchComponent> components = rb.components;
+
+        shardHandler.prepDistributed(rb);
+
+        for(SearchComponent c : components) {
+            c.prepare(rb);
+        }
+
+        if (!rb.isDistrib) {
+            for (SearchComponent component : components) {
+                component.process(rb);
+            }
+        } else {
+            ShardRequest sreq = new ShardRequest();
+
+            // Distribute to all shards
+            sreq.shards = rb.shards;
+            sreq.actualShards = sreq.shards;
+
+            sreq.responses = new ArrayList<>(sreq.actualShards.length);
+            rb.finished = new ArrayList<>();
+
+            for (String shard : sreq.actualShards) {
+                ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
+                String reqPath = (String) req.getContext().get(PATH);
+
+                params.set(CommonParams.QT, reqPath);
+                params.remove(ShardParams.SHARDS);      // not a top-level request
+                params.set(DISTRIB, "false");               // not a top-level request
+                params.remove("indent");
+                params.remove(CommonParams.HEADER_ECHO_PARAMS);
+                params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request
+                params.set(ShardParams.SHARDS_PURPOSE, sreq.purpose);
+                params.set(ShardParams.SHARD_URL, shard); // so the shard knows what was asked
+                params.set(CommonParams.OMIT_HEADER, false);
+
+                if (extraParams != null) {
+                    Iterator<Map.Entry<String, String>> iterator = extraParams.entrySet().iterator();
+
+                    while (iterator.hasNext()) {
+                        Map.Entry<String, String> entry = iterator.next();
+

Review comment:
       Too much whitespace.

##########
File path: solr/core/src/java/org/apache/solr/handler/component/ActiveTasksListComponent.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.solr.common.util.NamedList;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+/** List the active tasks that can be cancelled */
+public class ActiveTasksListComponent extends SearchComponent {
+    public static final String COMPONENT_NAME = "activetaskslistcomponent";
+
+    private boolean shouldProcess;
+
+    @Override
+    public void prepare(ResponseBuilder rb) throws IOException {
+        if (rb.isTaskListRequest()) {
+            shouldProcess = true;
+        }
+    }
+
+    @Override
+    public void process(ResponseBuilder rb) {
+        if (!shouldProcess) {
+            return;
+        }
+
+        if (rb.getTaskStatusCheckUUID() != null) {
+            boolean isActiveOnThisShard = rb.req.getCore().isQueryIdActive(rb.getTaskStatusCheckUUID());
+
+            rb.rsp.add("taskStatus", isActiveOnThisShard);
+            return;
+        }
+
+        NamedList<String> temp = new NamedList<>();
+
+        Iterator<Map.Entry<String, String>> iterator = rb.req.getCore().getActiveQueriesGenerated();
+
+        while (iterator.hasNext()) {
+            Map.Entry<String, String> entry = iterator.next();
+            temp.add(entry.getKey(), entry.getValue());
+        }
+
+        rb.rsp.add("taskList", temp);

Review comment:
       You could use `MapWriter` here to avoid allocating the NamedList.

##########
File path: solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java
##########
@@ -435,6 +435,9 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw
               params.set(ShardParams.SHARDS_PURPOSE, sreq.purpose);
               params.set(ShardParams.SHARD_URL, shard); // so the shard knows what was asked
               params.set(CommonParams.OMIT_HEADER, false);
+
+              // Distributed request -- need to send queryID as a part of the distributed request
+              params.set(ShardParams.QUERY_ID, rb.queryID);

Review comment:
       Use `params.setNonNull` here, so the parameter is only set when `rb.queryID` is not null.

##########
File path: lucene/core/src/java/org/apache/lucene/search/CancellableCollector.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.search;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.lucene.index.LeafReaderContext;
+
+/** Allows a query to be cancelled */
+public class CancellableCollector implements Collector, CancellableTask {
+
+  /** Thrown when a query gets cancelled */
+  public static class QueryCancelledException extends RuntimeException {}
+
+  private Collector collector;
+  private AtomicBoolean isQueryCancelled;
+
+  public CancellableCollector(Collector collector) {
+    if (collector == null) {

Review comment:
       Maybe use `Objects.requireNonNull` instead?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@lucene.apache.org
For additional commands, e-mail: issues-help@lucene.apache.org