You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2023/05/10 19:04:54 UTC

[solr-sandbox] branch main updated: Some refactoring, a bunch of simple mock tests, test fixes & improvements, dep updates and fixes, rework offset management.. (#56)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new 5238ca6  Some refactoring, a bunch of simple mock tests, test fixes & improvements, dep updates and fixes, rework offset management.. (#56)
5238ca6 is described below

commit 5238ca6a0d15dd048608089be5eda66d8d9bc4a0
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Wed May 10 14:04:49 2023 -0500

    Some refactoring, a bunch of simple mock tests, test fixes & improvements, dep updates and fixes, rework offset management.. (#56)
    
    * Some refactoring, a bunch of simple mock tests, test fixes & improvements, dep updates and fixes, rework offset management..
    
    * Version bump.
---
 build.gradle                                       |   2 +-
 crossdc-commons/build.gradle                       |   5 +-
 crossdc-commons/gradle.properties                  |   2 +-
 crossdc-consumer/build.gradle                      |  32 +-
 crossdc-consumer/gradle.properties                 |   2 +-
 crossdc-consumer/machinet.conf                     |   3 +
 .../solr/crossdc/consumer/BlockingQueue.java       |  25 ++
 .../crossdc/consumer/KafkaCrossDcConsumer.java     | 180 +++------
 .../solr/crossdc/consumer/PartitionManager.java    | 129 ++++++
 .../messageprocessor/SolrMessageProcessor.java     |   5 +-
 .../solr/crossdc/NoOpResubmitBackoffPolicy.java    |  11 +
 .../solr/crossdc/SimpleSolrIntegrationTest.java    |   2 +
 .../apache/solr/crossdc/TestMessageProcessor.java  |   7 -
 .../crossdc/consumer/KafkaCrossDcConsumerTest.java | 445 +++++++++++++++------
 .../crossdc/consumer/PartitionManagerTest.java     | 168 ++++++++
 .../messageprocessor/SolrMessageProcessorTest.java | 109 +++++
 crossdc-producer/build.gradle                      |  10 +-
 crossdc-producer/gradle.properties                 |   2 +-
 .../apache/solr/crossdc/DeleteByQueryToIdTest.java |  12 +-
 .../solr/crossdc/RetryQueueIntegrationTest.java    |   7 +
 .../solr/crossdc/SolrAndKafkaIntegrationTest.java  |   8 +-
 ...SolrAndKafkaMultiCollectionIntegrationTest.java |   1 +
 .../solr/crossdc/SolrAndKafkaReindexTest.java      |   5 +
 .../solr/crossdc/ZkConfigIntegrationTest.java      |   5 +
 .../processor/MirroringUpdateProcessorTest.java    | 358 ++++++++++-------
 gradle/wrapper/gradle-wrapper.jar                  | Bin 59203 -> 61574 bytes
 gradle/wrapper/gradle-wrapper.properties           |   3 +-
 gradlew                                            | 294 ++++++++------
 gradlew.bat                                        |  49 +--
 29 files changed, 1292 insertions(+), 589 deletions(-)

diff --git a/build.gradle b/build.gradle
index 25af428..9a38422 100644
--- a/build.gradle
+++ b/build.gradle
@@ -28,5 +28,5 @@ description 'Root for Solr plugins sandbox'
 
 subprojects {
     group "org.apache.solr.crossdc"
-    //group "org.apache.solr.encryption"
+    group "org.apache.solr.encryption"
 }
diff --git a/crossdc-commons/build.gradle b/crossdc-commons/build.gradle
index a687058..57cb915 100644
--- a/crossdc-commons/build.gradle
+++ b/crossdc-commons/build.gradle
@@ -34,7 +34,10 @@ sourceSets {
 }
 
 dependencies {
-    provided 'org.apache.solr:solr-solrj:8.11.2'
+    provided 'org.apache.solr:solr-solrj:8.11.2', {
+        exclude group: "org.apache.logging.log4j", module: "*"
+        exclude group: "org.slf4j", module: "*"
+    }
     implementation 'org.apache.kafka:kafka-clients:2.8.1'
     implementation 'com.google.guava:guava:14.0'
 }
diff --git a/crossdc-commons/gradle.properties b/crossdc-commons/gradle.properties
index 66975c6..21aee2d 100644
--- a/crossdc-commons/gradle.properties
+++ b/crossdc-commons/gradle.properties
@@ -1,2 +1,2 @@
 group=org.apache.solr
-version=0.9-SNAPSHOT
\ No newline at end of file
+version=1.0-SNAPSHOT
\ No newline at end of file
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index 466bbfa..cca42fc 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -32,24 +32,34 @@ dependencies {
     implementation group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.2'
     implementation project(path: ':crossdc-commons', configuration: 'shadow')
 
-    implementation 'io.dropwizard.metrics:metrics-core:4.2.9'
-    implementation 'org.slf4j:slf4j-api:1.7.36'
+    implementation 'io.dropwizard.metrics:metrics-core:4.2.17'
+    implementation 'org.slf4j:slf4j-api:2.0.5'
     implementation 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
     implementation 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
+    implementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.20.0' // log4j impl can use StackLocatorUtil which is in the api jar
     implementation 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
-    implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.2'
-    runtimeOnly ('com.google.protobuf:protobuf-java-util:3.19.2')
-    runtimeOnly ('commons-codec:commons-codec:1.13')
+    implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.20.0'
+    testImplementation project(path: ':crossdc-commons')
+    testImplementation project(path: ':crossdc-commons')
+    runtimeOnly ('com.google.protobuf:protobuf-java-util:3.22.2')
+    runtimeOnly ('commons-codec:commons-codec:1.15')
     testImplementation 'org.hamcrest:hamcrest:2.2'
     testImplementation 'junit:junit:4.13.2'
-    testImplementation('org.mockito:mockito-core:4.3.1', {
-        exclude group: "net.bytebuddy", module: "byte-buddy-agent"
-    })
+    testImplementation('org.mockito:mockito-inline:5.2.0')
 
     testImplementation  project(':crossdc-producer')
 
-    testImplementation group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
-    testImplementation group: 'org.apache.solr', name: 'solr-test-framework', version: '8.11.2'
+    testImplementation group: 'org.apache.solr', name: 'solr-core', version: '8.11.2', {
+        exclude group: "org.apache.logging.log4j", module: "*"
+        exclude group: "org.slf4j", module: "*"
+        exclude group: "org.eclipse.jetty", module: "jetty-http"
+        exclude group: "org.eclipse.jetty", module: "jetty-server"
+        exclude group: "org.eclipse.jetty", module: "jetty-servlet"
+    }
+    testImplementation group: 'org.apache.solr', name: 'solr-test-framework', version: '8.11.2', {
+        exclude group: "org.apache.logging.log4j", module: "*"
+        exclude group: "org.slf4j", module: "*"
+    }
     testImplementation 'org.apache.kafka:kafka_2.13:2.8.1'
     testImplementation 'org.apache.kafka:kafka-streams:2.8.1'
     testImplementation 'org.apache.kafka:kafka_2.13:2.8.1:test'
@@ -58,6 +68,8 @@ dependencies {
 
 test {
     jvmArgs '-Djava.security.egd=file:/dev/./urandom'
+    minHeapSize = "128m"
+    maxHeapSize = "512m"
 }
 
 tasks.withType(Tar){
diff --git a/crossdc-consumer/gradle.properties b/crossdc-consumer/gradle.properties
index 66975c6..21aee2d 100644
--- a/crossdc-consumer/gradle.properties
+++ b/crossdc-consumer/gradle.properties
@@ -1,2 +1,2 @@
 group=org.apache.solr
-version=0.9-SNAPSHOT
\ No newline at end of file
+version=1.0-SNAPSHOT
\ No newline at end of file
diff --git a/crossdc-consumer/machinet.conf b/crossdc-consumer/machinet.conf
new file mode 100644
index 0000000..8831cd1
--- /dev/null
+++ b/crossdc-consumer/machinet.conf
@@ -0,0 +1,3 @@
+### Please DO NOT modify the contents of this file. For internal purpose only
+root=7badab89-0174-350c-a9c1-20ef5b1bf5a5
+rootId=ba95d78a-7c94-3571-9853-08775a97a3a0
\ No newline at end of file
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/BlockingQueue.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/BlockingQueue.java
new file mode 100644
index 0000000..bc414c8
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/BlockingQueue.java
@@ -0,0 +1,25 @@
+package org.apache.solr.crossdc.consumer;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+public class BlockingQueue<E> extends ArrayBlockingQueue<E> {
+
+    public BlockingQueue(int capacity) {
+        super(capacity);
+    }
+
+
+    @Override
+    public boolean offer(E r) {
+        //return super.offer(r);
+        try {
+            super.put(r);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return false;
+        }
+        return true;
+    }
+
+
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 9ac506a..e8195ac 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -38,7 +38,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
 
   private final MetricRegistry metrics = SharedMetricRegistries.getOrCreate("metrics");
 
-  private final KafkaConsumer<String,MirroredSolrRequest> consumer;
+  private final KafkaConsumer<String,MirroredSolrRequest> kafkaConsumer;
   private final CountDownLatch startLatch;
   KafkaMirroringSink kafkaMirroringSink;
 
@@ -48,22 +48,14 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
 
   private final CloudSolrClient solrClient;
 
-  private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10) {
-    @Override public boolean offer(Runnable r) {
-      //return super.offer(r);
-      try {
-        super.put(r);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return false;
-      }
-      return true;
-    }
+   private final ThreadPoolExecutor executor;
+
+
+  private PartitionManager partitionManager;
+
+  private BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
 
-  };
-  private final ThreadPoolExecutor executor;
 
-  private final ConcurrentHashMap<TopicPartition,PartitionWork> partitionWorkMap = new ConcurrentHashMap<>();
 
   /**
    * @param conf       The Kafka consumer configuration
@@ -95,22 +87,26 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
 
     kafkaConsumerProps.putAll(conf.getAdditionalProperties());
     int threads = conf.getInt(KafkaCrossDcConf.CONSUMER_PROCESSING_THREADS);
+
     executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue, new ThreadFactory() {
-      @Override public Thread newThread(Runnable r) {
-        Thread t = new Thread(r);
-        t.setName("KafkaCrossDcConsumerWorker");
-        return t;
-      }
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread t = new Thread(r);
+                t.setName("KafkaCrossDcConsumerWorker");
+                return t;
+            }
     });
     executor.prestartAllCoreThreads();
 
     solrClient = createSolrClient(conf);
 
-    messageProcessor = new SolrMessageProcessor(solrClient, resubmitRequest -> 0L);
+    messageProcessor = createSolrMessageProcessor();
 
-    log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProps);
-    consumer = createConsumer(kafkaConsumerProps);
 
+
+    log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProps);
+    kafkaConsumer = createKafkaConsumer(kafkaConsumerProps);
+    partitionManager = new PartitionManager(kafkaConsumer);
     // Create producer for resubmitting failed requests
     log.info("Creating Kafka resubmit producer");
     this.kafkaMirroringSink = createKafkaMirroringSink(conf);
@@ -118,7 +114,11 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
 
   }
 
-  public KafkaConsumer<String,MirroredSolrRequest> createConsumer(Properties properties) {
+  protected SolrMessageProcessor createSolrMessageProcessor() {
+    return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L);
+  }
+
+  public KafkaConsumer<String,MirroredSolrRequest> createKafkaConsumer(Properties properties) {
     return new KafkaConsumer<>(properties, new StringDeserializer(), new MirroredSolrRequestSerializer());
   }
 
@@ -133,7 +133,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
 
     try {
 
-      consumer.subscribe(Arrays.asList((topicNames)));
+      kafkaConsumer.subscribe(Arrays.asList((topicNames)));
 
       log.info("Consumer started");
       startLatch.countDown();
@@ -144,7 +144,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
 
       log.info("Closed kafka consumer. Exiting now.");
       try {
-        consumer.close();
+        kafkaConsumer.close();
       } catch (Exception e) {
         log.warn("Failed to close kafka consumer", e);
       }
@@ -167,11 +167,14 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
   boolean pollAndProcessRequests() {
     log.trace("Entered pollAndProcessRequests loop");
     try {
-      for (TopicPartition partition : partitionWorkMap.keySet()) {
-        checkForOffsetUpdates(partition);
+      try {
+        partitionManager.checkOffsetUpdates();
+      } catch (Throwable e) {
+        log.error("Error while checking offset updates, shutting down", e);
+        return false;
       }
 
-      ConsumerRecords<String,MirroredSolrRequest> records = consumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
+      ConsumerRecords<String,MirroredSolrRequest> records = kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
 
       if (log.isTraceEnabled()) {
         log.trace("poll return {} records", records.count());
@@ -184,14 +187,9 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
       for (TopicPartition partition : records.partitions()) {
         List<ConsumerRecord<String,MirroredSolrRequest>> partitionRecords = records.records(partition);
 
-        PartitionWork partitionWork = partitionWorkMap.compute(partition, (k, v) -> {
-          if (v == null) {
-            return new PartitionWork();
-          }
-          return v;
-        });
-        WorkUnit workUnit = new WorkUnit();
-        workUnit.nextOffset = getOffsetForPartition(partitionRecords);
+        PartitionManager.PartitionWork partitionWork = partitionManager.getPartitionWork(partition);
+        PartitionManager.WorkUnit workUnit = new PartitionManager.WorkUnit(partition);
+        workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
         partitionWork.partitionQueue.add(workUnit);
         try {
           ModifiableSolrParams lastParams = null;
@@ -218,8 +216,8 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
               lastParamsAsNamedList = null;
               sendBatch(solrReqBatch, lastRecord, workUnit);
               solrReqBatch = new UpdateRequest();
-              workUnit = new WorkUnit();
-              workUnit.nextOffset = getOffsetForPartition(partitionRecords);
+              workUnit = new PartitionManager.WorkUnit(partition);
+              workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
               partitionWork.partitionQueue.add(workUnit);
             }
 
@@ -247,8 +245,12 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
           }
 
           sendBatch(solrReqBatch, lastRecord, workUnit);
-
-          checkForOffsetUpdates(partition);
+          try {
+            partitionManager.checkForOffsetUpdates(partition);
+          } catch (Throwable e) {
+            log.error("Error while checking offset updates, shutting down", e);
+            return false;
+          }
 
           // handleItem sets the thread interrupt, let's exit if there has been an interrupt set
           if (Thread.currentThread().isInterrupted()) {
@@ -263,15 +265,18 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
 
           if (e instanceof ClassCastException || e instanceof SerializationException) { // TODO: optional
             log.error("Non retryable error", e);
-            break;
+            return false;
           }
           log.error("Exception occurred in Kafka consumer thread, stopping the Consumer.", e);
-          break;
+          return false;
         }
       }
 
-      for (TopicPartition partition : partitionWorkMap.keySet()) {
-        checkForOffsetUpdates(partition);
+      try {
+        partitionManager.checkOffsetUpdates();
+      } catch (Throwable e) {
+        log.error("Error while checking offset updates, shutting down", e);
+        return false;
       }
 
     } catch (WakeupException e) {
@@ -289,53 +294,32 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
     return true;
   }
 
-  public void sendBatch(UpdateRequest solrReqBatch, ConsumerRecord<String,MirroredSolrRequest> lastRecord, WorkUnit workUnit) {
+  public void sendBatch(UpdateRequest solrReqBatch, ConsumerRecord<String,MirroredSolrRequest> lastRecord, PartitionManager.WorkUnit workUnit) {
     UpdateRequest finalSolrReqBatch = solrReqBatch;
     Future<?> future = executor.submit(() -> {
-      IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(new MirroredSolrRequest(finalSolrReqBatch));
       try {
+        IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(new MirroredSolrRequest(finalSolrReqBatch));
+
         processResult(lastRecord, result);
       } catch (MirroringException e) {
         // We don't really know what to do here
         log.error("Mirroring exception occurred while resubmitting to Kafka. We are going to stop the consumer thread now.", e);
         throw new RuntimeException(e);
+      } finally {
+        executor.submit(() -> {
+          try {
+            partitionManager.checkForOffsetUpdates(workUnit.partition);
+          } catch (Throwable e) {
+            // already logging in checkForOffsetUpdates
+          }
+        });
       }
+
     });
     workUnit.workItems.add(future);
   }
 
-  private void checkForOffsetUpdates(TopicPartition partition) {
-    PartitionWork work;
-    while ((work = partitionWorkMap.get(partition)) != null) {
-      WorkUnit workUnit = work.partitionQueue.peek();
-      if (workUnit != null) {
-        for (Future<?> future : workUnit.workItems) {
-          if (!future.isDone()) {
-            if (log.isTraceEnabled()) {
-              log.trace("Future for update is not done topic={}", partition.topic());
-            }
-            return;
-          }
-          if (log.isTraceEnabled()) {
-            log.trace("Future for update is done topic={}", partition.topic());
-          }
-          try {
-            future.get();
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-          } catch (ExecutionException e) {
-            log.error("Exception resubmitting updates to Kafka, stopping the Consumer thread", e);
-            work.partitionQueue.poll();
-            throw new RuntimeException("Exception resubmitting updates to Kafka, stopping the Consumer thread", e);
-          }
-          work.partitionQueue.poll();
-        }
-
-        updateOffset(partition, workUnit.nextOffset);
 
-      }
-    }
-  }
 
   void processResult(ConsumerRecord<String,MirroredSolrRequest> record, IQueueHandler.Result<MirroredSolrRequest> result) throws MirroringException {
     switch (result.status()) {
@@ -370,44 +354,13 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
     }
   }
 
-  /**
-   * Reset the local offset so that the consumer reads the records from Kafka again.
-   *
-   * @param partition        The TopicPartition to reset the offset for
-   * @param partitionRecords PartitionRecords for the specified partition
-   */
-  private void resetOffsetForPartition(TopicPartition partition, List<ConsumerRecord<String,MirroredSolrRequest>> partitionRecords) {
-    if (log.isTraceEnabled()) {
-      log.trace("Resetting offset to: {}", partitionRecords.get(0).offset());
-    }
-    long resetOffset = partitionRecords.get(0).offset();
-    consumer.seek(partition, resetOffset);
-  }
-
-  /**
-   * Logs and updates the commit point for the partition that has been processed.
-   *
-   * @param partition  The TopicPartition to update the offset for
-   * @param nextOffset The next offset to commit for this partition.
-   */
-  private void updateOffset(TopicPartition partition, long nextOffset) {
-    if (log.isTraceEnabled()) {
-      log.trace("Updated offset for topic={} partition={} to offset={}", partition.topic(), partition.partition(), nextOffset);
-    }
-
-    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(nextOffset)));
-  }
 
-  private static long getOffsetForPartition(List<ConsumerRecord<String,MirroredSolrRequest>> partitionRecords) {
-    long nextOffset = partitionRecords.get(partitionRecords.size() - 1).offset() + 1;
-    return nextOffset;
-  }
 
   /**
    * Shutdown the Kafka consumer by calling wakeup.
    */
   public final void shutdown() {
-    consumer.wakeup();
+    kafkaConsumer.wakeup();
     log.info("Shutdown called on KafkaCrossDcConsumer");
     try {
       if (!executor.isShutdown()) {
@@ -431,12 +384,5 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
     return new KafkaMirroringSink(conf);
   }
 
-  private static class PartitionWork {
-    private final Queue<WorkUnit> partitionQueue = new LinkedList<>();
-  }
 
-  static class WorkUnit {
-    Set<Future<?>> workItems = new HashSet<>();
-    long nextOffset;
-  }
 }
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/PartitionManager.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/PartitionManager.java
new file mode 100644
index 0000000..62ae86e
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/PartitionManager.java
@@ -0,0 +1,129 @@
+package org.apache.solr.crossdc.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.*;
+import java.util.concurrent.*;
+
+
+public class PartitionManager {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    final ConcurrentHashMap<TopicPartition, PartitionWork> partitionWorkMap = new ConcurrentHashMap<>();
+    private final KafkaConsumer<String, MirroredSolrRequest> consumer;
+
+
+    static class PartitionWork {
+        final Queue<WorkUnit> partitionQueue = new LinkedList<>();
+    }
+
+    static class WorkUnit {
+        final TopicPartition partition;
+        Set<Future<?>> workItems = new HashSet<>();
+        long nextOffset;
+
+        public WorkUnit(TopicPartition partition) {
+            this.partition = partition;
+        }
+    }
+
+
+    PartitionManager(KafkaConsumer<String, MirroredSolrRequest> consumer) {
+        this.consumer = consumer;
+    }
+
+    public PartitionWork getPartitionWork(TopicPartition partition) {
+        return partitionWorkMap.compute(partition, (k, v) -> {
+            if (v == null) {
+                return new PartitionWork();
+            }
+            return v;
+        });
+    }
+
+
+    public void checkOffsetUpdates() throws Throwable {
+        for (TopicPartition partition : partitionWorkMap.keySet()) {
+            checkForOffsetUpdates(partition);
+        }
+    }
+
+    void checkForOffsetUpdates(TopicPartition partition) throws Throwable {
+        synchronized (partition) {
+            PartitionWork work;
+            if ((work = partitionWorkMap.get(partition)) != null) {
+                WorkUnit workUnit = work.partitionQueue.peek();
+                if (workUnit != null) {
+                    boolean allFuturesDone = true;
+                    for (Future<?> future : workUnit.workItems) {
+                        if (!future.isDone()) {
+                            if (log.isTraceEnabled()) {
+                                log.trace("Future for update is not done topic={}", partition.topic());
+                            }
+                            allFuturesDone = false;
+                            break;
+                        }
+
+                        try {
+                            future.get();
+                        } catch (InterruptedException e) {
+                            log.error("Error updating offset for partition: {}", partition, e);
+                            throw e;
+                        } catch (ExecutionException e) {
+                            log.error("Error updating offset for partition: {}", partition, e);
+                            throw e.getCause();
+                        }
+
+                        if (log.isTraceEnabled()) {
+                            log.trace("Future for update is done topic={}", partition.topic());
+                        }
+                    }
+
+                    if (allFuturesDone) {
+                        work.partitionQueue.poll();
+                        updateOffset(partition, workUnit.nextOffset);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Reset the local offset so that the consumer reads the records from Kafka again.
+     *
+     * @param partition        The TopicPartition to reset the offset for
+     * @param partitionRecords PartitionRecords for the specified partition
+     */
+    private void resetOffsetForPartition(TopicPartition
+                                                 partition, List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+        if (log.isTraceEnabled()) {
+            log.trace("Resetting offset to: {}", partitionRecords.get(0).offset());
+        }
+        long resetOffset = partitionRecords.get(0).offset();
+        consumer.seek(partition, resetOffset);
+    }
+
+    /**
+     * Logs and updates the commit point for the partition that has been processed.
+     *
+     * @param partition  The TopicPartition to update the offset for
+     * @param nextOffset The next offset to commit for this partition.
+     */
+    private void updateOffset(TopicPartition partition, long nextOffset) {
+        if (log.isTraceEnabled()) {
+            log.trace("Updated offset for topic={} partition={} to offset={}", partition.topic(), partition.partition(), nextOffset);
+        }
+
+        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(nextOffset)));
+    }
+
+    static long getOffsetForPartition(List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+        return partitionRecords.get(partitionRecords.size() - 1).offset() + 1;
+    }
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
index b1a428e..dcea1d0 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -198,7 +198,8 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
     private void logRequest(SolrRequest request) {
         if(request instanceof UpdateRequest) {
             final StringBuilder rmsg = new StringBuilder(64);
-            rmsg.append("Submitting update request");
+            String collection = request.getCollection();
+            rmsg.append("Submitting update request for collection=").append(collection != null ? collection : request.getParams().get("collection"));
             if(((UpdateRequest) request).getDeleteById() != null) {
                 final int numDeleteByIds = ((UpdateRequest) request).getDeleteById().size();
                 metrics.counter("numDeleteByIds").inc(numDeleteByIds);
@@ -282,7 +283,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
      *
      * @param mirroredSolrRequest MirroredSolrRequest object that is being processed.
      */
-    private void preventCircularMirroring(MirroredSolrRequest mirroredSolrRequest) {
+    void preventCircularMirroring(MirroredSolrRequest mirroredSolrRequest) {
         if (mirroredSolrRequest.getSolrRequest() instanceof UpdateRequest) {
             UpdateRequest updateRequest = (UpdateRequest) mirroredSolrRequest.getSolrRequest();
             ModifiableSolrParams params = updateRequest.getParams();
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/NoOpResubmitBackoffPolicy.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/NoOpResubmitBackoffPolicy.java
new file mode 100644
index 0000000..d2bd804
--- /dev/null
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/NoOpResubmitBackoffPolicy.java
@@ -0,0 +1,11 @@
+package org.apache.solr.crossdc;
+
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
+
+public class NoOpResubmitBackoffPolicy implements ResubmitBackoffPolicy {
+
+    public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
+        return 0;
+    }
+}
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
index ebd102d..c3c7191 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
@@ -49,6 +49,8 @@ public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
     if (cluster1 != null) {
       cluster1.shutdown();
     }
+    cluster1 = null;
+    processor = null;
   }
 
   public void testDocumentSanitization() {
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
index 9dc7073..5058c54 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
@@ -43,13 +43,6 @@ import static org.mockito.Mockito.*;
 public class TestMessageProcessor {
     static final String VERSION_FIELD = "_version_";
 
-    static class NoOpResubmitBackoffPolicy implements ResubmitBackoffPolicy {
-        @Override
-        public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
-            return 0;
-        }
-    }
-
     @Mock
     private CloudSolrClient solrClient;
     private SolrMessageProcessor processor;
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
index 9ff2a8e..4d503c0 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
@@ -4,9 +4,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.crossdc.common.IQueueHandler;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.KafkaMirroringSink;
@@ -16,136 +18,331 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyList;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 public class KafkaCrossDcConsumerTest {
 
-  private KafkaCrossDcConsumer kafkaCrossDcConsumer;
-  private KafkaConsumer<String,MirroredSolrRequest> kafkaConsumerMock;
-  private CloudSolrClient solrClientMock;
-  private KafkaMirroringSink kafkaMirroringSinkMock;
-
-  private SolrMessageProcessor messageProcessorMock;
-
-  @Before public void setUp() {
-    kafkaConsumerMock = mock(KafkaConsumer.class);
-    solrClientMock = mock(CloudSolrClient.class);
-    kafkaMirroringSinkMock = mock(KafkaMirroringSink.class);
-    messageProcessorMock = mock(SolrMessageProcessor.class);
-    KafkaCrossDcConf conf = testCrossDCConf();
-    // Set necessary configurations
-
-    kafkaCrossDcConsumer = new KafkaCrossDcConsumer(conf, new CountDownLatch(0)) {
-      @Override public KafkaConsumer<String,MirroredSolrRequest> createConsumer(Properties properties) {
-        return kafkaConsumerMock;
-      }
-
-      @Override protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
-        return solrClientMock;
-      }
-
-      @Override protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
-        return kafkaMirroringSinkMock;
-      }
-    };
-  }
-
-  private static KafkaCrossDcConf testCrossDCConf() {
-    Map config = new HashMap<>();
-    config.put(KafkaCrossDcConf.TOPIC_NAME, "topic1");
-    config.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
-    KafkaCrossDcConf conf = new KafkaCrossDcConf(config);
-    return conf;
-  }
-
-  @After public void tearDown() {
-    kafkaCrossDcConsumer.shutdown();
-  }
-
-  @Test public void testRunAndShutdown() throws Exception {
-    // Define the expected behavior of the mocks and set up the test scenario
-    when(kafkaConsumerMock.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
-
-    ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor();
-
-    // Run the test
-    consumerThreadExecutor.submit(kafkaCrossDcConsumer);
-
-    // Run the shutdown method
-    kafkaCrossDcConsumer.shutdown();
-
-    // Verify that the consumer was subscribed with the correct topic names
-    verify(kafkaConsumerMock).subscribe(anyList());
-
-    // Verify that the appropriate methods were called on the mocks
-    verify(kafkaConsumerMock).wakeup();
-    verify(solrClientMock).close();
-
-    consumerThreadExecutor.shutdown();
-    consumerThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
-  }
-
-  @Test public void testHandleFailedResubmit() throws Exception {
-    // Set up the KafkaCrossDcConsumer
-    KafkaCrossDcConf testConf = testCrossDCConf();
-    KafkaCrossDcConsumer consumer = spy(new KafkaCrossDcConsumer(testConf, new CountDownLatch(0)));
-    doNothing().when(consumer).sendBatch(any(UpdateRequest.class), any(ConsumerRecord.class), any(KafkaCrossDcConsumer.WorkUnit.class));
-
-    // Set up the SolrMessageProcessor mock
-    SolrMessageProcessor mockMessageProcessor = mock(SolrMessageProcessor.class);
-    IQueueHandler.Result<MirroredSolrRequest> failedResubmitResult = new IQueueHandler.Result<>(IQueueHandler.ResultStatus.FAILED_RESUBMIT, null);
-    when(mockMessageProcessor.handleItem(any(MirroredSolrRequest.class))).thenReturn(failedResubmitResult);
-
-    // Mock the KafkaMirroringSink
-    KafkaMirroringSink mockKafkaMirroringSink = mock(KafkaMirroringSink.class);
-    doNothing().when(mockKafkaMirroringSink).submit(any(MirroredSolrRequest.class));
-    consumer.kafkaMirroringSink = mockKafkaMirroringSink;
-
-    // Call the method to test
-    ConsumerRecord<String,MirroredSolrRequest> record = createSampleConsumerRecord();
-    consumer.processResult(record, failedResubmitResult);
-
-    // Verify that the KafkaMirroringSink.submit() method was called
-    verify(consumer.kafkaMirroringSink, times(1)).submit(record.value());
-  }
-
-  private ConsumerRecord<String,MirroredSolrRequest> createSampleConsumerRecord() {
-    return new ConsumerRecord<>("sample-topic", 0, 0, "key", createSampleMirroredSolrRequest());
-  }
-
-  private ConsumerRecords<String,MirroredSolrRequest> createSampleConsumerRecords() {
-    TopicPartition topicPartition = new TopicPartition("sample-topic", 0);
-    List<ConsumerRecord<String,MirroredSolrRequest>> recordsList = new ArrayList<>();
-    recordsList.add(new ConsumerRecord<>("sample-topic", 0, 0, "key", createSampleMirroredSolrRequest()));
-    return new ConsumerRecords<>(Collections.singletonMap(topicPartition, recordsList));
-  }
-
-  private MirroredSolrRequest createSampleMirroredSolrRequest() {
-    // Create a sample MirroredSolrRequest for testing
-    SolrInputDocument solrInputDocument = new SolrInputDocument();
-    solrInputDocument.addField("id", "1");
-    solrInputDocument.addField("title", "Sample title");
-    solrInputDocument.addField("content", "Sample content");
-    UpdateRequest updateRequest = new UpdateRequest();
-    updateRequest.add(solrInputDocument);
-    return new MirroredSolrRequest(updateRequest);
-  }
+    private KafkaCrossDcConsumer kafkaCrossDcConsumer;
+    private KafkaConsumer<String, MirroredSolrRequest> kafkaConsumerMock;
+    private CloudSolrClient solrClientMock;
+    private KafkaMirroringSink kafkaMirroringSinkMock;
+
+    private SolrMessageProcessor messageProcessorMock;
+
+    private KafkaCrossDcConf conf;
+
+
+    @Before
+    public void setUp() {
+        kafkaConsumerMock = mock(KafkaConsumer.class);
+        solrClientMock = mock(CloudSolrClient.class);
+        kafkaMirroringSinkMock = mock(KafkaMirroringSink.class);
+        messageProcessorMock = mock(SolrMessageProcessor.class);
+        conf = testCrossDCConf();
+        // Set necessary configurations
+
+        kafkaCrossDcConsumer =
+                new KafkaCrossDcConsumer(conf, new CountDownLatch(0)) {
+                    @Override
+                    public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(
+                            Properties properties) {
+                        return kafkaConsumerMock;
+                    }
+
+                    @Override
+                    public SolrMessageProcessor createSolrMessageProcessor() {
+                        return messageProcessorMock;
+                    }
+
+                    @Override
+                    protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
+                        return solrClientMock;
+                    }
+
+                    @Override
+                    protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
+                        return kafkaMirroringSinkMock;
+                    }
+                };
+    }
+
+    private static KafkaCrossDcConf testCrossDCConf() {
+        Map config = new HashMap<>();
+        config.put(KafkaCrossDcConf.TOPIC_NAME, "topic1");
+        config.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
+        return new KafkaCrossDcConf(config);
+    }
+
+    @After
+    public void tearDown() {
+        kafkaCrossDcConsumer.shutdown();
+    }
+
+    private ConsumerRecord<String, MirroredSolrRequest> createSampleConsumerRecord() {
+        return new ConsumerRecord<>("sample-topic", 0, 0, "key", createSampleMirroredSolrRequest());
+    }
+
+    private ConsumerRecords<String, MirroredSolrRequest> createSampleConsumerRecords() {
+        TopicPartition topicPartition = new TopicPartition("sample-topic", 0);
+        List<ConsumerRecord<String, MirroredSolrRequest>> recordsList = new ArrayList<>();
+        recordsList.add(
+                new ConsumerRecord<>(
+                        "sample-topic", 0, 0, "key", createSampleMirroredSolrRequest()));
+        return new ConsumerRecords<>(Collections.singletonMap(topicPartition, recordsList));
+    }
+
+    private MirroredSolrRequest createSampleMirroredSolrRequest() {
+        // Create a sample MirroredSolrRequest for testing
+        SolrInputDocument solrInputDocument = new SolrInputDocument();
+        solrInputDocument.addField("id", "1");
+        solrInputDocument.addField("title", "Sample title");
+        solrInputDocument.addField("content", "Sample content");
+        UpdateRequest updateRequest = new UpdateRequest();
+        updateRequest.add(solrInputDocument);
+        return new MirroredSolrRequest(updateRequest);
+    }
+
+    /**
+     * Should create a KafkaCrossDcConsumer with the given configuration and startLatch
+     */
+    @Test
+    public void kafkaCrossDcConsumerCreationWithConfigurationAndStartLatch() {
+        KafkaCrossDcConf conf = testCrossDCConf();
+        CountDownLatch startLatch = new CountDownLatch(1);
+
+        KafkaCrossDcConsumer kafkaCrossDcConsumer = new KafkaCrossDcConsumer(conf, startLatch);
+
+        assertNotNull(kafkaCrossDcConsumer);
+        assertEquals(1, startLatch.getCount());
+    }
+
+    @Test
+    public void testRunAndShutdown() throws Exception {
+        // Define the expected behavior of the mocks and set up the test scenario
+
+        // Use a CountDownLatch to wait for the KafkaConsumer.subscribe method to be called
+        CountDownLatch subscribeLatch = new CountDownLatch(1);
+        doAnswer(invocation -> {
+            subscribeLatch.countDown();
+            return null;
+        }).when(kafkaConsumerMock).subscribe(anyList());
+
+        when(kafkaConsumerMock.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
+
+        ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor();
+
+
+        // Run the test
+        consumerThreadExecutor.submit(kafkaCrossDcConsumer);
+
+        // Wait for the KafkaConsumer.subscribe method to be called
+        assertTrue(subscribeLatch.await(10, TimeUnit.SECONDS));
+
+        // Run the shutdown method
+        kafkaCrossDcConsumer.shutdown();
+
+        // Verify that the consumer subscribed to a topic list (anyList() does not check the topic names)
+        verify(kafkaConsumerMock).subscribe(anyList());
+
+        // Verify that the appropriate methods were called on the mocks
+        verify(kafkaConsumerMock).wakeup();
+        verify(solrClientMock).close();
+
+        consumerThreadExecutor.shutdown();
+        consumerThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testHandleFailedResubmit() throws Exception {
+        // Set up the KafkaCrossDcConsumer
+        KafkaCrossDcConf testConf = testCrossDCConf();
+        KafkaCrossDcConsumer consumer = spy(new KafkaCrossDcConsumer(testConf, new CountDownLatch(0)));
+        doNothing().when(consumer).sendBatch(any(UpdateRequest.class), any(ConsumerRecord.class), any(PartitionManager.WorkUnit.class));
+
+        // Set up the SolrMessageProcessor mock
+        SolrMessageProcessor mockMessageProcessor = mock(SolrMessageProcessor.class);
+        IQueueHandler.Result<MirroredSolrRequest> failedResubmitResult = new IQueueHandler.Result<>(IQueueHandler.ResultStatus.FAILED_RESUBMIT, null);
+        when(mockMessageProcessor.handleItem(any(MirroredSolrRequest.class))).thenReturn(failedResubmitResult);
+
+        // Mock the KafkaMirroringSink
+        KafkaMirroringSink mockKafkaMirroringSink = mock(KafkaMirroringSink.class);
+        doNothing().when(mockKafkaMirroringSink).submit(any(MirroredSolrRequest.class));
+        consumer.kafkaMirroringSink = mockKafkaMirroringSink;
+
+        // Call the method to test
+        ConsumerRecord<String, MirroredSolrRequest> record = createSampleConsumerRecord();
+        consumer.processResult(record, failedResubmitResult);
+
+        // Verify that the KafkaMirroringSink.submit() method was called
+        verify(consumer.kafkaMirroringSink, times(1)).submit(record.value());
+    }
+
+
+    @Test
+    public void testCreateKafkaCrossDcConsumer() {
+        KafkaCrossDcConsumer consumer = new KafkaCrossDcConsumer(conf, new CountDownLatch(1));
+        assertNotNull(consumer);
+    }
+
+    @Test
+    public void testHandleValidMirroredSolrRequest() {
+        KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
+        KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) {
+            @Override
+            public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(Properties properties) {
+                return mockConsumer;
+            }
+
+            @Override
+            public SolrMessageProcessor createSolrMessageProcessor() {
+                return messageProcessorMock;
+            }
+
+            @Override
+            protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
+                return kafkaMirroringSinkMock;
+            }
+        });
+
+        SolrInputDocument doc = new SolrInputDocument();
+        doc.addField("id", "1");
+        UpdateRequest validRequest = new UpdateRequest();
+        validRequest.add(doc);
+        validRequest.setParams(new ModifiableSolrParams().add("commit", "true"));
+        // Create a valid MirroredSolrRequest
+        ConsumerRecord<String, MirroredSolrRequest> record = new ConsumerRecord<>("test-topic", 0, 0, "key", new MirroredSolrRequest(validRequest));
+        ConsumerRecords<String, MirroredSolrRequest> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("test-topic", 0), List.of(record)));
+
+        when(mockConsumer.poll(any())).thenReturn(records).thenThrow(new WakeupException());
+
+        spyConsumer.run();
+
+        // Verify that the valid MirroredSolrRequest was processed.
+        verify(spyConsumer, times(1)).sendBatch(argThat(updateRequest -> {
+            // Check if the UpdateRequest has the same content as the original validRequest
+            return updateRequest.getDocuments().equals(validRequest.getDocuments()) &&
+                    updateRequest.getParams().equals(validRequest.getParams());
+        }), eq(record), any());
+    }
+
+    @Test
+    public void testHandleInvalidMirroredSolrRequest() {
+        KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
+        SolrMessageProcessor mockSolrMessageProcessor = mock(SolrMessageProcessor.class);
+        KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) {
+            @Override
+            public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(Properties properties) {
+                return mockConsumer;
+            }
+
+            @Override
+            public SolrMessageProcessor createSolrMessageProcessor() {
+                return mockSolrMessageProcessor;
+            }
+
+            @Override
+            protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
+                return kafkaMirroringSinkMock;
+            }
+        });
+        doReturn(mockConsumer).when(spyConsumer).createKafkaConsumer(any());
+
+        UpdateRequest invalidRequest = new UpdateRequest();
+        // no updates on request
+        invalidRequest.setParams(new ModifiableSolrParams().add("invalid_param", "invalid_value"));
+
+        ConsumerRecord<String, MirroredSolrRequest> record = new ConsumerRecord<>("test-topic", 0, 0, "key", new MirroredSolrRequest(invalidRequest));
+        ConsumerRecords<String, MirroredSolrRequest> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("test-topic", 0), List.of(record)));
+
+        when(mockConsumer.poll(any())).thenReturn(records).thenThrow(new WakeupException());
+
+        spyConsumer.run();
+
+        // Verify that the invalid MirroredSolrRequest (no documents) was still passed to sendBatch.
+        verify(spyConsumer, times(1)).sendBatch(argThat(updateRequest -> {
+            // Check if the UpdateRequest has the same content as the original invalidRequest
+            return updateRequest.getDocuments() == null &&
+                    updateRequest.getParams().equals(invalidRequest.getParams());
+        }), eq(record), any());
+    }
+
+    @Test
+    public void testHandleWakeupException() {
+        KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
+        KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) {
+            @Override
+            public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(Properties properties) {
+                return mockConsumer;
+            }
+
+            @Override
+            public SolrMessageProcessor createSolrMessageProcessor() {
+                return messageProcessorMock;
+            }
+
+            @Override
+            protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
+                return kafkaMirroringSinkMock;
+            }
+        });
+
+        when(mockConsumer.poll(any())).thenThrow(new WakeupException());
+
+        // Run the consumer in a separate thread to avoid blocking the test
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        executorService.submit(spyConsumer);
+
+        // Wait for a short period to allow the consumer to start and then trigger the shutdown
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        spyConsumer.shutdown();
+
+        // Verify that the WakeupException was caught and handled
+        verify(mockConsumer, atLeastOnce()).poll(any());
+        verify(mockConsumer, times(1)).wakeup();
+
+        // Shutdown the executor service
+        executorService.shutdown();
+        try {
+            executorService.awaitTermination(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Test
+    public void testShutdown() {
+        KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
+        KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) {
+            @Override
+            public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(Properties properties) {
+                return mockConsumer;
+            }
+
+            @Override
+            public SolrMessageProcessor createSolrMessageProcessor() {
+                return messageProcessorMock;
+            }
+
+            @Override
+            protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
+                return kafkaMirroringSinkMock;
+            }
+        });
+
+
+        spyConsumer.shutdown();
+
+        verify(mockConsumer, times(1)).wakeup();
+    }
 }
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/PartitionManagerTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/PartitionManagerTest.java
new file mode 100644
index 0000000..13472d3
--- /dev/null
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/PartitionManagerTest.java
@@ -0,0 +1,168 @@
+package org.apache.solr.crossdc.consumer;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class PartitionManagerTest {
+
+    /**
+     * Should return the existing PartitionWork when the partition is already in the
+     * partitionWorkMap
+     */
+    @Test
+    public void getPartitionWorkWhenPartitionInMap() {
+        KafkaConsumer<String, MirroredSolrRequest> consumer = mock(KafkaConsumer.class);
+        PartitionManager partitionManager = new PartitionManager(consumer);
+        TopicPartition partition = new TopicPartition("test-topic", 0);
+        PartitionManager.PartitionWork partitionWork = new PartitionManager.PartitionWork();
+        partitionManager.partitionWorkMap.put(partition, partitionWork);
+
+        PartitionManager.PartitionWork result = partitionManager.getPartitionWork(partition);
+
+        assertNotNull(result);
+        assertEquals(partitionWork, result);
+    }
+
+    /**
+     * Should create a new PartitionWork when the partition is not in the partitionWorkMap
+     */
+    @Test
+    public void getPartitionWorkWhenPartitionNotInMap() {
+        KafkaConsumer<String, MirroredSolrRequest> consumer = mock(KafkaConsumer.class);
+        PartitionManager partitionManager = new PartitionManager(consumer);
+        TopicPartition partition = new TopicPartition("test-topic", 0);
+
+        PartitionManager.PartitionWork partitionWork = partitionManager.getPartitionWork(partition);
+
+        assertNotNull(partitionWork);
+        assertTrue(partitionManager.partitionWorkMap.containsKey(partition));
+        assertEquals(partitionWork, partitionManager.partitionWorkMap.get(partition));
+    }
+
+    /**
+     * Should not update the offset when the future for update is not done
+     */
+    @Test
+    public void checkForOffsetUpdatesWhenFutureNotDone() throws Throwable {
+        KafkaConsumer<String, MirroredSolrRequest> consumer = mock(KafkaConsumer.class);
+        PartitionManager partitionManager = new PartitionManager(consumer);
+        TopicPartition partition = new TopicPartition("test-topic", 0);
+        PartitionManager.PartitionWork partitionWork = partitionManager.getPartitionWork(partition);
+        PartitionManager.WorkUnit workUnit = new PartitionManager.WorkUnit(partition);
+        Future<?> future = mock(Future.class);
+        when(future.isDone()).thenReturn(false);
+        workUnit.workItems.add(future);
+        partitionWork.partitionQueue.add(workUnit);
+
+        partitionManager.checkForOffsetUpdates(partition);
+
+        assertEquals(1, partitionWork.partitionQueue.size());
+        assertTrue(partitionWork.partitionQueue.contains(workUnit));
+    }
+
+    /**
+     * Should commit the work unit's next offset and drain the partition queue once its only future is done
+     */
+    @Test
+    public void checkForOffsetUpdatesWhenFutureDone() throws Throwable {
+        KafkaConsumer<String, MirroredSolrRequest> consumer = mock(KafkaConsumer.class);
+        PartitionManager partitionManager = new PartitionManager(consumer);
+        TopicPartition partition = new TopicPartition("test-topic", 0);
+
+        PartitionManager.PartitionWork partitionWork = partitionManager.getPartitionWork(partition);
+        PartitionManager.WorkUnit workUnit = new PartitionManager.WorkUnit(partition);
+        partitionWork.partitionQueue.add(workUnit);
+
+        // Use a real Future instead of a mocked one; the submitted task is a no-op
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        try {
+            Future<?> future = executor.submit(() -> { });
+
+            workUnit.workItems.add(future);
+
+            // Wait for the Future to complete
+            future.get(10, TimeUnit.SECONDS);
+
+            partitionManager.checkForOffsetUpdates(partition);
+
+            // Verify that the consumer.commitSync() method was called with the correct parameters
+            verify(consumer, times(1))
+                    .commitSync(
+                            Collections.singletonMap(
+                                    partition, new OffsetAndMetadata(workUnit.nextOffset)));
+
+            // Verify that the partitionQueue is empty after processing
+            assertTrue(partitionWork.partitionQueue.isEmpty());
+        } finally {
+            // Release the worker thread even if a verification above fails or the wait times out
+            executor.shutdown();
+        }
+    }
+
+    /**
+     * Should check for offset updates for all partitions in the partitionWorkMap
+     */
+    @Test
+    public void checkOffsetUpdatesForAllPartitions() throws Throwable { // Create a mock KafkaConsumer
+        KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
+
+        // Create a PartitionManager instance with the mock KafkaConsumer
+        PartitionManager partitionManager = new PartitionManager(mockConsumer);
+
+        // Create a few TopicPartitions
+        TopicPartition partition1 = new TopicPartition("topic1", 0);
+        TopicPartition partition2 = new TopicPartition("topic2", 0);
+
+        // Add some PartitionWork to the partitionWorkMap
+        PartitionManager.PartitionWork work1 = partitionManager.getPartitionWork(partition1);
+        PartitionManager.PartitionWork work2 = partitionManager.getPartitionWork(partition2);
+
+        // Create WorkUnits and add them to the PartitionWork
+        PartitionManager.WorkUnit workUnit1 = new PartitionManager.WorkUnit(partition1);
+        PartitionManager.WorkUnit workUnit2 = new PartitionManager.WorkUnit(partition2);
+
+        work1.partitionQueue.add(workUnit1);
+        work2.partitionQueue.add(workUnit2);
+
+        // Create mock Futures and add them to the WorkUnits
+        Future<?> mockFuture1 = mock(Future.class);
+        Future<?> mockFuture2 = mock(Future.class);
+
+        workUnit1.workItems.add(mockFuture1);
+        workUnit2.workItems.add(mockFuture2);
+
+        // Set the mock Futures to be done
+        when(mockFuture1.isDone()).thenReturn(true);
+        when(mockFuture2.isDone()).thenReturn(true);
+
+        // Call the checkOffsetUpdates method
+        partitionManager.checkOffsetUpdates();
+
+        // Verify that the futures were checked for completion
+        verify(mockFuture1, times(1)).isDone();
+        verify(mockFuture2, times(1)).isDone();
+
+
+        // Verify that the offset was committed via commitSync for each partition
+        verify(mockConsumer, times(1))
+                .commitSync(
+                        Collections.singletonMap(
+                                partition1, new OffsetAndMetadata(workUnit1.nextOffset)));
+        verify(mockConsumer, times(1))
+                .commitSync(
+                        Collections.singletonMap(
+                                partition2, new OffsetAndMetadata(workUnit2.nextOffset)));
+    }
+}
\ No newline at end of file
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessorTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessorTest.java
new file mode 100644
index 0000000..99cd3d4
--- /dev/null
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessorTest.java
@@ -0,0 +1,109 @@
+package org.apache.solr.crossdc.messageprocessor;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.SolrResponseBase;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.crossdc.common.IQueueHandler;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+public class SolrMessageProcessorTest {
+    private SolrMessageProcessor solrMessageProcessor;
+    private CloudSolrClient client;
+    private ResubmitBackoffPolicy resubmitBackoffPolicy;
+
+    @Before
+    public void setUp() {
+        client = mock(CloudSolrClient.class);
+        resubmitBackoffPolicy = mock(ResubmitBackoffPolicy.class);
+        solrMessageProcessor = new SolrMessageProcessor(client, resubmitBackoffPolicy);
+    }
+
+    /**
+     * Stubs a BAD_REQUEST response status and expects FAILED_RESUBMIT (NOTE(review): the test name says "no retry" - confirm intent)
+     */
+    @Test
+    public void handleItemWithFailedResultNoRetry() throws SolrServerException, IOException {
+        MirroredSolrRequest mirroredSolrRequest = mock(MirroredSolrRequest.class);
+        SolrRequest solrRequest = mock(SolrRequest.class);
+        when(mirroredSolrRequest.getSolrRequest()).thenReturn(solrRequest);
+
+        SolrResponseBase solrResponseBase = mock(SolrResponseBase.class);
+        when(solrRequest.process(client)).thenReturn(solrResponseBase);
+        when(solrResponseBase.getResponse()).thenReturn(new NamedList<>());
+        when(solrResponseBase.getStatus()).thenReturn(ErrorCode.BAD_REQUEST.code);
+        when(client.request(any(SolrRequest.class))).thenReturn(new NamedList<>());
+
+        IQueueHandler.Result<MirroredSolrRequest> result = solrMessageProcessor.handleItem(mirroredSolrRequest);
+
+        assertEquals(IQueueHandler.ResultStatus.FAILED_RESUBMIT, result.status());
+    }
+
+    /**
+     * Should handle MirroredSolrRequest and return a failed result with resubmit
+     */
+    @Test
+    public void handleItemWithFailedResultResubmit() throws SolrServerException, IOException {
+        MirroredSolrRequest mirroredSolrRequest = mock(MirroredSolrRequest.class);
+        SolrRequest solrRequest = mock(SolrRequest.class);
+        when(mirroredSolrRequest.getSolrRequest()).thenReturn(solrRequest);
+        when(solrRequest.process(client))
+                .thenThrow(new SolrException(ErrorCode.SERVER_ERROR, "Server error"));
+
+        IQueueHandler.Result<MirroredSolrRequest> result = solrMessageProcessor.handleItem(mirroredSolrRequest);
+
+        assertEquals(IQueueHandler.ResultStatus.FAILED_RESUBMIT, result.status());
+        assertEquals(mirroredSolrRequest, result.newItem());
+    }
+
+    /**
+     * Should handle MirroredSolrRequest and return a successful result
+     */
+    @Test
+    public void handleItemWithSuccessfulResult() throws SolrServerException, IOException {
+        MirroredSolrRequest mirroredSolrRequest = mock(MirroredSolrRequest.class);
+        SolrRequest solrRequest = mock(SolrRequest.class);
+        SolrResponseBase solrResponse = mock(SolrResponseBase.class);
+
+        when(mirroredSolrRequest.getSolrRequest()).thenReturn(solrRequest);
+        when(solrRequest.process(client)).thenReturn(solrResponse);
+        when(solrResponse.getStatus()).thenReturn(0);
+
+        IQueueHandler.Result<MirroredSolrRequest> result = solrMessageProcessor.handleItem(mirroredSolrRequest);
+
+        assertEquals(IQueueHandler.ResultStatus.HANDLED, result.status());
+        assertNull(result.newItem());
+    }
+
+    /**
+     * Should connect to Solr if not connected and process the request
+     */
+    @Test
+    public void connectToSolrIfNeededAndProcessRequest() throws SolrServerException, IOException {
+        MirroredSolrRequest mirroredSolrRequest = mock(MirroredSolrRequest.class);
+        SolrRequest solrRequest = mock(SolrRequest.class);
+        SolrResponseBase solrResponse = mock(SolrResponseBase.class);
+
+        when(mirroredSolrRequest.getSolrRequest()).thenReturn(solrRequest);
+        when(solrRequest.process(client)).thenReturn(solrResponse);
+        when(solrResponse.getStatus()).thenReturn(0);
+
+        IQueueHandler.Result<MirroredSolrRequest> result = solrMessageProcessor.handleItem(mirroredSolrRequest);
+
+        assertEquals(IQueueHandler.ResultStatus.HANDLED, result.status());
+        verify(client, times(1)).connect();
+        verify(solrRequest, times(1)).process(client);
+    }
+}
\ No newline at end of file
diff --git a/crossdc-producer/build.gradle b/crossdc-producer/build.gradle
index 9a6e5cd..f4aec1d 100644
--- a/crossdc-producer/build.gradle
+++ b/crossdc-producer/build.gradle
@@ -40,10 +40,10 @@ dependencies {
 
     provided  group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
 
-    testImplementation 'org.slf4j:slf4j-api'
+    testImplementation 'org.slf4j:slf4j-api:2.0.5'
     testImplementation 'org.hamcrest:hamcrest:2.2'
     testImplementation 'junit:junit:4.13.2'
-    testImplementation('org.mockito:mockito-inline:4.3.1')
+    testImplementation('org.mockito:mockito-inline:5.2.0')
     testImplementation group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
     testImplementation group: 'org.apache.solr', name: 'solr-test-framework', version: '8.11.2'
 
@@ -65,9 +65,11 @@ shadowJar {
 jar.dependsOn(shadowJar)
 
 artifacts {
-    shadowJar;
+    shadowJar
 }
 
 test {
     jvmArgs '-Djava.security.egd=file:/dev/./urandom'
-}
+    minHeapSize = "128m"
+    maxHeapSize = "512m"
+}
\ No newline at end of file
diff --git a/crossdc-producer/gradle.properties b/crossdc-producer/gradle.properties
index 66975c6..21aee2d 100644
--- a/crossdc-producer/gradle.properties
+++ b/crossdc-producer/gradle.properties
@@ -1,2 +1,2 @@
 group=org.apache.solr
-version=0.9-SNAPSHOT
\ No newline at end of file
+version=1.0-SNAPSHOT
\ No newline at end of file
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
index cf25ebe..d57b547 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
@@ -16,10 +16,7 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.consumer.Consumer;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,7 +28,6 @@ import java.util.Properties;
 
 @ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
     QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
-@Ignore
 @ThreadLeakLingering(linger = 5000) public class DeleteByQueryToIdTest extends
     SolrTestCaseJ4 {
 
@@ -136,6 +132,11 @@ import java.util.Properties;
       solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
       solrCluster2.shutdown();
     }
+
+    solrCluster1 = null;
+    solrCluster2 = null;
+    kafkaCluster = null;
+    consumer = null;
   }
 
   @After
@@ -147,6 +148,7 @@ import java.util.Properties;
     solrCluster2.getSolrClient().commit();
   }
 
+  @Test
   public void testDBQ() throws Exception {
 
     CloudSolrClient client = solrCluster1.getSolrClient();
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
index 8cbdae2..6d2d543 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
@@ -161,6 +161,13 @@ import java.util.Properties;
     if (zkTestServer2 != null) {
       zkTestServer2.shutdown();
     }
+
+    consumer = null;
+    solrCluster1 = null;
+    solrCluster2 = null;
+    kafkaCluster = null;
+    zkTestServer1 = null;
+    zkTestServer2 = null;
   }
 
   @After
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index c3bb2e5..5c19268 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -57,12 +57,12 @@ import static org.mockito.Mockito.spy;
   static final String VERSION_FIELD = "_version_";
 
   private static final int NUM_BROKERS = 1;
-  public static EmbeddedKafkaCluster kafkaCluster;
+  public EmbeddedKafkaCluster kafkaCluster;
 
-  protected static volatile MiniSolrCloudCluster solrCluster1;
-  protected static volatile MiniSolrCloudCluster solrCluster2;
+  protected volatile MiniSolrCloudCluster solrCluster1;
+  protected volatile MiniSolrCloudCluster solrCluster2;
 
-  protected static volatile Consumer consumer;
+  protected volatile Consumer consumer;
 
   private static String TOPIC = "topic1";
 
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
index 8027aae..815a5c2 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
@@ -18,6 +18,7 @@ import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.consumer.Consumer;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
index 927c5ab..c44f14d 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
@@ -124,6 +124,11 @@ import java.util.*;
       log.error("Exception stopping Kafka cluster", e);
     }
 
+    solrCluster1 = null;
+    solrCluster2 = null;
+    kafkaCluster = null;
+    consumer = null;
+
   }
 
   @After
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index 991f67e..cf82cb7 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -153,6 +153,11 @@ import java.util.Properties;
       log.error("Exception stopping Kafka cluster", e);
     }
 
+    solrCluster1 = null;
+    solrCluster2 = null;
+    kafkaCluster = null;
+    consumer1 = null;
+    consumer2 = null;
   }
 
   @After
diff --git a/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
index d21e410..bd23393 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
@@ -8,26 +8,16 @@ import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.*;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
-import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequestBase;
 import org.apache.solr.schema.IndexSchema;
 import org.apache.solr.update.AddUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
-import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.update.processor.MirroringUpdateProcessor;
-import org.apache.solr.update.processor.RequestMirroringHandler;
-import org.apache.solr.update.processor.UpdateRequestProcessor;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -36,142 +26,212 @@ import java.io.IOException;
 
 public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
 
-  private UpdateRequestProcessor next;
-  private MirroringUpdateProcessor processor;
-  private RequestMirroringHandler requestMirroringHandler;
-  private AddUpdateCommand addUpdateCommand;
-  private DeleteUpdateCommand deleteUpdateCommand;
-  private SolrQueryRequestBase req;
-  UpdateRequest requestMock;
-  private UpdateRequestProcessor nextProcessor;
-  private SolrCore core;
-  private HttpSolrClient.Builder builder = Mockito.mock(HttpSolrClient.Builder.class);
-  private HttpSolrClient client = Mockito.mock(HttpSolrClient.class);
-  private CloudDescriptor cloudDesc;
-
-  @Before public void setUp() throws Exception {
-    super.setUp();
-    addUpdateCommand = new AddUpdateCommand(req);
-    addUpdateCommand.solrDoc = new SolrInputDocument();
-    addUpdateCommand.solrDoc.addField("id", "test");
-    req = Mockito.mock(SolrQueryRequestBase.class);
-    Mockito.when(req.getParams()).thenReturn(new ModifiableSolrParams());
-
-    requestMock = Mockito.mock(UpdateRequest.class);
-    addUpdateCommand.setReq(req);
-
-    nextProcessor = Mockito.mock(UpdateRequestProcessor.class);
-
-    IndexSchema schema = Mockito.mock(IndexSchema.class);
-    Mockito.when(req.getSchema()).thenReturn(schema);
-
-    deleteUpdateCommand = new DeleteUpdateCommand(req);
-    deleteUpdateCommand.query = "*:*";
-
-    next = Mockito.mock(UpdateRequestProcessor.class);
-    requestMirroringHandler = Mockito.mock(RequestMirroringHandler.class);
-    processor = new MirroringUpdateProcessor(next, true, true, 1000L, new ModifiableSolrParams(), DistributedUpdateProcessor.DistribPhase.NONE,
-        requestMirroringHandler) {
-      UpdateRequest createMirrorRequest() {
-        return requestMock;
-      }
-    };
-
-    core = Mockito.mock(SolrCore.class);
-    CoreDescriptor coreDesc = Mockito.mock(CoreDescriptor.class);
-    cloudDesc = Mockito.mock(CloudDescriptor.class);
-    CoreContainer coreContainer = Mockito.mock(CoreContainer.class);
-    ZkController zkController = Mockito.mock(ZkController.class);
-    ClusterState clusterState = Mockito.mock(ClusterState.class);
-    DocCollection docCollection = Mockito.mock(DocCollection.class);
-    DocRouter docRouter = Mockito.mock(DocRouter.class);
-    Slice slice = Mockito.mock(Slice.class);
-    ZkStateReader zkStateReader = Mockito.mock(ZkStateReader.class);
-    Replica replica = Mockito.mock(Replica.class);
-
-    Mockito.when(replica.getName()).thenReturn("replica1");
-    Mockito.when(zkStateReader.getLeaderRetry(Mockito.any(), Mockito.any())).thenReturn(replica);
-    Mockito.when(zkController.getZkStateReader()).thenReturn(zkStateReader);
-    Mockito.when(coreDesc.getCloudDescriptor()).thenReturn(cloudDesc);
-    Mockito.when(clusterState.getCollection(Mockito.any())).thenReturn(docCollection);
-    Mockito.when(docCollection.getRouter()).thenReturn(docRouter);
-    Mockito.when(docRouter.getTargetSlice(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(slice);
-    Mockito.when(zkController.getClusterState()).thenReturn(clusterState);
-    Mockito.when(coreContainer.getZkController()).thenReturn(zkController);
-    Mockito.when(core.getCoreContainer()).thenReturn(coreContainer);
-    Mockito.when(core.getCoreDescriptor()).thenReturn(coreDesc);
-    Mockito.when(req.getCore()).thenReturn(core);
-  }
-
-  @Test public void testProcessAddWithinLimit() throws Exception {
-    Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
-    SolrInputDocument doc = new SolrInputDocument();
-    doc.addField("id", "1");
-    AddUpdateCommand cmd = new AddUpdateCommand(req);
-    cmd.solrDoc = doc;
-    cmd.commitWithin = 1000;
-    cmd.overwrite = true;
-    processor.processAdd(cmd);
-    Mockito.verify(next).processAdd(cmd);
-    Mockito.verify(requestMirroringHandler).mirror(requestMock);
-  }
-
-  @Test public void testProcessAddExceedsLimit() {
-    AddUpdateCommand addUpdateCommand = new AddUpdateCommand(req);
-    SolrInputDocument solrInputDocument = new SolrInputDocument();
-    solrInputDocument.addField("id", "123");
-    solrInputDocument.addField("large_field", "Test ".repeat(10000));
-    addUpdateCommand.solrDoc = solrInputDocument;
-
-    Mockito.when(req.getCore()).thenReturn(core);
-    Mockito.when(req.getCore().getCoreDescriptor()).thenReturn(Mockito.mock(CoreDescriptor.class));
-    Mockito.when(req.getCore().getCoreDescriptor().getCloudDescriptor()).thenReturn(Mockito.mock(CloudDescriptor.class));
-    Mockito.when(req.getCore().getCoreContainer()).thenReturn(Mockito.mock(CoreContainer.class));
-    Mockito.when(req.getCore().getCoreContainer().getZkController()).thenReturn(Mockito.mock(ZkController.class));
-    Mockito.when(req.getCore().getCoreContainer().getZkController().getClusterState()).thenReturn(Mockito.mock(ClusterState.class));
-
-    SolrParams mirrorParams = new ModifiableSolrParams();
-    MirroringUpdateProcessor mirroringUpdateProcessorWithLimit = new MirroringUpdateProcessor(nextProcessor, true, false, // indexUnmirrorableDocs set to false
-        50000, mirrorParams, DistributedUpdateProcessor.DistribPhase.NONE, requestMirroringHandler);
-
-    assertThrows(SolrException.class, () -> mirroringUpdateProcessorWithLimit.processAdd(addUpdateCommand));
-  }
-
-  @Test public void testProcessAddLeader() throws Exception {
-    Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
-    processor.processAdd(addUpdateCommand);
-    Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(Mockito.any());
-  }
-
-  @Test public void testProcessAddNotLeader() throws Exception {
-    Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica2");
-    processor.processAdd(addUpdateCommand);
-    Mockito.verify(requestMirroringHandler, Mockito.times(0)).mirror(Mockito.any());
-  }
-
-  @Test public void testProcessDelete() throws Exception {
-    Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
-    processor.processDelete(deleteUpdateCommand);
-    Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(Mockito.any());
-  }
-
-  @Test public void testProcessDBQResults() throws Exception {
-    Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
-    Mockito.when(builder.build()).thenReturn(client);
-    SolrInputDocument doc = new SolrInputDocument();
-    doc.addField("id", "test");
-    addUpdateCommand.solrDoc = doc;
-    processor.processAdd(addUpdateCommand);
-
-    SolrQuery query = new SolrQuery();
-    query.setQuery("*:*");
-    query.setRows(1000);
-    query.setSort("id", SolrQuery.ORDER.asc);
-
-    processor.processDelete(deleteUpdateCommand);
-  }
-
-  @Test public void testFinish() throws IOException {
-    processor.finish();
-  }
+    private UpdateRequestProcessor next;
+    private MirroringUpdateProcessor processor;
+    private RequestMirroringHandler requestMirroringHandler;
+    private AddUpdateCommand addUpdateCommand;
+    private DeleteUpdateCommand deleteUpdateCommand;
+    private SolrQueryRequestBase req;
+    UpdateRequest requestMock;
+    private UpdateRequestProcessor nextProcessor;
+    private SolrCore core;
+    private HttpSolrClient.Builder builder = Mockito.mock(HttpSolrClient.Builder.class);
+    private HttpSolrClient client = Mockito.mock(HttpSolrClient.class);
+    private CloudDescriptor cloudDesc;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        // Create the request mock first so the add command is never built around a null request.
+        req = Mockito.mock(SolrQueryRequestBase.class);
+        Mockito.when(req.getParams()).thenReturn(new ModifiableSolrParams());
+        addUpdateCommand = new AddUpdateCommand(req);
+        addUpdateCommand.solrDoc = new SolrInputDocument();
+        addUpdateCommand.solrDoc.addField("id", "test");
+
+        requestMock = Mockito.mock(UpdateRequest.class);
+
+        nextProcessor = Mockito.mock(UpdateRequestProcessor.class);
+
+        IndexSchema schema = Mockito.mock(IndexSchema.class);
+        Mockito.when(req.getSchema()).thenReturn(schema);
+
+        deleteUpdateCommand = new DeleteUpdateCommand(req);
+        deleteUpdateCommand.query = "*:*";
+
+        next = Mockito.mock(UpdateRequestProcessor.class);
+        requestMirroringHandler = Mockito.mock(RequestMirroringHandler.class);
+        processor =
+                new MirroringUpdateProcessor(
+                        next,
+                        true,
+                        true,
+                        1000L,
+                        new ModifiableSolrParams(),
+                        DistributedUpdateProcessor.DistribPhase.NONE,
+                        requestMirroringHandler) {
+                    UpdateRequest createMirrorRequest() {
+                        return requestMock;
+                    }
+                };
+
+        core = Mockito.mock(SolrCore.class);
+        CoreDescriptor coreDesc = Mockito.mock(CoreDescriptor.class);
+        cloudDesc = Mockito.mock(CloudDescriptor.class);
+        CoreContainer coreContainer = Mockito.mock(CoreContainer.class);
+        ZkController zkController = Mockito.mock(ZkController.class);
+        ClusterState clusterState = Mockito.mock(ClusterState.class);
+        DocCollection docCollection = Mockito.mock(DocCollection.class);
+        DocRouter docRouter = Mockito.mock(DocRouter.class);
+        Slice slice = Mockito.mock(Slice.class);
+        ZkStateReader zkStateReader = Mockito.mock(ZkStateReader.class);
+        Replica replica = Mockito.mock(Replica.class);
+
+        Mockito.when(replica.getName()).thenReturn("replica1");
+        Mockito.when(zkStateReader.getLeaderRetry(Mockito.any(), Mockito.any()))
+                .thenReturn(replica);
+        Mockito.when(zkController.getZkStateReader()).thenReturn(zkStateReader);
+        Mockito.when(coreDesc.getCloudDescriptor()).thenReturn(cloudDesc);
+        Mockito.when(clusterState.getCollection(Mockito.any())).thenReturn(docCollection);
+        Mockito.when(docCollection.getRouter()).thenReturn(docRouter);
+        Mockito.when(
+                        docRouter.getTargetSlice(
+                                Mockito.any(),
+                                Mockito.any(),
+                                Mockito.any(),
+                                Mockito.any(),
+                                Mockito.any()))
+                .thenReturn(slice);
+        Mockito.when(zkController.getClusterState()).thenReturn(clusterState);
+        Mockito.when(coreContainer.getZkController()).thenReturn(zkController);
+        Mockito.when(core.getCoreContainer()).thenReturn(coreContainer);
+        Mockito.when(core.getCoreDescriptor()).thenReturn(coreDesc);
+        Mockito.when(req.getCore()).thenReturn(core);
+    }
+
+    /**
+     * Should process a delete-by-query command and mirror the request when the distribPhase is
+     * NONE and deleteById is false
+     */
+    @Test
+    public void processDeleteWhenDistribPhaseIsNoneAndDeleteByIdIsFalse() throws Exception {
+        // deleteUpdateCommand carries the "*:*" query assigned in setUp().
+        processor.processDelete(deleteUpdateCommand);
+
+        // Any exception propagates to JUnit with its real type and stack trace rather than
+        // being replaced by a generic fail() message.
+        Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(requestMock);
+    }
+
+    /**
+     * Should process add command and mirror the document when the document size is within the limit
+     * and the node is a leader
+     */
+    @Test
+    public void processAddWhenDocSizeWithinLimitAndNodeIsLeader() throws Exception {
+        // "replica1" matches the name of the replica that setUp() returns from
+        // getLeaderRetry(), which is what this test treats as "node is leader".
+        Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+
+        // Exceptions propagate to JUnit unchanged so a failure keeps its original type,
+        // message and stack trace.
+        processor.processAdd(addUpdateCommand);
+
+        Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(requestMock);
+    }
+
+    /**
+     * Should process delete command and mirror the request when the node is a leader and
+     * deleteById is true
+     */
+    @Test
+    public void processDeleteWhenNodeIsLeaderAndDeleteByIdIsTrue() throws Exception {
+        Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+        // NOTE(review): setUp() leaves query = "*:*" on this command. If
+        // DeleteUpdateCommand#isDeleteById() is defined as query == null, this still runs
+        // the delete-by-query path; clearing the query here may be needed - confirm.
+        deleteUpdateCommand.setId("test");
+
+        processor.processDelete(deleteUpdateCommand);
+        Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(requestMock);
+    }
+
+    @Test
+    public void testProcessAddWithinLimit() throws Exception {
+        Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+        SolrInputDocument doc = new SolrInputDocument();
+        doc.addField("id", "1");
+        AddUpdateCommand cmd = new AddUpdateCommand(req);
+        cmd.solrDoc = doc;
+        cmd.commitWithin = 1000;
+        cmd.overwrite = true;
+        processor.processAdd(cmd);
+        Mockito.verify(next).processAdd(cmd);
+        Mockito.verify(requestMirroringHandler).mirror(requestMock);
+    }
+
+    @Test
+    public void testProcessAddExceedsLimit() {
+        AddUpdateCommand addUpdateCommand = new AddUpdateCommand(req);
+        SolrInputDocument solrInputDocument = new SolrInputDocument();
+        solrInputDocument.addField("id", "123");
+        solrInputDocument.addField("large_field", "Test ".repeat(10000));
+        addUpdateCommand.solrDoc = solrInputDocument;
+
+        Mockito.when(req.getCore()).thenReturn(core);
+        Mockito.when(req.getCore().getCoreDescriptor()).thenReturn(Mockito.mock(CoreDescriptor.class));
+        Mockito.when(req.getCore().getCoreDescriptor().getCloudDescriptor()).thenReturn(Mockito.mock(CloudDescriptor.class));
+        Mockito.when(req.getCore().getCoreContainer()).thenReturn(Mockito.mock(CoreContainer.class));
+        Mockito.when(req.getCore().getCoreContainer().getZkController()).thenReturn(Mockito.mock(ZkController.class));
+        Mockito.when(req.getCore().getCoreContainer().getZkController().getClusterState()).thenReturn(Mockito.mock(ClusterState.class));
+
+        SolrParams mirrorParams = new ModifiableSolrParams();
+        MirroringUpdateProcessor mirroringUpdateProcessorWithLimit = new MirroringUpdateProcessor(nextProcessor, true, false, // indexUnmirrorableDocs set to false
+                50000, mirrorParams, DistributedUpdateProcessor.DistribPhase.NONE, requestMirroringHandler);
+
+        assertThrows(SolrException.class, () -> mirroringUpdateProcessorWithLimit.processAdd(addUpdateCommand));
+    }
+
+    @Test
+    public void testProcessAddLeader() throws Exception {
+        Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+        processor.processAdd(addUpdateCommand);
+        Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(Mockito.any());
+    }
+
+    @Test
+    public void testProcessAddNotLeader() throws Exception {
+        Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica2");
+        processor.processAdd(addUpdateCommand);
+        Mockito.verify(requestMirroringHandler, Mockito.times(0)).mirror(Mockito.any());
+    }
+
+    @Test
+    public void testProcessDelete() throws Exception {
+        Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+        processor.processDelete(deleteUpdateCommand);
+        Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(Mockito.any());
+    }
+
+    @Test
+    public void testProcessDBQResults() throws Exception {
+        Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+        Mockito.when(builder.build()).thenReturn(client);
+        SolrInputDocument doc = new SolrInputDocument();
+        doc.addField("id", "test");
+        addUpdateCommand.solrDoc = doc;
+        processor.processAdd(addUpdateCommand);
+
+        SolrQuery query = new SolrQuery();
+        query.setQuery("*:*");
+        query.setRows(1000);
+        query.setSort("id", SolrQuery.ORDER.asc);
+
+        processor.processDelete(deleteUpdateCommand);
+    }
+
+    @Test
+    public void testFinish() throws IOException {
+        processor.finish();
+    }
 }
\ No newline at end of file
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index e708b1c..943f0cb 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index aa991fc..f398c33 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,6 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip
+networkTimeout=10000
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
diff --git a/gradlew b/gradlew
index 81a9eed..65dcd68 100644
--- a/gradlew
+++ b/gradlew
@@ -1,7 +1,7 @@
-#!/usr/bin/env sh
+#!/bin/sh
 
 #
-# Copyright 2015 the original author or authors.
+# Copyright © 2015-2021 the original authors.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -17,76 +17,113 @@
 #
 
 ##############################################################################
-##
-##  Gradle start up script for UN*X
-##
+#
+#   Gradle start up script for POSIX generated by Gradle.
+#
+#   Important for running:
+#
+#   (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is
+#       noncompliant, but you have some other compliant shell such as ksh or
+#       bash, then to run this script, type that shell name before the whole
+#       command line, like:
+#
+#           ksh Gradle
+#
+#       Busybox and similar reduced shells will NOT work, because this script
+#       requires all of these POSIX shell features:
+#         * functions;
+#         * expansions «$var», «${var}», «${var:-default}», «${var+SET}»,
+#           «${var#prefix}», «${var%suffix}», and «$( cmd )»;
+#         * compound commands having a testable exit status, especially «case»;
+#         * various built-in commands including «command», «set», and «ulimit».
+#
+#   Important for patching:
+#
+#   (2) This script targets any POSIX shell, so it avoids extensions provided
+#       by Bash, Ksh, etc; in particular arrays are avoided.
+#
+#       The "traditional" practice of packing multiple parameters into a
+#       space-separated string is a well documented source of bugs and security
+#       problems, so this is (mostly) avoided, by progressively accumulating
+#       options in "$@", and eventually passing that to Java.
+#
+#       Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS,
+#       and GRADLE_OPTS) rely on word-splitting, this is performed explicitly;
+#       see the in-line comments for details.
+#
+#       There are tweaks for specific operating systems such as AIX, CygWin,
+#       Darwin, MinGW, and NonStop.
+#
+#   (3) This script is generated from the Groovy template
+#       https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
+#       within the Gradle project.
+#
+#       You can find Gradle at https://github.com/gradle/gradle/.
+#
 ##############################################################################
 
 # Attempt to set APP_HOME
+
 # Resolve links: $0 may be a link
-PRG="$0"
-# Need this for relative symlinks.
-while [ -h "$PRG" ] ; do
-    ls=`ls -ld "$PRG"`
-    link=`expr "$ls" : '.*-> \(.*\)$'`
-    if expr "$link" : '/.*' > /dev/null; then
-        PRG="$link"
-    else
-        PRG=`dirname "$PRG"`"/$link"
-    fi
+app_path=$0
+
+# Need this for daisy-chained symlinks.
+while
+    APP_HOME=${app_path%"${app_path##*/}"}  # leaves a trailing /; empty if no leading path
+    [ -h "$app_path" ]
+do
+    ls=$( ls -ld "$app_path" )
+    link=${ls#*' -> '}
+    case $link in             #(
+      /*)   app_path=$link ;; #(
+      *)    app_path=$APP_HOME$link ;;
+    esac
 done
-SAVED="`pwd`"
-cd "`dirname \"$PRG\"`/" >/dev/null
-APP_HOME="`pwd -P`"
-cd "$SAVED" >/dev/null
 
-APP_NAME="Gradle"
-APP_BASE_NAME=`basename "$0"`
+# This is normally unused
+# shellcheck disable=SC2034
+APP_BASE_NAME=${0##*/}
+APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
 
 # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
 DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
 
 # Use the maximum available, or set MAX_FD != -1 to use that value.
-MAX_FD="maximum"
+MAX_FD=maximum
 
 warn () {
     echo "$*"
-}
+} >&2
 
 die () {
     echo
     echo "$*"
     echo
     exit 1
-}
+} >&2
 
 # OS specific support (must be 'true' or 'false').
 cygwin=false
 msys=false
 darwin=false
 nonstop=false
-case "`uname`" in
-  CYGWIN* )
-    cygwin=true
-    ;;
-  Darwin* )
-    darwin=true
-    ;;
-  MINGW* )
-    msys=true
-    ;;
-  NONSTOP* )
-    nonstop=true
-    ;;
+case "$( uname )" in                #(
+  CYGWIN* )         cygwin=true  ;; #(
+  Darwin* )         darwin=true  ;; #(
+  MSYS* | MINGW* )  msys=true    ;; #(
+  NONSTOP* )        nonstop=true ;;
 esac
 
+CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+
+
 # Determine the Java command to use to start the JVM.
 if [ -n "$JAVA_HOME" ] ; then
     if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
         # IBM's JDK on AIX uses strange locations for the executables
-        JAVACMD="$JAVA_HOME/jre/sh/java"
+        JAVACMD=$JAVA_HOME/jre/sh/java
     else
-        JAVACMD="$JAVA_HOME/bin/java"
+        JAVACMD=$JAVA_HOME/bin/java
     fi
     if [ ! -x "$JAVACMD" ] ; then
         die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
@@ -95,112 +132,113 @@ Please set the JAVA_HOME variable in your environment to match the
 location of your Java installation."
     fi
 else
-    JAVACMD="java"
+    JAVACMD=java
     which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
 
 Please set the JAVA_HOME variable in your environment to match the
 location of your Java installation."
 fi
 
-# LUCENE-9471: workaround for gradle leaving junk temp. files behind.
-GRADLE_TEMPDIR="$APP_HOME/.gradle/tmp"
-mkdir -p "$GRADLE_TEMPDIR"
-if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
-    GRADLE_TEMPDIR=`cygpath --path --mixed "$GRADLE_TEMPDIR"`
-fi
-DEFAULT_JVM_OPTS="$DEFAULT_JVM_OPTS \"-Djava.io.tmpdir=$GRADLE_TEMPDIR\""
-
-# Set APP_HOME and GRADLE_WRAPPER_JAR. Unlike lucene-solr, we aren't downloading and verifying the jar if it's not present
-if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
-    APP_HOME=`cygpath --path --mixed "$APP_HOME"`
-fi
-GRADLE_WRAPPER_JAR="$APP_HOME/gradle/wrapper/gradle-wrapper.jar"
-
-CLASSPATH=$GRADLE_WRAPPER_JAR
-
-# Don't fork a daemon mode on initial run that generates local defaults.
-GRADLE_DAEMON_CTRL=
-if [ ! -e "$APP_HOME/gradle.properties" ]; then
-    GRADLE_DAEMON_CTRL=--no-daemon
-fi
-
 # Increase the maximum file descriptors if we can.
-if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
-    MAX_FD_LIMIT=`ulimit -H -n`
-    if [ $? -eq 0 ] ; then
-        if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
-            MAX_FD="$MAX_FD_LIMIT"
-        fi
-        ulimit -n $MAX_FD
-        if [ $? -ne 0 ] ; then
-            warn "Could not set maximum file descriptor limit: $MAX_FD"
-        fi
-    else
-        warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
-    fi
+if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
+    case $MAX_FD in #(
+      max*)
+        # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
+        # shellcheck disable=SC3045 
+        MAX_FD=$( ulimit -H -n ) ||
+            warn "Could not query maximum file descriptor limit"
+    esac
+    case $MAX_FD in  #(
+      '' | soft) :;; #(
+      *)
+        # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
+        # shellcheck disable=SC3045 
+        ulimit -n "$MAX_FD" ||
+            warn "Could not set maximum file descriptor limit to $MAX_FD"
+    esac
 fi
 
-# For Darwin, add options to specify how the application appears in the dock
-if $darwin; then
-    GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
-fi
+# Collect all arguments for the java command, stacking in reverse order:
+#   * args from the command line
+#   * the main class name
+#   * -classpath
+#   * -D...appname settings
+#   * --module-path (only if needed; this wrapper script never adds one)
+#   * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables.
 
 # For Cygwin or MSYS, switch paths to Windows format before running java
-if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
-    JAVACMD=`cygpath --unix "$JAVACMD"`
-
-    # We build the pattern for arguments to be converted via cygpath
-    ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
-    SEP=""
-    for dir in $ROOTDIRSRAW ; do
-        ROOTDIRS="$ROOTDIRS$SEP$dir"
-        SEP="|"
-    done
-    OURCYGPATTERN="(^($ROOTDIRS))"
-    # Add a user-defined pattern to the cygpath arguments
-    if [ "$GRADLE_CYGPATTERN" != "" ] ; then
-        OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
-    fi
+if "$cygwin" || "$msys" ; then
+    APP_HOME=$( cygpath --path --mixed "$APP_HOME" )
+    CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" )
+
+    JAVACMD=$( cygpath --unix "$JAVACMD" )
+
     # Now convert the arguments - kludge to limit ourselves to /bin/sh
-    i=0
-    for arg in "$@" ; do
-        CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
-        CHECK2=`echo "$arg"|egrep -c "^-"`                                 ### Determine if an option
-
-        if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then                    ### Added a condition
-            eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
-        else
-            eval `echo args$i`="\"$arg\""
+    for arg do
+        if
+            case $arg in                                #(
+              -*)   false ;;                            # don't mess with options #(
+              /?*)  t=${arg#/} t=/${t%%/*}              # looks like a POSIX filepath
+                    [ -e "$t" ] ;;                      #(
+              *)    false ;;
+            esac
+        then
+            arg=$( cygpath --path --ignore --mixed "$arg" )
         fi
-        i=$((i+1))
+        # Roll the args list around exactly as many times as the number of
+        # args, so each arg winds up back in the position where it started, but
+        # possibly modified.
+        #
+        # NB: a `for` loop captures its iteration list before it begins, so
+        # changing the positional parameters here affects neither the number of
+        # iterations, nor the values presented in `arg`.
+        shift                   # remove old arg
+        set -- "$@" "$arg"      # push replacement arg
     done
-    case $i in
-        (0) set -- ;;
-        (1) set -- "$args0" ;;
-        (2) set -- "$args0" "$args1" ;;
-        (3) set -- "$args0" "$args1" "$args2" ;;
-        (4) set -- "$args0" "$args1" "$args2" "$args3" ;;
-        (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
-        (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
-        (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
-        (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
-        (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
-    esac
 fi
 
-# Escape application args
-save () {
-    for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
-    echo " "
-}
-APP_ARGS=$(save "$@")
+# Collect all arguments for the java command;
+#   * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of
+#     shell script including quotes and variable substitutions, so put them in
+#     double quotes to make sure that they get re-expanded; and
+#   * put everything else in single quotes, so that it's not re-expanded.
+
+set -- \
+        "-Dorg.gradle.appname=$APP_BASE_NAME" \
+        -classpath "$CLASSPATH" \
+        org.gradle.wrapper.GradleWrapperMain \
+        "$@"
+
+# Stop when "xargs" is not available.
+if ! command -v xargs >/dev/null 2>&1
+then
+    die "xargs is not available"
+fi
 
-# Collect all arguments for the java command, following the shell quoting and substitution rules
-eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain $GRADLE_DAEMON_CTRL "$APP_ARGS"
+# Use "xargs" to parse quoted args.
+#
+# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
+#
+# In Bash we could simply go:
+#
+#   readarray ARGS < <( xargs -n1 <<<"$var" ) &&
+#   set -- "${ARGS[@]}" "$@"
+#
+# but POSIX shell has neither arrays nor process substitution, so instead we
+# post-process each arg (as a line of input to sed) to backslash-escape any
+# character that might be a shell metacharacter, then use eval to reverse
+# that process (while maintaining the separation between arguments), and wrap
+# the whole thing up as a single "set" statement.
+#
+# This will of course break if any of these variables contains a newline or
+# an unmatched quote.
+#
 
-# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
-if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
-  cd "$(dirname "$0")"
-fi
+eval "set -- $(
+        printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" |
+        xargs -n1 |
+        sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' |
+        tr '\n' ' '
+    )" '"$@"'
 
 exec "$JAVACMD" "$@"
diff --git a/gradlew.bat b/gradlew.bat
index 8007957..93e3f59 100644
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -14,7 +14,7 @@
 @rem limitations under the License.
 @rem
 
-@if "%DEBUG%" == "" @echo off
+@if "%DEBUG%"=="" @echo off
 @rem ##########################################################################
 @rem
 @rem  Gradle startup script for Windows
@@ -25,24 +25,23 @@
 if "%OS%"=="Windows_NT" setlocal
 
 set DIRNAME=%~dp0
-if "%DIRNAME%" == "" set DIRNAME=.
+if "%DIRNAME%"=="" set DIRNAME=.
+@rem This is normally unused
 set APP_BASE_NAME=%~n0
 set APP_HOME=%DIRNAME%
 
+@rem Resolve any "." and ".." in APP_HOME to make it shorter.
+for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
+
 @rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
 set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
 
-@rem LUCENE-9471: workaround for gradle leaving junk temp. files behind.
-SET GRADLE_TEMPDIR=%DIRNAME%\.gradle\tmp
-IF NOT EXIST "%GRADLE_TEMPDIR%" MKDIR "%GRADLE_TEMPDIR%"
-SET DEFAULT_JVM_OPTS=%DEFAULT_JVM_OPTS% "-Djava.io.tmpdir=%GRADLE_TEMPDIR%"
-
 @rem Find java.exe
 if defined JAVA_HOME goto findJavaFromJavaHome
 
 set JAVA_EXE=java.exe
 %JAVA_EXE% -version >NUL 2>&1
-if "%ERRORLEVEL%" == "0" goto init
+if %ERRORLEVEL% equ 0 goto execute
 
 echo.
 echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
@@ -56,7 +55,7 @@ goto fail
 set JAVA_HOME=%JAVA_HOME:"=%
 set JAVA_EXE=%JAVA_HOME%/bin/java.exe
 
-if exist "%JAVA_EXE%" goto init
+if exist "%JAVA_EXE%" goto execute
 
 echo.
 echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
@@ -66,42 +65,26 @@ echo location of your Java installation.
 
 goto fail
 
-:init
-@rem Get command-line arguments, handling Windows variants
-
-if not "%OS%" == "Windows_NT" goto win9xME_args
-
-:win9xME_args
-@rem Slurp the command line arguments.
-set CMD_LINE_ARGS=
-set _SKIP=2
-
-:win9xME_args_slurp
-if "x%~1" == "x" goto execute
-
-set CMD_LINE_ARGS=%*
-
 :execute
-
 @rem Setup the command line
-set CLASSPATH=%GRADLE_WRAPPER_JAR%
 
-@rem Don't fork a daemon mode on initial run that generates local defaults.
-SET GRADLE_DAEMON_CTRL=
-IF NOT EXIST "%DIRNAME%\gradle.properties" SET GRADLE_DAEMON_CTRL=--no-daemon
+set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
+
 
 @rem Execute Gradle
-"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %GRADLE_DAEMON_CTRL% %CMD_LINE_ARGS%
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
 
 :end
 @rem End local scope for the variables with windows NT shell
-if "%ERRORLEVEL%"=="0" goto mainEnd
+if %ERRORLEVEL% equ 0 goto mainEnd
 
 :fail
 rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
 rem the _cmd.exe /c_ return code!
-if  not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
-exit /b 1
+set EXIT_CODE=%ERRORLEVEL%
+if %EXIT_CODE% equ 0 set EXIT_CODE=1
+if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
+exit /b %EXIT_CODE%
 
 :mainEnd
 if "%OS%"=="Windows_NT" endlocal