Posted to commits@solr.apache.org by ma...@apache.org on 2023/09/07 20:19:03 UTC

[solr-sandbox] branch main updated: Fix race where update batches on consumer could cause polling loop to exit due to hitting Kafka's aggressive multi-threaded prevention.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new 92cabca  Fix race where update batches on consumer could cause polling loop to exit due to hitting Kafka's aggressive multi-threaded prevention.
92cabca is described below

commit 92cabca3d49bee155a392dd23415579d5369392e
Author: markrmiller <ma...@apache.org>
AuthorDate: Thu Sep 7 15:18:29 2023 -0500

    Fix race where update batches on consumer could cause polling loop to exit due to hitting Kafka's aggressive multi-threaded prevention.
---
 .../crossdc/consumer/KafkaCrossDcConsumer.java     |  10 +-
 .../org/apache/solr/crossdc/consumer/Util.java     |  88 ++++++++++++++++
 crossdc-producer/build.gradle                      |   3 +-
 .../solr/crossdc/SolrAndKafkaIntegrationTest.java  |  11 +-
 ...SolrAndKafkaMultiCollectionIntegrationTest.java | 113 +++++++++++++++++++--
 5 files changed, 204 insertions(+), 21 deletions(-)

diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 30f9a67..1327a42 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -314,14 +314,6 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
         // We don't really know what to do here
         log.error("Mirroring exception occurred while resubmitting to Kafka. We are going to stop the consumer thread now.", e);
         throw new RuntimeException(e);
-      } finally {
-        offsetCheckExecutor.submit(() -> {
-          try {
-            partitionManager.checkForOffsetUpdates(workUnit.partition);
-          } catch (Throwable e) {
-            // already logging in checkForOffsetUpdates
-          }
-        });
       }
 
     });
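
(Context, not part of this commit.) The finally block removed above submitted the per-partition offset check to a side executor; per the commit message, that work could end up touching the same KafkaConsumer the polling loop owns. KafkaConsumer enforces single-threaded access: a call entered while another thread is inside the client fails with ConcurrentModificationException ("KafkaConsumer is not safe for multi-threaded access"), which is the "aggressive multi-threaded prevention" referred to above. A minimal standalone sketch of that guard, using a hypothetical broker address and topic name:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.ConcurrentModificationException;
    import java.util.Properties;

    public class KafkaSingleThreadGuardSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "guard-sketch");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("some-topic")); // hypothetical topic

            // Thread A: the polling loop; poll() holds the consumer's lightweight lock while it runs.
            Thread poller = new Thread(() -> consumer.poll(Duration.ofSeconds(10)));
            poller.start();
            Thread.sleep(500); // give poll() time to start

            // Thread B (here, the main thread): calling into the same consumer while poll() is in
            // flight typically fails with "KafkaConsumer is not safe for multi-threaded access".
            try {
                consumer.committed(Collections.singleton(new TopicPartition("some-topic", 0)));
            } catch (ConcurrentModificationException expected) {
                System.out.println("Guard tripped: " + expected);
            }

            poller.join();
            consumer.close();
        }
    }
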
@@ -386,6 +378,8 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
       log.warn("Interrupted while waiting for executor to shutdown");
     } catch (Exception e) {
       log.warn("Exception closing Solr client on shutdown", e);
+    } finally {
+      Util.logMetrics(metrics);
     }
   }
 
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Util.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Util.java
new file mode 100644
index 0000000..37830eb
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Util.java
@@ -0,0 +1,88 @@
+package org.apache.solr.crossdc.consumer;
+
+import com.codahale.metrics.*;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+public class Util {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    public static void logMetrics(MetricRegistry metricRegistry) {
+        log.info("Metrics Registry:");
+        for (Map.Entry<String, Gauge> entry : metricRegistry.getGauges().entrySet()) {
+            log.info("Gauge {}: {}", entry.getKey(), entry.getValue().getValue());
+        }
+        for (Map.Entry<String, Counter> entry : metricRegistry.getCounters().entrySet()) {
+            log.info("Counter {}: {}", entry.getKey(), entry.getValue().getCount());
+        }
+        for (Map.Entry<String, Histogram> entry : metricRegistry.getHistograms().entrySet()) {
+            log.info("Histogram {}: {}", entry.getKey(), entry.getValue().getSnapshot().toString());
+        }
+        for (Map.Entry<String, Meter> entry : metricRegistry.getMeters().entrySet()) {
+            log.info("Meter {}: {}", entry.getKey(), entry.getValue().getCount());
+        }
+        for (Map.Entry<String, Timer> entry : metricRegistry.getTimers().entrySet()) {
+            log.info("Timer {}: {}", entry.getKey(), entry.getValue().getSnapshot().toString());
+        }
+    }
+
+    public static void printKafkaInfo(String host, String groupId) {
+        // Initialize the Kafka Admin Client
+        System.out.println("Kafka Info: " + host);
+        Properties adminProps = new Properties();
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, host);
+        try (AdminClient adminClient = AdminClient.create(adminProps)) {
+            // Get list of topics
+            Set<String> topicNames = adminClient.listTopics(new ListTopicsOptions().listInternal(false)).names().get();
+            System.out.println("Live Topics: " + topicNames);
+
+            // Initialize the Kafka Consumer Client to fetch offsets
+            Properties consumerProps = new Properties();
+            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
+            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MirroredSolrRequestSerializer.class.getName());
+
+            try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+                for (String topic : topicNames) {
+                    // assignment() is empty for a consumer that has not joined the group,
+                    // so build the partition list from the topic metadata instead.
+                    List<TopicPartition> topicPartitions = new ArrayList<>();
+                    for (PartitionInfo partitionInfo : consumer.partitionsFor(topic)) {
+                        topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
+                    }
+                    System.out.println("Topic Partitions: " + topicPartitions.size());
+                    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
+                    for (TopicPartition topicPartition : topicPartitions) {
+                        long endOffset = endOffsets.get(topicPartition);
+                        // committed() returns null when the group has no committed offset yet
+                        OffsetAndMetadata committed = consumer.committed(topicPartition);
+                        long committedOffset = committed == null ? 0 : committed.offset();
+                        long updatesInQueue = endOffset - committedOffset;
+
+                        System.out.println("Topic: " + topic);
+                        System.out.println("  Partition: " + topicPartition.partition());
+                        System.out.println("  End Offset: " + endOffset);
+                        System.out.println("  Committed Offset: " + committedOffset);
+                        System.out.println("  Updates in Queue: " + updatesInQueue);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
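
(Context, not part of this commit.) printKafkaInfo above spins up a throwaway KafkaConsumer to report committed and end offsets. The same per-partition lag can also be derived entirely from the admin client; a rough sketch, assuming Kafka clients 2.5+ (for listOffsets) and the same bootstrap host and consumer group id:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListOffsetsResult;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class ConsumerGroupLagSketch {
        public static void printLag(String host, String groupId) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, host);
            try (AdminClient admin = AdminClient.create(props)) {
                // Offsets the group has committed, keyed by partition.
                Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();

                // Log-end offsets for the same partitions.
                Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
                committed.keySet().forEach(tp -> latest.put(tp, OffsetSpec.latest()));
                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> ends =
                    admin.listOffsets(latest).all().get();

                committed.forEach((tp, offset) -> {
                    long lag = ends.get(tp).offset() - offset.offset();
                    System.out.println(tp + " committed=" + offset.offset()
                        + " end=" + ends.get(tp).offset() + " lag=" + lag);
                });
            }
        }
    }
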
diff --git a/crossdc-producer/build.gradle b/crossdc-producer/build.gradle
index f4aec1d..3335a01 100644
--- a/crossdc-producer/build.gradle
+++ b/crossdc-producer/build.gradle
@@ -36,14 +36,13 @@ sourceSets {
 dependencies {
     implementation project(':crossdc-consumer')
     implementation project(path: ':crossdc-commons', configuration: 'shadow')
-    testImplementation project(path: ':crossdc-commons')
 
     provided  group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
 
     testImplementation 'org.slf4j:slf4j-api:2.0.5'
     testImplementation 'org.hamcrest:hamcrest:2.2'
     testImplementation 'junit:junit:4.13.2'
-    testImplementation('org.mockito:mockito-inline:5.2.0')
+    testImplementation('org.mockito:mockito-inline:5.1.1')
     testImplementation group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
     testImplementation group: 'org.apache.solr', name: 'solr-test-framework', version: '8.11.2'
 
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 1c9cf62..57357e7 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -283,13 +283,13 @@ import static org.mockito.Mockito.spy;
 
     @Test
   public void testParallelUpdatesToCluster2() throws Exception {
-    ExecutorService executorService = Executors.newFixedThreadPool(50);
+    ExecutorService executorService = Executors.newFixedThreadPool(12);
     List<Future<Boolean>> futures = new ArrayList<>();
 
     CloudSolrClient client1 = solrCluster1.getSolrClient();
 
-    // Prepare and send 500 updates in parallel
+    // Prepare and send 5000 updates in parallel
-    for (int i = 0; i < 500; i++) {
+    for (int i = 0; i < 5000; i++) {
       final int docId = i;
       Future<Boolean> future = executorService.submit(() -> {
         try {
@@ -297,7 +297,6 @@ import static org.mockito.Mockito.spy;
           doc.addField("id", String.valueOf(docId));
           doc.addField("text", "parallel test");
           client1.add(doc);
-          client1.commit(COLLECTION);
           return true;
         } catch (Exception e) {
           log.error("Exception while adding doc", e);
@@ -309,7 +308,7 @@ import static org.mockito.Mockito.spy;
 
     // Wait for all updates to complete
     executorService.shutdown();
-    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+    if (!executorService.awaitTermination(600, TimeUnit.SECONDS)) {
       executorService.shutdownNow();
     }
 
@@ -318,8 +317,10 @@ import static org.mockito.Mockito.spy;
       assertTrue(future.get());
     }
 
+    client1.commit(COLLECTION);
+
     // Check if these documents are correctly reflected in the second cluster
-    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 500);
+    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 5000);
   }
 
   private void assertCluster2EventuallyHasDocs(String collection, String query, int expectedNumDocs) throws Exception {
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
index bd1aaa5..941fcee 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
@@ -18,6 +18,7 @@ import org.apache.solr.common.cloud.CollectionProperties;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.consumer.Consumer;
+import org.apache.solr.crossdc.consumer.Util;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -26,9 +27,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.List;
+import java.util.ArrayList;
 
 import static org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
 import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS;
@@ -125,6 +135,8 @@ import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE
   public void afterSolrAndKafkaIntegrationTest() throws Exception {
     ObjectReleaseTracker.clear();
 
+    Util.printKafkaInfo(kafkaCluster.bootstrapServers(), "SolrCrossDCConsumer");
+
     if (solrCluster1 != null) {
       solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
       solrCluster1.shutdown();
@@ -223,18 +235,107 @@ import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE
       solrCluster2.getSolrClient().request(delete);
     }
   }
+@Test
+public void testParallelUpdatesToMultiCollections() throws Exception {
+  CollectionAdminRequest.Create create =
+          CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1);
+
+  try {
+    solrCluster1.getSolrClient().request(create);
+    solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+    solrCluster2.getSolrClient().request(create);
+    solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+    // Update the collection property "enabled" to true
+    CollectionProperties cp = new CollectionProperties(solrCluster1.getZkClient());
+    cp.setCollectionProperty(ALT_COLLECTION, "crossdc.enabled", "true");
+    cp.setCollectionProperty(ALT_COLLECTION, "crossdc.topicName", TOPIC);
+    // Reloading the collection
+    CollectionAdminRequest.Reload reloadRequest = CollectionAdminRequest.reloadCollection(ALT_COLLECTION);
+    reloadRequest.process(solrCluster1.getSolrClient());
+
+    ExecutorService executorService = Executors.newFixedThreadPool(24);
+    List<Future<Boolean>> futures = new ArrayList<>();
+
+    CloudSolrClient client1 = solrCluster1.getSolrClient();
+
+    // Prepare and send N updates to COLLECTION and N updates to ALT_COLLECTION in parallel
+    int updates = 25000;
+    for (int i = 0; i < updates; i++) {
+      final int docIdForCollection = i;
+      final int docIdForAltCollection = i + updates;
+
+      Future<Boolean> futureForCollection = executorService.submit(() -> {
+        try {
+          SolrInputDocument doc = new SolrInputDocument();
+          doc.addField("id", String.valueOf(docIdForCollection));
+          doc.addField("text", "parallel test for COLLECTION");
+          client1.add(COLLECTION, doc);
+          return true;
+        } catch (Exception e) {
+          e.printStackTrace();
+          log.error("Exception while adding doc to COLLECTION", e);
+          return false;
+        }
+      });
+
+      Future<Boolean> futureForAltCollection = executorService.submit(() -> {
+        try {
+          SolrInputDocument doc = new SolrInputDocument();
+          doc.addField("id", String.valueOf(docIdForAltCollection));
+          doc.addField("text", "parallel test for ALT_COLLECTION");
+          client1.add(ALT_COLLECTION, doc);
+          return true;
+        } catch (Exception e) {
+          e.printStackTrace();
+          log.error("Exception while adding doc to ALT_COLLECTION", e);
+          return false;
+        }
+      });
+
+      futures.add(futureForCollection);
+      futures.add(futureForAltCollection);
+    }
+
+    // Wait for all updates to complete
+    executorService.shutdown();
+    if (!executorService.awaitTermination(1600, TimeUnit.SECONDS)) {
+      executorService.shutdownNow();
+    }
+
+    // Check if all updates were successful
+    for (Future<Boolean> future : futures) {
+      assertTrue(future.get());
+    }
+
+    client1.commit(COLLECTION);
+    client1.commit(ALT_COLLECTION);
+
+    // Check if these documents are correctly reflected in the second cluster
+    assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", updates);
+    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", updates);
+
+  } finally {
+    CollectionAdminRequest.Delete delete =
+            CollectionAdminRequest.deleteCollection(ALT_COLLECTION);
+    solrCluster1.getSolrClient().request(delete);
+    solrCluster2.getSolrClient().request(delete);
+  }
+}
 
 
-  private void assertClusterEventuallyHasDocs(SolrClient client, String collection, String query, int expectedNumDocs) throws Exception {
+  private void assertClusterEventuallyHasDocs(SolrClient client, String collection, String query,
+      int expectedNumDocs) throws Exception {
     QueryResponse results = null;
     boolean foundUpdates = false;
-    for (int i = 0; i < 100; i++) {
+    for (int i = 0; i < 1500; i++) {
       client.commit(collection);
       results = client.query(collection, new SolrQuery(query));
       if (results.getResults().getNumFound() == expectedNumDocs) {
         foundUpdates = true;
       } else {
-        Thread.sleep(200);
+        Thread.sleep(500);
       }
     }