You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2023/08/31 16:56:23 UTC
[solr-sandbox] branch main updated: Use a second executor for offset updates.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new 55aa6fa Use a second executor for offset updates.
55aa6fa is described below
commit 55aa6fa8c90e0c550c2485dba787245b6c94adba
Author: markrmiller <ma...@apache.org>
AuthorDate: Thu Aug 31 11:24:43 2023 -0500
Use a second executor for offset updates.
---
.../crossdc/consumer/KafkaCrossDcConsumer.java | 14 ++-
.../solr/crossdc/SolrAndKafkaIntegrationTest.java | 45 +++++++++
.../SolrKafkaTestsIgnoredThreadsFilter.java | 103 +++++++++++----------
3 files changed, 110 insertions(+), 52 deletions(-)
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 54596a0..30f9a67 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -48,9 +48,13 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private final CloudSolrClient solrClient;
- private final ThreadPoolExecutor executor;
-
+ private final ThreadPoolExecutor executor;
+ private final ExecutorService offsetCheckExecutor = Executors.newCachedThreadPool(r -> {
+ Thread t = new Thread(r);
+ t.setName("offset-check-thread");
+ return t;
+ });
private PartitionManager partitionManager;
private BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
@@ -311,7 +315,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
log.error("Mirroring exception occurred while resubmitting to Kafka. We are going to stop the consumer thread now.", e);
throw new RuntimeException(e);
} finally {
- executor.submit(() -> {
+ offsetCheckExecutor.submit(() -> {
try {
partitionManager.checkForOffsetUpdates(workUnit.partition);
} catch (Throwable e) {
@@ -372,6 +376,10 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
}
+ if (!offsetCheckExecutor.isShutdown()) {
+ offsetCheckExecutor.shutdown();
+ offsetCheckExecutor.awaitTermination(30, TimeUnit.SECONDS);
+ }
solrClient.close();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 5c19268..1c9cf62 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -40,6 +40,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import static org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS;
@@ -277,6 +281,47 @@ import static org.mockito.Mockito.spy;
}
}
+ @Test
+ public void testParallelUpdatesToCluster2() throws Exception {
+ ExecutorService executorService = Executors.newFixedThreadPool(50);
+ List<Future<Boolean>> futures = new ArrayList<>();
+
+ CloudSolrClient client1 = solrCluster1.getSolrClient();
+
+ // Prepare and send 500 updates in parallel
+ for (int i = 0; i < 500; i++) {
+ final int docId = i;
+ Future<Boolean> future = executorService.submit(() -> {
+ try {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", String.valueOf(docId));
+ doc.addField("text", "parallel test");
+ client1.add(doc);
+ client1.commit(COLLECTION);
+ return true;
+ } catch (Exception e) {
+ log.error("Exception while adding doc", e);
+ return false;
+ }
+ });
+ futures.add(future);
+ }
+
+ // Wait for all updates to complete
+ executorService.shutdown();
+ if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+ executorService.shutdownNow();
+ }
+
+ // Check if all updates were successful
+ for (Future<Boolean> future : futures) {
+ assertTrue(future.get());
+ }
+
+ // Check if these documents are correctly reflected in the second cluster
+ assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 500);
+ }
+
private void assertCluster2EventuallyHasDocs(String collection, String query, int expectedNumDocs) throws Exception {
assertClusterEventuallyHasDocs(solrCluster2.getSolrClient(), collection, query, expectedNumDocs);
}
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
index 747b06f..9e8b2d7 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
@@ -1,49 +1,54 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.crossdc;
-
-import org.apache.lucene.search.TimeLimitingCollector.TimerThread;
-
-import com.carrotsearch.randomizedtesting.ThreadFilter;
-
-
-/**
- * This ignores those threads in Solr for which there is no way to
- * clean up after a suite.
- */
-public class SolrKafkaTestsIgnoredThreadsFilter implements ThreadFilter {
- @Override
- public boolean reject(Thread t) {
-
- String threadName = t.getName();
-
- if (threadName.startsWith("metrics-meter-tick-thread")) {
- return true;
- }
-
- if (threadName.startsWith("pool-")) {
- return true;
- }
-
- if (threadName.startsWith("kafka-")) { // TODO
- return true;
- }
-
-
- return false;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc;
+
+import org.apache.lucene.search.TimeLimitingCollector.TimerThread;
+
+import com.carrotsearch.randomizedtesting.ThreadFilter;
+
+
+/**
+ * This ignores those threads in Solr for which there is no way to
+ * clean up after a suite.
+ */
+public class SolrKafkaTestsIgnoredThreadsFilter implements ThreadFilter {
+ @Override
+ public boolean reject(Thread t) {
+
+ String threadName = t.getName();
+
+ if (threadName.startsWith("metrics-meter-tick-thread")) {
+ return true;
+ }
+
+ if (threadName.startsWith("pool-")) {
+ return true;
+ }
+
+ if (threadName.startsWith("kafka-")) { // TODO
+ return true;
+ }
+
+ if (threadName.startsWith("KafkaCrossDcConsumerWorker")) {
+ return true;
+ }
+
+
+
+ return false;
+ }
+}