Posted to commits@solr.apache.org by an...@apache.org on 2022/07/15 22:07:55 UTC

[solr-sandbox] branch crossdc-wip updated: Inefficient DBQ (#31)

This is an automated email from the ASF dual-hosted git repository.

anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new 0968329  Inefficient DBQ (#31)
0968329 is described below

commit 0968329eaca2a06e749d443efac98645e36233e1
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Fri Jul 15 17:07:50 2022 -0500

    Inefficient DBQ (#31)
---
 .../update/processor/MirroringUpdateProcessor.java |  72 +++++++-
 .../apache/solr/crossdc/DeleteByQueryToIdTest.java | 189 +++++++++++++++++++++
 2 files changed, 257 insertions(+), 4 deletions(-)
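Summary of the change: for a delete-by-query other than *:*, the mirroring
processor now resolves the query to the matching unique-key values using
cursorMark deep paging and issues ordinary delete-by-id requests instead,
since a DBQ is not versioned and cannot be replayed deterministically on the
mirror. Below is a minimal, self-contained sketch of the cursorMark paging
pattern the patch relies on; the class and method names are illustrative,
but the SolrJ calls (SolrQuery.SortClause, CursorMarkParams,
QueryResponse.getNextCursorMark) are the same ones the patch uses.

    import java.io.IOException;
    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.client.solrj.response.QueryResponse;
    import org.apache.solr.common.params.CursorMarkParams;

    class CursorPagingSketch {
      // Visit every id matching a query, one page at a time. A cursor requires
      // a stable sort that includes the uniqueKey field and a fixed page size.
      static void visitMatchingIds(SolrClient client, String collection,
          String query, String uniqueKey, int rows)
          throws SolrServerException, IOException {
        SolrQuery q = new SolrQuery(query).setRows(rows)
            .setSort(SolrQuery.SortClause.asc(uniqueKey)).setFields(uniqueKey);
        String cursorMark = CursorMarkParams.CURSOR_MARK_START;
        boolean done = false;
        while (!done) {
          q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
          QueryResponse rsp = client.query(collection, q);
          String nextCursorMark = rsp.getNextCursorMark();
          // The patch collects these ids and deletes them in a single batch.
          rsp.getResults().forEach(d -> System.out.println(d.getFirstValue(uniqueKey)));
          if (cursorMark.equals(nextCursorMark)) {
            done = true; // an unchanged cursor means the result set is exhausted
          }
          cursorMark = nextCursorMark;
        }
      }
    }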

diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index cdec627..fecd445 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -1,15 +1,21 @@
 package org.apache.solr.update.processor;
 
+import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.*;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.ShardParams;
-import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.params.*;
 import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.schema.SchemaField;
 import org.apache.solr.update.AddUpdateCommand;
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
@@ -19,6 +25,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
 
 import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
 
@@ -92,6 +100,48 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
   }
 
   @Override public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
+    if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
+
+      CloudDescriptor cloudDesc =
+          cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
+      String collection = cloudDesc.getCollectionName();
+
+      HttpClient httpClient = cmd.getReq().getCore().getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
+
+      try (HttpSolrClient client =
+          new HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build()) {
+
+        String uniqueField = cmd.getReq().getSchema().getUniqueKeyField().getName();
+
+        int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
+        SolrQuery q = new SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField);
+        String cursorMark = CursorMarkParams.CURSOR_MARK_START;
+
+        int cnt = 1;
+        boolean done = false;
+        while (!done) {
+          q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
+          QueryResponse rsp =
+              client.query(collection, q);
+          String nextCursorMark = rsp.getNextCursorMark();
+
+          if (log.isDebugEnabled()) {
+            log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark, nextCursorMark, cnt++,
+                rsp.getResults());
+          }
+
+          processDBQResults(client, collection, uniqueField, rsp);
+          if (cursorMark.equals(nextCursorMark)) {
+            done = true;
+          }
+          cursorMark = nextCursorMark;
+        }
+      } catch (SolrServerException e) {
+        throw new SolrException(SERVER_ERROR, e);
+      }
+
+      return;
+    }
     super.processDelete(cmd); // let this throw to prevent mirroring invalid requests
 
     if (doMirroring) {
@@ -111,6 +161,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
         // In general there's no way to guarantee that these run identically on the mirror since there are no
         // external doc versions.
         // TODO: Can we actually support this considering DBQs aren't versioned.
+
         if (distribPhase == DistributedUpdateProcessor.DistribPhase.NONE) {
           createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
         }
@@ -121,6 +172,19 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
     }
   }
 
+  private void processDBQResults(SolrClient client, String collection, String uniqueField, QueryResponse rsp)
+      throws SolrServerException, IOException {
+    SolrDocumentList results = rsp.getResults();
+    List<String> ids = new ArrayList<>();
+    results.forEach(doc -> {
+      String id = doc.getFirstValue(uniqueField).toString();
+      ids.add(id);
+    });
+    if (!ids.isEmpty()) {
+      client.deleteById(collection, ids);
+    }
+  }
+
   private boolean isLeader(SolrQueryRequest req, String id, String route, SolrInputDocument doc) {
     CloudDescriptor cloudDesc =
         req.getCore().getCoreDescriptor().getCloudDescriptor();
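
Note on sizing: the page size for the id-fetch loop above comes from the
solr.crossdc.dbq_rows JVM system property, read with
Integer.getInteger("solr.crossdc.dbq_rows", 1000), so it defaults to 1000
and must be set before the processor handles a DBQ. The test below shrinks
it to 1 to force multiple cursor pages; a deployment would set it as a -D
flag or programmatically, for example (the value 500 is purely illustrative):

    // Must be in place before MirroringUpdateProcessor converts a DBQ;
    // read via Integer.getInteger("solr.crossdc.dbq_rows", 1000).
    System.setProperty("solr.crossdc.dbq_rows", "500");
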
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
new file mode 100644
index 0000000..0f5c823
--- /dev/null
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
@@ -0,0 +1,189 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.consumer.Consumer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+
+@ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
+    QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
+@Ignore
+@ThreadLeakLingering(linger = 5000)
+public class DeleteByQueryToIdTest extends SolrTestCaseJ4 {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  static final String VERSION_FIELD = "_version_";
+
+  private static final int NUM_BROKERS = 1;
+  public static EmbeddedKafkaCluster kafkaCluster;
+
+  protected static volatile MiniSolrCloudCluster solrCluster1;
+  protected static volatile MiniSolrCloudCluster solrCluster2;
+
+  protected static volatile Consumer consumer = new Consumer();
+
+  private static String TOPIC = "topic1";
+
+  private static String COLLECTION = "collection1";
+
+  @BeforeClass
+  public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
+
+    System.setProperty("solr.crossdc.dbq_rows", "1");
+
+    Properties config = new Properties();
+    config.put("unclean.leader.election.enable", "true");
+    config.put("enable.partition.eof", "false");
+
+    kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
+      public String bootstrapServers() {
+        return super.bootstrapServers().replaceAll("localhost", "127.0.0.1");
+      }
+    };
+    kafkaCluster.start();
+
+    kafkaCluster.createTopic(TOPIC, 1, 1);
+
+    // System.setProperty("topicName", null);
+    // System.setProperty("bootstrapServers", null);
+
+    Properties props = new Properties();
+
+    solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+    props.setProperty("topicName", TOPIC);
+    props.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    props.store(baos, "");
+    byte[] data = baos.toByteArray();
+    solrCluster1.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
+
+    CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster1.getSolrClient().request(create);
+    solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+
+    solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+
+    solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+    solrCluster2.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
+
+    CollectionAdminRequest.Create create2 =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster2.getSolrClient().request(create2);
+    solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
+
+    solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+
+    String bootstrapServers = kafkaCluster.bootstrapServers();
+    log.info("bootstrapServers={}", bootstrapServers);
+
+    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, false, 0);
+
+  }
+
+  @AfterClass
+  public static void afterSolrAndKafkaIntegrationTest() throws Exception {
+    ObjectReleaseTracker.clear();
+
+    consumer.shutdown();
+
+    try {
+      kafkaCluster.stop();
+    } catch (Exception e) {
+      log.error("Exception stopping Kafka cluster", e);
+    }
+
+    if (solrCluster1 != null) {
+      solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster1.shutdown();
+    }
+    if (solrCluster2 != null) {
+      solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster2.shutdown();
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+    solrCluster1.getSolrClient().deleteByQuery("*:*");
+    solrCluster2.getSolrClient().deleteByQuery("*:*");
+    solrCluster1.getSolrClient().commit();
+    solrCluster2.getSolrClient().commit();
+  }
+
+  public void testDBQ() throws Exception {
+
+    CloudSolrClient client = solrCluster1.getSolrClient();
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", String.valueOf(System.nanoTime()));
+    doc.addField("text", "some test");
+    client.add(doc);
+
+    SolrInputDocument doc2 = new SolrInputDocument();
+    doc2.addField("id", String.valueOf(System.nanoTime()));
+    doc2.addField("text", "some test two");
+    client.add(doc2);
+
+    SolrInputDocument doc3 = new SolrInputDocument();
+    doc3.addField("id", String.valueOf(System.nanoTime()));
+    doc3.addField("text", "two of a kind");
+    client.add(doc3);
+
+    SolrInputDocument doc4 = new SolrInputDocument();
+    doc4.addField("id", String.valueOf(System.nanoTime()));
+    doc4.addField("text", "one two three");
+    client.add(doc4);
+
+    client.commit(COLLECTION);
+
+    client.deleteByQuery("two");
+
+    client.commit(COLLECTION);
+
+    QueryResponse results = null;
+    boolean foundUpdates = false;
+    for (int i = 0; i < 50; i++) {
+      solrCluster2.getSolrClient().commit(COLLECTION);
+      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      if (results.getResults().getNumFound() == 1) {
+        foundUpdates = true;
+        break;
+      }
+      Thread.sleep(500);
+    }
+
+    assertTrue("results=" + results, foundUpdates);
+    System.out.println("Results: " + results);
+
+  }
+
+}
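
The polling loop in testDBQ is a standard eventual-consistency check: the
DBQ on cluster 1 ("two") matches the three documents whose text contains
"two", the mirrored delete-by-id requests are consumed by cluster 2, and the
test repeatedly commits and queries until exactly one document remains. A
hypothetical helper capturing the same retry pattern (names and defaults are
illustrative, not part of the patch):

    import java.util.concurrent.Callable;

    class Retry {
      // Re-evaluate a condition until it holds or the attempts run out.
      static boolean waitFor(int attempts, long sleepMs, Callable<Boolean> condition)
          throws Exception {
        for (int i = 0; i < attempts; i++) {
          if (condition.call()) {
            return true;
          }
          Thread.sleep(sleepMs);
        }
        return false;
      }
    }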