You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by an...@apache.org on 2022/07/15 22:07:55 UTC
[solr-sandbox] branch crossdc-wip updated: Inefficient DBQ (#31)
This is an automated email from the ASF dual-hosted git repository.
anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new 0968329 Inefficient DBQ (#31)
0968329 is described below
commit 0968329eaca2a06e749d443efac98645e36233e1
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Fri Jul 15 17:07:50 2022 -0500
Inefficient DBQ (#31)
---
.../update/processor/MirroringUpdateProcessor.java | 72 +++++++-
.../apache/solr/crossdc/DeleteByQueryToIdTest.java | 189 +++++++++++++++++++++
2 files changed, 257 insertions(+), 4 deletions(-)
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index cdec627..fecd445 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -1,15 +1,21 @@
package org.apache.solr.update.processor;
+import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.*;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.ShardParams;
-import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.*;
import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
@@ -19,6 +25,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
@@ -92,6 +100,48 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
}
@Override public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
+ if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
+
+ CloudDescriptor cloudDesc =
+ cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
+ String collection = cloudDesc.getCollectionName();
+
+ HttpClient httpClient = cmd.getReq().getCore().getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
+
+ try (HttpSolrClient client =
+ new HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build()) {
+
+ String uniqueField = cmd.getReq().getSchema().getUniqueKeyField().getName();
+
+ int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
+ SolrQuery q = new SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField);
+ String cursorMark = CursorMarkParams.CURSOR_MARK_START;
+
+ int cnt = 1;
+ boolean done = false;
+ while (!done) {
+ q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
+ QueryResponse rsp =
+ client.query(collection, q);
+ String nextCursorMark = rsp.getNextCursorMark();
+
+ if (log.isDebugEnabled()) {
+ log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark, nextCursorMark, cnt++,
+ rsp.getResults());
+ }
+
+ processDBQResults(client, collection, uniqueField, rsp);
+ if (cursorMark.equals(nextCursorMark)) {
+ done = true;
+ }
+ cursorMark = nextCursorMark;
+ }
+ } catch (SolrServerException e) {
+ throw new SolrException(SERVER_ERROR, e);
+ }
+
+ return;
+ }
super.processDelete(cmd); // let this throw to prevent mirroring invalid requests
if (doMirroring) {
@@ -111,6 +161,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
// In general there's no way to guarantee that these run identically on the mirror since there are no
// external doc versions.
// TODO: Can we actually support this, considering DBQs aren't versioned?
+
if (distribPhase == DistributedUpdateProcessor.DistribPhase.NONE) {
createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
}
@@ -121,6 +172,19 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
}
}
+ private void processDBQResults(SolrClient client, String collection, String uniqueField, QueryResponse rsp)
+ throws SolrServerException, IOException {
+ SolrDocumentList results = rsp.getResults();
+ List<String> ids = new ArrayList<>();
+ results.forEach(entries -> {
+ String id = entries.getFirstValue(uniqueField).toString();
+ ids.add(id);
+ });
+ if (ids.size() > 0) {
+ client.deleteById(collection, ids);
+ }
+ }
+
private boolean isLeader(SolrQueryRequest req, String id, String route, SolrInputDocument doc) {
CloudDescriptor cloudDesc =
req.getCore().getCoreDescriptor().getCloudDescriptor();
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
new file mode 100644
index 0000000..0f5c823
--- /dev/null
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
@@ -0,0 +1,189 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.consumer.Consumer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+
+/**
+ * Integration test for cross-DC handling of delete-by-query requests.
+ *
+ * <p>Two single-node SolrCloud clusters and one embedded Kafka broker are started. Documents are
+ * indexed into the first cluster, a delete-by-query is issued against it, and the test then polls
+ * the second cluster (which is served by the cross-DC {@link Consumer}) until exactly one document
+ * remains.
+ *
+ * <p>The {@code solr.crossdc.dbq_rows} system property is set to {@code 1} for the duration of the
+ * class and cleared again afterwards so that it does not leak into other tests.
+ */
+@ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
+    QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
+@Ignore
+@ThreadLeakLingering(linger = 5000)
+public class DeleteByQueryToIdTest extends SolrTestCaseJ4 {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  static final String VERSION_FIELD = "_version_";
+
+  /** System property controlling the page size used when a delete-by-query is expanded to ids. */
+  private static final String DBQ_ROWS_PROP = "solr.crossdc.dbq_rows";
+
+  private static final int NUM_BROKERS = 1;
+
+  /** Maximum number of polling rounds while waiting for the second cluster to catch up. */
+  private static final int MAX_POLL_ATTEMPTS = 50;
+
+  /** Pause between two polling rounds, in milliseconds. */
+  private static final long POLL_INTERVAL_MS = 500;
+
+  public static EmbeddedKafkaCluster kafkaCluster;
+
+  protected static volatile MiniSolrCloudCluster solrCluster1;
+  protected static volatile MiniSolrCloudCluster solrCluster2;
+
+  protected static volatile Consumer consumer = new Consumer();
+
+  private static final String TOPIC = "topic1";
+
+  private static final String COLLECTION = "collection1";
+
+  /**
+   * Starts the Kafka broker, both Solr clusters (each with one collection and a
+   * {@code /crossdc.properties} node in ZooKeeper) and the cross-DC consumer.
+   */
+  @BeforeClass
+  public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
+
+    // Force one row per page so the delete-by-query expansion has to page through its results.
+    System.setProperty(DBQ_ROWS_PROP, "1");
+
+    Properties config = new Properties();
+    config.put("unclean.leader.election.enable", "true");
+    config.put("enable.partition.eof", "false");
+
+    kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
+      @Override
+      public String bootstrapServers() {
+        return super.bootstrapServers().replace("localhost", "127.0.0.1");
+      }
+    };
+    kafkaCluster.start();
+
+    kafkaCluster.createTopic(TOPIC, 1, 1);
+
+    Properties props = new Properties();
+
+    solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+    props.setProperty("topicName", TOPIC);
+    props.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+
+    // Serialize the cross-DC settings once; the same bytes are stored in both clusters.
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    props.store(baos, "");
+    byte[] data = baos.toByteArray();
+    solrCluster1.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
+
+    CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster1.getSolrClient().request(create);
+    solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+
+    solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+
+    solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+    solrCluster2.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
+
+    CollectionAdminRequest.Create create2 =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster2.getSolrClient().request(create2);
+    solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
+
+    solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+
+    String bootstrapServers = kafkaCluster.bootstrapServers();
+    log.info("bootstrapServers={}", bootstrapServers);
+
+    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, false, 0);
+
+  }
+
+  /** Stops the consumer, the Kafka broker and both Solr clusters. */
+  @AfterClass
+  public static void afterSolrAndKafkaIntegrationTest() throws Exception {
+    ObjectReleaseTracker.clear();
+
+    // Undo the JVM-wide setting made in the @BeforeClass method so other tests see the default.
+    System.clearProperty(DBQ_ROWS_PROP);
+
+    consumer.shutdown();
+
+    try {
+      kafkaCluster.stop();
+    } catch (Exception e) {
+      log.error("Exception stopping Kafka cluster", e);
+    }
+
+    if (solrCluster1 != null) {
+      solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster1.shutdown();
+    }
+    if (solrCluster2 != null) {
+      solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster2.shutdown();
+    }
+  }
+
+  /** Removes all documents from both clusters after each test method. */
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+    solrCluster1.getSolrClient().deleteByQuery("*:*");
+    solrCluster2.getSolrClient().deleteByQuery("*:*");
+    solrCluster1.getSolrClient().commit();
+    solrCluster2.getSolrClient().commit();
+  }
+
+  /**
+   * Indexes four documents into the first cluster, deletes those matching the query {@code two},
+   * and waits until the second cluster reports exactly one remaining document.
+   */
+  public void testDBQ() throws Exception {
+
+    CloudSolrClient client = solrCluster1.getSolrClient();
+    addDoc(client, "some test");
+    addDoc(client, "some test two");
+    addDoc(client, "two of a kind");
+    addDoc(client, "one two three");
+
+    client.commit(COLLECTION);
+
+    client.deleteByQuery("two");
+
+    client.commit(COLLECTION);
+
+    QueryResponse results = null;
+    boolean foundUpdates = false;
+    for (int i = 0; i < MAX_POLL_ATTEMPTS; i++) {
+      solrCluster2.getSolrClient().commit(COLLECTION);
+      // NOTE(review): the response of this query is discarded; presumably a leftover sanity query.
+      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      if (results.getResults().getNumFound() == 1) {
+        foundUpdates = true;
+        // Stop polling as soon as the expected state is seen, keeping the matching response.
+        break;
+      }
+      Thread.sleep(POLL_INTERVAL_MS);
+    }
+
+    assertTrue("results=" + results, foundUpdates);
+    log.info("Rest: {}", results);
+
+  }
+
+  /**
+   * Adds one document with a {@link System#nanoTime()}-based id and the given {@code text} value.
+   *
+   * @param client client the document is sent through
+   * @param text   value stored in the document's {@code text} field
+   */
+  private static void addDoc(CloudSolrClient client, String text) throws Exception {
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", String.valueOf(System.nanoTime()));
+    doc.addField("text", text);
+    client.add(doc);
+  }
+
+}