You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by an...@apache.org on 2022/09/29 15:48:08 UTC

[solr-sandbox] branch crossdc-wip updated: Check docSize against max before mirroring (#45)

This is an automated email from the ASF dual-hosted git repository.

anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new c40007c  Check docSize against max before mirroring (#45)
c40007c is described below

commit c40007c808b8217945a903d967333624ca4df5d3
Author: Jason Gerlowski <ge...@apache.org>
AuthorDate: Thu Sep 29 11:48:03 2022 -0400

    Check docSize against max before mirroring (#45)
    
    * Check docSize against max before mirroring
    
    Prior to this commit, the MirroringUpdateProcessor had no check to
    ensure that docs weren't running afoul of the batch size limit set at
    the Kafka level.
    
    This commit changes this to ensure that docs exceeding this limit are
    not mirrored.  These offending docs may still be indexed, based on the
    value of the URP's "indexUnmirrorableDocs" config property (which
    defaults to 'false' if not set).
---
 .../update/processor/MirroringUpdateProcessor.java | 127 ++++++++++++++++++++-
 .../MirroringUpdateRequestProcessorFactory.java    |  14 ++-
 .../solr/crossdc/SolrAndKafkaIntegrationTest.java  | 107 +++++++++++++----
 .../configs/cloud-minimal/conf/solrconfig.xml      |   1 +
 4 files changed, 217 insertions(+), 32 deletions(-)

diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index 758ca8e..9b19ab4 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -11,6 +11,7 @@ import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
 import org.apache.solr.common.cloud.*;
 import org.apache.solr.common.params.*;
 import org.apache.solr.request.SolrQueryRequest;
@@ -24,7 +25,10 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.IdentityHashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
 
@@ -46,6 +50,14 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
   private UpdateRequest mirrorRequest;
   private final SolrParams mirrorParams;
 
+
+  /**
+   * Controls whether docs that exceed the max doc size (and thus cannot be mirrored) are still indexed locally.
+   */
+  private final boolean indexUnmirrorableDocs;
+  private final long maxDocSizeBytes;
+
+
   /**
    * The distributed processor downstream from us so we can establish if we're running on a leader shard
    */
@@ -57,11 +69,15 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
   private DistributedUpdateProcessor.DistribPhase distribPhase;
 
   public MirroringUpdateProcessor(final UpdateRequestProcessor next, boolean doMirroring,
+      final boolean indexUnmirrorableDocs,
+      final long maxDocSizeBytes,
       final SolrParams mirroredReqParams,
       final DistributedUpdateProcessor.DistribPhase distribPhase,
       final RequestMirroringHandler requestMirroringHandler) {
     super(next);
     this.doMirroring = doMirroring;
+    this.indexUnmirrorableDocs = indexUnmirrorableDocs;
+    this.maxDocSizeBytes = maxDocSizeBytes;
     this.mirrorParams = mirroredReqParams;
     this.distribPhase = distribPhase;
     this.requestMirroringHandler = requestMirroringHandler;
@@ -83,14 +99,23 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
 
   @Override public void processAdd(final AddUpdateCommand cmd) throws IOException {
 
-    super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
+    final SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
+    doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
+    final boolean tooLargeForKafka = exceedsMaxDocSize(doc);
+    if (tooLargeForKafka && !indexUnmirrorableDocs) {
+      log.warn("Skipping indexing of doc {} as it exceeds the doc-size limit ({} bytes) and is unmirrorable.", cmd.getPrintableId(), maxDocSizeBytes);
+    } else {
+      super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
+    }
 
     // submit only from the leader shards so we mirror each doc once
     boolean isLeader = isLeader(cmd.getReq(),  cmd.getIndexedIdStr(), null, cmd.getSolrInputDocument());
     if (doMirroring && isLeader) {
-      SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
-      doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
-      createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, cmd.overwrite);
+      if (tooLargeForKafka) {
+        // The message has two placeholders: the doc id first, then the configured size limit.
+        log.warn("Skipping mirroring of doc {} because estimated size exceeds batch size limit {} bytes", cmd.getPrintableId(), maxDocSizeBytes);
+      } else {
+        // 'doc' is the deep copy made above with the internal version field stripped.
+        createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, cmd.overwrite);
+      }
     }
 
     if (log.isDebugEnabled())
@@ -170,6 +195,11 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
     }
   }
 
+  /**
+   * Reports whether the estimated size of {@code doc} is strictly greater than {@code maxDocSizeBytes}.
+   *
+   * @param doc the document to measure
+   * @return {@code true} if the estimate exceeds the configured limit
+   */
+  private boolean exceedsMaxDocSize(SolrInputDocument doc) {
+    return ObjectSizeEstimator.estimate(doc) > maxDocSizeBytes;
+  }
+
   private static void processDBQResults(SolrClient client, String collection, String uniqueField,
       QueryResponse rsp)
       throws SolrServerException, IOException {
@@ -237,4 +267,93 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
       }
     }
   }
+
+  // package private for testing
+  static class ObjectSizeEstimator {
+    /**
+     * Estimated size, in bytes, of each primitive type and of its wrapper class.
+     * Values reach this class as {@code Object}s, so at runtime only the wrapper
+     * entries can match; the primitive entries are kept for completeness.
+     */
+    private static final Map<Class<?>,Integer> primitiveSizes = new IdentityHashMap<>();
+    static {
+      primitiveSizes.put(boolean.class, 1);
+      primitiveSizes.put(Boolean.class, 1);
+      primitiveSizes.put(byte.class, 1);
+      primitiveSizes.put(Byte.class, 1);
+      primitiveSizes.put(char.class, Character.BYTES);
+      primitiveSizes.put(Character.class, Character.BYTES);
+      primitiveSizes.put(short.class, Short.BYTES);
+      primitiveSizes.put(Short.class, Short.BYTES);
+      primitiveSizes.put(int.class, Integer.BYTES);
+      primitiveSizes.put(Integer.class, Integer.BYTES);
+      primitiveSizes.put(float.class, Float.BYTES);
+      primitiveSizes.put(Float.class, Float.BYTES);
+      primitiveSizes.put(double.class, Double.BYTES);
+      primitiveSizes.put(Double.class, Double.BYTES);
+      primitiveSizes.put(long.class, Long.BYTES);
+      primitiveSizes.put(Long.class, Long.BYTES);
+    }
+
+    /**
+     * Estimates the size of a document as the sum of its field names, its field
+     * values and, recursively, its child documents.
+     *
+     * @param doc the document to measure; {@code null} counts as 0
+     * @return the estimated size in bytes
+     */
+    public static long estimate(SolrInputDocument doc) {
+      if (doc == null) return 0L;
+      long size = 0;
+      for (SolrInputField inputField : doc.values()) {
+        size += primitiveEstimate(inputField.getName(), 0L);
+        size += estimate(inputField.getValue());
+      }
+
+      if (doc.hasChildDocuments()) {
+        for (SolrInputDocument childDoc : doc.getChildDocuments()) {
+          size += estimate(childDoc);
+        }
+      }
+      return size;
+    }
+
+    /**
+     * Estimates the size of an arbitrary value. Documents, maps and collections
+     * are walked recursively; strings and boxed primitives are sized directly;
+     * {@code null} and any other type count as 0.
+     *
+     * @param obj the value to measure, may be {@code null}
+     * @return the estimated size in bytes
+     */
+    static long estimate(Object obj) {
+      if (obj == null) {
+        return 0L;
+      }
+
+      if (obj instanceof SolrInputDocument) {
+        return estimate((SolrInputDocument) obj);
+      }
+
+      if (obj instanceof Map) {
+        return estimateMap((Map<?, ?>) obj);
+      }
+
+      if (obj instanceof Collection) {
+        return estimateCollection((Collection<?>) obj);
+      }
+
+      return primitiveEstimate(obj, 0L);
+    }
+
+    /**
+     * Sizes a leaf value: a boxed primitive via the lookup table, or a string at
+     * {@link Character#BYTES} per char.
+     *
+     * @param obj the value to measure, may be {@code null}
+     * @param def the size to report for {@code null} or an unrecognised type
+     * @return the estimated size in bytes, or {@code def}
+     */
+    private static long primitiveEstimate(Object obj, long def) {
+      if (obj == null) {
+        return def;
+      }
+      // getClass() on a boxed value yields the wrapper class, never a primitive
+      // class, so the table has to be consulted directly rather than behind an
+      // isPrimitive() check.
+      final Integer knownSize = primitiveSizes.get(obj.getClass());
+      if (knownSize != null) {
+        return knownSize;
+      }
+      if (obj instanceof String) {
+        // Widen before multiplying so very long strings cannot overflow int.
+        return (long) ((String) obj).length() * Character.BYTES;
+      }
+      return def;
+    }
+
+    /** Sums the leaf size of every key and the recursive size of every value. */
+    private static long estimateMap(Map<?, ?> map) {
+      if (map.isEmpty()) return 0;
+      long size = 0;
+      for (Map.Entry<?, ?> entry : map.entrySet()) {
+        size += primitiveEstimate(entry.getKey(), 0L);
+        size += estimate(entry.getValue());
+      }
+      return size;
+    }
+
+    /** Sums the recursive size of every element. */
+    private static long estimateCollection(Collection<?> collection) {
+      if (collection.isEmpty()) return 0;
+      long size = 0;
+      for (Object obj : collection) {
+        size += estimate(obj);
+      }
+      return size;
+    }
+  }
 }
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index c045f86..d0ec65a 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -71,18 +71,25 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
 
 
     private boolean enabled = true;
+    private boolean indexUnmirrorableDocs = false;
+    private KafkaCrossDcConf conf;
 
     private final Map<String,Object> properties = new HashMap<>();
 
     @Override
     public void init(final NamedList args) {
         super.init(args);
-        Boolean enabled = args.getBooleanArg("enabled");
 
+        Boolean enabled = args.getBooleanArg("enabled");
         if (enabled != null && !enabled) {
             this.enabled = false;
         }
 
+        final Boolean indexUnmirrorableDocsArg = args.getBooleanArg("indexUnmirrorableDocs");
+        if (indexUnmirrorableDocsArg != null && indexUnmirrorableDocsArg) {
+            this.indexUnmirrorableDocs = true;
+        }
+
         for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
             String val = args._getStr(configKey.getKey(), null);
             if (val != null && !val.isBlank()) {
@@ -179,7 +186,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
         // load the request mirroring sink class and instantiate.
        // mirroringHandler = core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(), KafkaRequestMirroringHandler.class);
 
-        KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
+        conf = new KafkaCrossDcConf(properties);
 
 
         KafkaMirroringSink sink = new KafkaMirroringSink(conf);
@@ -213,6 +220,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
 
         // Check if mirroring is disabled in request params, defaults to true
         boolean doMirroring = req.getParams().getBool(SERVER_SHOULD_MIRROR, true);
+        final long maxDocSizeBytes = conf.getInt(MAX_REQUEST_SIZE_BYTES);
 
         ModifiableSolrParams mirroredParams = null;
         if (doMirroring) {
@@ -243,7 +251,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
             log.trace("Create MirroringUpdateProcessor with mirroredParams={}", mirroredParams);
         }
 
-        return new MirroringUpdateProcessor(next, doMirroring, mirroredParams,
+        return new MirroringUpdateProcessor(next, doMirroring, indexUnmirrorableDocs, maxDocSizeBytes, mirroredParams,
                 DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null);
     }
 
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 1151b60..ec1e7c0 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -11,8 +11,10 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.lucene.util.QuickPatchThreadsFilter;
 import org.apache.solr.SolrIgnoredThreadsFilter;
 import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -28,16 +30,20 @@ import org.apache.solr.crossdc.consumer.Consumer;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.sys.Prop;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
 import static org.mockito.Mockito.spy;
 
 @ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
@@ -47,6 +53,8 @@ import static org.mockito.Mockito.spy;
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  private static final int MAX_DOC_SIZE_BYTES = Integer.valueOf(DEFAULT_MAX_REQUEST_SIZE);
+
   static final String VERSION_FIELD = "_version_";
 
   private static final int NUM_BROKERS = 1;
@@ -60,6 +68,7 @@ import static org.mockito.Mockito.spy;
   private static String TOPIC = "topic1";
 
   private static String COLLECTION = "collection1";
+  private static String ALT_COLLECTION = "collection2";
 
   @BeforeClass
   public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
@@ -108,6 +117,7 @@ import static org.mockito.Mockito.spy;
     properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
     properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
     properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+    properties.put(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES, MAX_DOC_SIZE_BYTES);
     consumer.start(properties);
 
   }
@@ -141,6 +151,12 @@ import static org.mockito.Mockito.spy;
     solrCluster2.getSolrClient().deleteByQuery("*:*");
     solrCluster1.getSolrClient().commit();
     solrCluster2.getSolrClient().commit();
+
+    // Delete alternate collection in case it was created by any tests.
+    if (CollectionAdminRequest.listCollections(solrCluster1.getSolrClient()).contains(ALT_COLLECTION)) {
+      solrCluster1.getSolrClient().request(CollectionAdminRequest.deleteCollection(ALT_COLLECTION));
+      solrCluster2.getSolrClient().request(CollectionAdminRequest.deleteCollection(ALT_COLLECTION));
+    }
   }
 
   public void testFullCloudToCloud() throws Exception {
@@ -155,24 +171,7 @@ import static org.mockito.Mockito.spy;
 
     System.out.println("Sent producer record");
 
-    QueryResponse results = null;
-    boolean foundUpdates = false;
-    for (int i = 0; i < 100; i++) {
-      solrCluster2.getSolrClient().commit(COLLECTION);
-      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
-      results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
-      if (results.getResults().getNumFound() == 1) {
-        foundUpdates = true;
-      } else {
-        Thread.sleep(100);
-      }
-    }
-
-    System.out.println("Closed producer");
-
-    assertTrue("results=" + results, foundUpdates);
-    System.out.println("Rest: " + results);
-
+    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
   }
 
   public void testProducerToCloud() throws Exception {
@@ -202,12 +201,73 @@ import static org.mockito.Mockito.spy;
 
     solrCluster2.getSolrClient().commit(COLLECTION);
 
+    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 2);
+
+    producer.close();
+  }
+
+  /**
+   * Indexes one oversized and one ordinary doc, first into the default collection
+   * and then into a collection created with {@code indexUnmirrorableDocs=true},
+   * and checks which docs end up on each cluster.
+   */
+  @Test
+  public void testMirroringUpdateProcessor() throws Exception {
+    // A doc whose text field is twice the size limit, and a small one.
+    final SolrInputDocument oversizedDoc = new SolrInputDocument();
+    oversizedDoc.addField("id", "tooLarge-" + System.currentTimeMillis());
+    oversizedDoc.addField("text", new String(new byte[2 * MAX_DOC_SIZE_BYTES]));
+    final SolrInputDocument regularDoc = new SolrInputDocument();
+    regularDoc.addField("id", "normalDoc-" + System.currentTimeMillis());
+    regularDoc.addField("text", "Hello world");
+
+    final List<SolrInputDocument> batch = new ArrayList<>();
+    batch.add(oversizedDoc);
+    batch.add(regularDoc);
+
+    final CloudSolrClient primaryClient = solrCluster1.getSolrClient();
+    primaryClient.add(batch);
+    primaryClient.commit(COLLECTION);
+
+    // Default settings: only the regular doc should be present on either cluster.
+    final String regularDocQuery = "id:" + regularDoc.get("id").getFirstValue();
+    assertCluster2EventuallyHasDocs(COLLECTION, regularDocQuery, 1);
+    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
+    assertClusterEventuallyHasDocs(primaryClient, COLLECTION, regularDocQuery, 1);
+    assertClusterEventuallyHasDocs(primaryClient, COLLECTION, "*:*", 1);
+
+    // Second collection on both clusters, created with indexUnmirrorableDocs=true.
+    final CollectionAdminRequest.Create createAlt = CollectionAdminRequest
+        .createCollection(ALT_COLLECTION, "conf", 1, 1)
+        .withProperty("indexUnmirrorableDocs", "true");
+    solrCluster1.getSolrClient().request(createAlt);
+    solrCluster2.getSolrClient().request(createAlt);
+    solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+    solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+    primaryClient.add(ALT_COLLECTION, batch);
+    primaryClient.commit(ALT_COLLECTION);
+
+    // The primary should hold both docs; the secondary only the regular one.
+    assertClusterEventuallyHasDocs(primaryClient, ALT_COLLECTION, "*:*", 2);
+    assertCluster2EventuallyHasDocs(ALT_COLLECTION, regularDocQuery, 1);
+    assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", 1);
+  }
+
+  private void assertCluster2EventuallyHasDocs(String collection, String query, int expectedNumDocs) throws Exception {
+    assertClusterEventuallyHasDocs(solrCluster2.getSolrClient(), collection, query, expectedNumDocs);
+  }
+
+  /**
+   * Sends the given collection-creation request through {@code client}.
+   *
+   * @param client the client used to issue the request
+   * @param createCmd the creation request to send
+   * @throws Exception if the request fails
+   */
+  private void createCollection(CloudSolrClient client, CollectionAdminRequest.Create createCmd) throws Exception {
+    client.request(createCmd);
+  }
+
+  private void assertClusterEventuallyHasDocs(SolrClient client, String collection, String query, int expectedNumDocs) throws Exception {
     QueryResponse results = null;
     boolean foundUpdates = false;
-    for (int i = 0; i < 50; i++) {
-      solrCluster2.getSolrClient().commit(COLLECTION);
-      results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
-      if (results.getResults().getNumFound() == 2) {
+    for (int i = 0; i < 100; i++) {
+      client.commit(collection);
+      results = client.query(collection, new SolrQuery(query));
+      if (results.getResults().getNumFound() == expectedNumDocs) {
         foundUpdates = true;
       } else {
         Thread.sleep(100);
@@ -215,8 +275,5 @@ import static org.mockito.Mockito.spy;
     }
 
     assertTrue("results=" + results, foundUpdates);
-    System.out.println("Rest: " + results);
-
-    producer.close();
   }
 }
diff --git a/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
index 76e75ac..e1791ea 100644
--- a/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
+++ b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
@@ -111,6 +111,7 @@
   <updateRequestProcessorChain  name="mirrorUpdateChain" default="true">
     <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
       <bool name="enabled">${enabled:true}</bool>
+      <bool name="indexUnmirrorableDocs">${indexUnmirrorableDocs:false}</bool>
       <str name="bootstrapServers">${bootstrapServers:}</str>
       <str name="topicName">${topicName:}</str>
     </processor>