Posted to commits@solr.apache.org by ma...@apache.org on 2023/09/07 20:19:03 UTC
[solr-sandbox] branch main updated: Fix race where update batches on consumer could cause polling loop to exit due to hitting Kafka's aggressive multi-threaded prevention.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new 92cabca Fix race where update batches on consumer could cause polling loop to exit due to hitting Kafka's aggressive multi-threaded prevention.
92cabca is described below
commit 92cabca3d49bee155a392dd23415579d5369392e
Author: markrmiller <ma...@apache.org>
AuthorDate: Thu Sep 7 15:18:29 2023 -0500
Fix race where update batches on consumer could cause polling loop to exit due to hitting Kafka's aggressive multi-threaded prevention.
---
.../crossdc/consumer/KafkaCrossDcConsumer.java | 10 +-
.../org/apache/solr/crossdc/consumer/Util.java | 88 ++++++++++++++++
crossdc-producer/build.gradle | 3 +-
.../solr/crossdc/SolrAndKafkaIntegrationTest.java | 11 +-
...SolrAndKafkaMultiCollectionIntegrationTest.java | 113 +++++++++++++++++++--
5 files changed, 204 insertions(+), 21 deletions(-)
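For context, not part of the commit: Kafka's KafkaConsumer enforces single-threaded access. If a second thread calls into the consumer while the poll loop is inside poll(), the consumer throws ConcurrentModificationException ("KafkaConsumer is not safe for multi-threaded access"); that guard is the "aggressive multi-threaded prevention" the commit message refers to, and it is what could terminate the polling loop. A minimal sketch of the failure mode follows; the class name, topic, group id, and broker address are illustrative, not from the repo.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class SingleThreadedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "sketch-group");            // illustrative group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("sketch-topic")); // illustrative topic

        // The poll loop must be the only thread that ever calls the consumer.
        Thread pollThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> System.out.println(r.value()));
            }
        });
        pollThread.start();

        // Any other thread calling into the consumer while poll() is running
        // trips the guard: ConcurrentModificationException
        // ("KafkaConsumer is not safe for multi-threaded access").
        new Thread(() -> consumer.commitSync()).start(); // unsafe, shown only to illustrate the race
    }
}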
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 30f9a67..1327a42 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -314,14 +314,6 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
// We don't really know what to do here
log.error("Mirroring exception occurred while resubmitting to Kafka. We are going to stop the consumer thread now.", e);
throw new RuntimeException(e);
- } finally {
- offsetCheckExecutor.submit(() -> {
- try {
- partitionManager.checkForOffsetUpdates(workUnit.partition);
- } catch (Throwable e) {
- // already logging in checkForOffsetUpdates
- }
- });
}
});
@@ -386,6 +378,8 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
log.warn("Interrupted while waiting for executor to shutdown");
} catch (Exception e) {
log.warn("Exception closing Solr client on shutdown", e);
+ } finally {
+ Util.logMetrics(metrics);
}
}
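The hunk above removes the finally block that handed partitionManager.checkForOffsetUpdates(...) to offsetCheckExecutor after each batch; run on a separate thread, that check could overlap the consumer's poll loop and trip the single-thread guard described earlier. One common way to keep every consumer interaction on the polling thread is to have worker threads merely enqueue the partitions that need a check and let the poll thread drain the queue between poll() calls. The sketch below shows that pattern; it is not the project's implementation, and all names are illustrative.

import org.apache.kafka.common.TopicPartition;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

class OffsetCheckQueue {
    // Partitions whose update batches have finished and still need an offset check.
    private final Queue<TopicPartition> pending = new ConcurrentLinkedQueue<>();

    // Called from worker threads: thread-safe and never touches the KafkaConsumer.
    void requestOffsetCheck(TopicPartition partition) {
        pending.offer(partition);
    }

    // Called only from the poll-loop thread, between poll() calls, so every
    // consumer interaction stays on the single thread Kafka requires.
    void drainAndCheck(Consumer<TopicPartition> checker) {
        TopicPartition tp;
        while ((tp = pending.poll()) != null) {
            checker.accept(tp);
        }
    }
}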
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Util.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Util.java
new file mode 100644
index 0000000..37830eb
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Util.java
@@ -0,0 +1,88 @@
+package org.apache.solr.crossdc.consumer;
+
+import com.codahale.metrics.*;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+public class Util {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static void logMetrics(MetricRegistry metricRegistry) {
+ log.info("Metrics Registry:");
+ for (Map.Entry<String, Gauge> entry : metricRegistry.getGauges().entrySet()) {
+ log.info("Gauge {}: {}", entry.getKey(), entry.getValue().getValue());
+ }
+ for (Map.Entry<String, Counter> entry : metricRegistry.getCounters().entrySet()) {
+ log.info("Counter {}: {}", entry.getKey(), entry.getValue().getCount());
+ }
+ for (Map.Entry<String, Histogram> entry : metricRegistry.getHistograms().entrySet()) {
+ log.info("Histogram {}: {}", entry.getKey(), entry.getValue().getSnapshot().toString());
+ }
+ for (Map.Entry<String, Meter> entry : metricRegistry.getMeters().entrySet()) {
+ log.info("Meter {}: {}", entry.getKey(), entry.getValue().getCount());
+ }
+ for (Map.Entry<String, Timer> entry : metricRegistry.getTimers().entrySet()) {
+ log.info("Timer {}: {}", entry.getKey(), entry.getValue().getSnapshot().toString());
+ }
+ }
+
+ public static void printKafkaInfo(String host, String groupId) {
+ // Initialize the Kafka Admin Client
+ System.out.println("Kafka Info: " + host);
+ Properties adminProps = new Properties();
+ adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, host);
+ try (AdminClient adminClient = AdminClient.create(adminProps)) {
+ // Get list of topics
+ Set<String> topicNames = adminClient.listTopics(new ListTopicsOptions().listInternal(false)).names().get();
+ System.out.println("Live Topics: " + topicNames);
+
+ // Initialize the Kafka Consumer Client to fetch offsets
+ Properties consumerProps = new Properties();
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MirroredSolrRequestSerializer.class.getName());
+
+ try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
+ for (String topic : topicNames) {
+ Set<TopicPartition> topicPartitions = consumer.assignment();
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ System.out.println("Topic Partitions: " + topicPartitions.size());
+ for (TopicPartition topicPartition : topicPartitions) {
+ if (topicPartition.topic().equals(topic)) {
+ long endOffset = consumer.position(topicPartition);
+ long committedOffset = consumer.committed(topicPartition).offset();
+ long updatesInQueue = endOffset - committedOffset;
+
+ offsets.put(topicPartition, new OffsetAndMetadata(endOffset));
+ System.out.println("Topic: " + topic);
+ System.out.println(" Partition: " + topicPartition.partition());
+ System.out.println(" End Offset: " + endOffset);
+ System.out.println(" Committed Offset: " + committedOffset);
+ System.out.println(" Updates in Queue: " + updatesInQueue);
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
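Two caveats about the printKafkaInfo helper added above: consumer.assignment() returns an empty set for a consumer that has neither subscribed nor been assigned partitions, so the per-partition loop may never run, and consumer.committed(topicPartition) returns null when the group has no committed offset for that partition, so calling .offset() on the result can throw a NullPointerException. A hedged alternative sketch, not part of the commit, computes the same per-partition lag from the admin client's committed offsets plus the consumer's end offsets, neither of which requires an active assignment; the class name is illustrative.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

class LagSketch {
    // Committed offsets come from the admin client, end offsets from the
    // consumer; neither call needs the consumer to have an assignment.
    static void printLag(AdminClient adminClient, KafkaConsumer<?, ?> consumer, String groupId)
            throws Exception {
        Map<TopicPartition, OffsetAndMetadata> committed =
                adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(committed.keySet());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> e : committed.entrySet()) {
            long lag = endOffsets.get(e.getKey()) - e.getValue().offset();
            System.out.println(e.getKey() + " committed=" + e.getValue().offset() + " lag=" + lag);
        }
    }
}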
diff --git a/crossdc-producer/build.gradle b/crossdc-producer/build.gradle
index f4aec1d..3335a01 100644
--- a/crossdc-producer/build.gradle
+++ b/crossdc-producer/build.gradle
@@ -36,14 +36,13 @@ sourceSets {
dependencies {
implementation project(':crossdc-consumer')
implementation project(path: ':crossdc-commons', configuration: 'shadow')
- testImplementation project(path: ':crossdc-commons')
provided group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
testImplementation 'org.slf4j:slf4j-api:2.0.5'
testImplementation 'org.hamcrest:hamcrest:2.2'
testImplementation 'junit:junit:4.13.2'
- testImplementation('org.mockito:mockito-inline:5.2.0')
+ testImplementation('org.mockito:mockito-inline:5.1.1')
testImplementation group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
testImplementation group: 'org.apache.solr', name: 'solr-test-framework', version: '8.11.2'
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 1c9cf62..57357e7 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -283,13 +283,13 @@ import static org.mockito.Mockito.spy;
@Test
public void testParallelUpdatesToCluster2() throws Exception {
- ExecutorService executorService = Executors.newFixedThreadPool(50);
+ ExecutorService executorService = Executors.newFixedThreadPool(12);
List<Future<Boolean>> futures = new ArrayList<>();
CloudSolrClient client1 = solrCluster1.getSolrClient();
// Prepare and send 500 updates in parallel
- for (int i = 0; i < 500; i++) {
+ for (int i = 0; i < 5000; i++) {
final int docId = i;
Future<Boolean> future = executorService.submit(() -> {
try {
@@ -297,7 +297,6 @@ import static org.mockito.Mockito.spy;
doc.addField("id", String.valueOf(docId));
doc.addField("text", "parallel test");
client1.add(doc);
- client1.commit(COLLECTION);
return true;
} catch (Exception e) {
log.error("Exception while adding doc", e);
@@ -309,7 +308,7 @@ import static org.mockito.Mockito.spy;
// Wait for all updates to complete
executorService.shutdown();
- if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+ if (!executorService.awaitTermination(600, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
@@ -318,8 +317,10 @@ import static org.mockito.Mockito.spy;
assertTrue(future.get());
}
+ client1.commit(COLLECTION);
+
// Check if these documents are correctly reflected in the second cluster
- assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 500);
+ assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 5000);
}
private void assertCluster2EventuallyHasDocs(String collection, String query, int expectedNumDocs) throws Exception {
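The test change above drops the per-document client1.commit(COLLECTION) from the worker tasks and issues one commit after all futures complete. That is the usual SolrJ batching pattern: adds are buffered until a commit makes them searchable, so one searcher reopen covers the whole batch instead of one per document. A condensed sketch of the pattern, with illustrative names, assuming the standard SolrJ API:

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.common.SolrInputDocument;

class BatchedCommitSketch {
    // Add many documents with no per-document commit, then commit once so a
    // single searcher reopen makes the whole batch visible.
    static void indexBatch(SolrClient client, String collection, int count) throws Exception {
        for (int i = 0; i < count; i++) {
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("id", String.valueOf(i));
            doc.addField("text", "parallel test");
            client.add(collection, doc); // buffered until the commit below
        }
        client.commit(collection);
    }
}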
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
index bd1aaa5..941fcee 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
@@ -18,6 +18,7 @@ import org.apache.solr.common.cloud.CollectionProperties;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.consumer.Consumer;
+import org.apache.solr.crossdc.consumer.Util;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -26,9 +27,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.List;
+import java.util.ArrayList;
import static org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS;
@@ -125,6 +135,8 @@ import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE
public void afterSolrAndKafkaIntegrationTest() throws Exception {
ObjectReleaseTracker.clear();
+ Util.printKafkaInfo(kafkaCluster.bootstrapServers(), "SolrCrossDCConsumer");
+
if (solrCluster1 != null) {
solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
solrCluster1.shutdown();
@@ -223,18 +235,107 @@ import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE
solrCluster2.getSolrClient().request(delete);
}
}
+@Test
+public void testParallelUpdatesToMultiCollections() throws Exception {
+ CollectionAdminRequest.Create create =
+ CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1);
+
+ try {
+ solrCluster1.getSolrClient().request(create);
+ solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+ solrCluster2.getSolrClient().request(create);
+ solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+ // Update the collection property "enabled" to true
+ CollectionProperties cp = new CollectionProperties(solrCluster1.getZkClient());
+ cp.setCollectionProperty(ALT_COLLECTION, "crossdc.enabled", "true");
+ cp.setCollectionProperty(ALT_COLLECTION, "crossdc.topicName", TOPIC);
+ // Reloading the collection
+ CollectionAdminRequest.Reload reloadRequest = CollectionAdminRequest.reloadCollection(ALT_COLLECTION);
+ reloadRequest.process(solrCluster1.getSolrClient());
+
+ ExecutorService executorService = Executors.newFixedThreadPool(24);
+ List<Future<Boolean>> futures = new ArrayList<>();
+
+ CloudSolrClient client1 = solrCluster1.getSolrClient();
+
+ // Prepare and send N updates to COLLECTION and N updates to ALT_COLLECTION in parallel
+ int updates = 25000;
+ for (int i = 0; i < updates; i++) {
+ final int docIdForCollection = i;
+ final int docIdForAltCollection = i + updates;
+
+ Future<Boolean> futureForCollection = executorService.submit(() -> {
+ try {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", String.valueOf(docIdForCollection));
+ doc.addField("text", "parallel test for COLLECTION");
+ client1.add(COLLECTION, doc);
+ return true;
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("Exception while adding doc to COLLECTION", e);
+ return false;
+ }
+ });
+
+ Future<Boolean> futureForAltCollection = executorService.submit(() -> {
+ try {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", String.valueOf(docIdForAltCollection));
+ doc.addField("text", "parallel test for ALT_COLLECTION");
+ client1.add(ALT_COLLECTION, doc);
+ return true;
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("Exception while adding doc to ALT_COLLECTION", e);
+ return false;
+ }
+ });
+
+ futures.add(futureForCollection);
+ futures.add(futureForAltCollection);
+ }
+
+ // Wait for all updates to complete
+ executorService.shutdown();
+ if (!executorService.awaitTermination(1600, TimeUnit.SECONDS)) {
+ executorService.shutdownNow();
+ }
+
+ // Check if all updates were successful
+ for (Future<Boolean> future : futures) {
+ assertTrue(future.get());
+ }
+
+ client1.commit(COLLECTION);
+ client1.commit(ALT_COLLECTION);
+
+ // Check if these documents are correctly reflected in the second cluster
+ assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", updates);
+ assertCluster2EventuallyHasDocs(COLLECTION, "*:*", updates);
+
+ } finally {
+ CollectionAdminRequest.Delete delete =
+ CollectionAdminRequest.deleteCollection(ALT_COLLECTION);
+ solrCluster1.getSolrClient().request(delete);
+ solrCluster2.getSolrClient().request(delete);
+ }
+}
- private void assertClusterEventuallyHasDocs(SolrClient client, String collection, String query, int expectedNumDocs) throws Exception {
+ private void assertClusterEventuallyHasDocs (SolrClient client, String collection, String query,int expectedNumDocs) throws
+ Exception {
QueryResponse results = null;
boolean foundUpdates = false;
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < 1500; i++) {
client.commit(collection);
results = client.query(collection, new SolrQuery(query));
if (results.getResults().getNumFound() == expectedNumDocs) {
foundUpdates = true;
} else {
- Thread.sleep(200);
+ Thread.sleep(500);
}
}
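One note on the polling assertion in the final hunk: as shown, the loop has no early exit once foundUpdates is set, so it keeps committing and querying for all 1500 iterations even after the expected count is reached. A sketch of the same eventual-consistency probe with an early return, under the diff's 1500-round, 500 ms timing; the class wrapper is illustrative.

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.response.QueryResponse;

class EventualAssertSketch {
    // Same probe as the test, but returning as soon as the expected count
    // shows up instead of running all 1500 rounds.
    static boolean eventuallyHasDocs(SolrClient client, String collection,
                                     String query, int expectedNumDocs) throws Exception {
        for (int i = 0; i < 1500; i++) {
            client.commit(collection);
            QueryResponse results = client.query(collection, new SolrQuery(query));
            if (results.getResults().getNumFound() == expectedNumDocs) {
                return true; // early exit once the docs have replicated
            }
            Thread.sleep(500);
        }
        return false;
    }
}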