Posted to commits@solr.apache.org by ma...@apache.org on 2023/05/10 19:04:54 UTC
[solr-sandbox] branch main updated: Some refactoring, a bunch of simple mock tests, test fixes & improvements, dep updates and fixes, rework offset management.. (#56)
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new 5238ca6 Some refactoring, a bunch of simple mock tests, test fixes & improvements, dep updates and fixes, rework offset management.. (#56)
5238ca6 is described below
commit 5238ca6a0d15dd048608089be5eda66d8d9bc4a0
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Wed May 10 14:04:49 2023 -0500
Some refactoring, a bunch of simple mock tests, test fixes & improvements, dep updates and fixes, rework offset management.. (#56)
* Some refactoring, a bunch of simple mock tests, test fixes & improvements, dep updates and fixes, rework offset management..
* Version bump.
---
build.gradle | 2 +-
crossdc-commons/build.gradle | 5 +-
crossdc-commons/gradle.properties | 2 +-
crossdc-consumer/build.gradle | 32 +-
crossdc-consumer/gradle.properties | 2 +-
crossdc-consumer/machinet.conf | 3 +
.../solr/crossdc/consumer/BlockingQueue.java | 25 ++
.../crossdc/consumer/KafkaCrossDcConsumer.java | 180 +++------
.../solr/crossdc/consumer/PartitionManager.java | 129 ++++++
.../messageprocessor/SolrMessageProcessor.java | 5 +-
.../solr/crossdc/NoOpResubmitBackoffPolicy.java | 11 +
.../solr/crossdc/SimpleSolrIntegrationTest.java | 2 +
.../apache/solr/crossdc/TestMessageProcessor.java | 7 -
.../crossdc/consumer/KafkaCrossDcConsumerTest.java | 445 +++++++++++++++------
.../crossdc/consumer/PartitionManagerTest.java | 168 ++++++++
.../messageprocessor/SolrMessageProcessorTest.java | 109 +++++
crossdc-producer/build.gradle | 10 +-
crossdc-producer/gradle.properties | 2 +-
.../apache/solr/crossdc/DeleteByQueryToIdTest.java | 12 +-
.../solr/crossdc/RetryQueueIntegrationTest.java | 7 +
.../solr/crossdc/SolrAndKafkaIntegrationTest.java | 8 +-
...SolrAndKafkaMultiCollectionIntegrationTest.java | 1 +
.../solr/crossdc/SolrAndKafkaReindexTest.java | 5 +
.../solr/crossdc/ZkConfigIntegrationTest.java | 5 +
.../processor/MirroringUpdateProcessorTest.java | 358 ++++++++++-------
gradle/wrapper/gradle-wrapper.jar | Bin 59203 -> 61574 bytes
gradle/wrapper/gradle-wrapper.properties | 3 +-
gradlew | 294 ++++++++------
gradlew.bat | 49 +--
29 files changed, 1292 insertions(+), 589 deletions(-)
diff --git a/build.gradle b/build.gradle
index 25af428..9a38422 100644
--- a/build.gradle
+++ b/build.gradle
@@ -28,5 +28,5 @@ description 'Root for Solr plugins sandbox'
subprojects {
group "org.apache.solr.crossdc"
- //group "org.apache.solr.encryption"
+ group "org.apache.solr.encryption"
}
diff --git a/crossdc-commons/build.gradle b/crossdc-commons/build.gradle
index a687058..57cb915 100644
--- a/crossdc-commons/build.gradle
+++ b/crossdc-commons/build.gradle
@@ -34,7 +34,10 @@ sourceSets {
}
dependencies {
- provided 'org.apache.solr:solr-solrj:8.11.2'
+ provided 'org.apache.solr:solr-solrj:8.11.2', {
+ exclude group: "org.apache.logging.log4j", module: "*"
+ exclude group: "org.slf4j", module: "*"
+ }
implementation 'org.apache.kafka:kafka-clients:2.8.1'
implementation 'com.google.guava:guava:14.0'
}
diff --git a/crossdc-commons/gradle.properties b/crossdc-commons/gradle.properties
index 66975c6..21aee2d 100644
--- a/crossdc-commons/gradle.properties
+++ b/crossdc-commons/gradle.properties
@@ -1,2 +1,2 @@
group=org.apache.solr
-version=0.9-SNAPSHOT
\ No newline at end of file
+version=1.0-SNAPSHOT
\ No newline at end of file
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index 466bbfa..cca42fc 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -32,24 +32,34 @@ dependencies {
implementation group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.2'
implementation project(path: ':crossdc-commons', configuration: 'shadow')
- implementation 'io.dropwizard.metrics:metrics-core:4.2.9'
- implementation 'org.slf4j:slf4j-api:1.7.36'
+ implementation 'io.dropwizard.metrics:metrics-core:4.2.17'
+ implementation 'org.slf4j:slf4j-api:2.0.5'
implementation 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
implementation 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
+ implementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.20.0' // log4j impl can use StackLocatorUtil which is in the api jar
implementation 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
- implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.2'
- runtimeOnly ('com.google.protobuf:protobuf-java-util:3.19.2')
- runtimeOnly ('commons-codec:commons-codec:1.13')
+ implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.20.0'
+ testImplementation project(path: ':crossdc-commons')
+ runtimeOnly ('com.google.protobuf:protobuf-java-util:3.22.2')
+ runtimeOnly ('commons-codec:commons-codec:1.15')
testImplementation 'org.hamcrest:hamcrest:2.2'
testImplementation 'junit:junit:4.13.2'
- testImplementation('org.mockito:mockito-core:4.3.1', {
- exclude group: "net.bytebuddy", module: "byte-buddy-agent"
- })
+ testImplementation('org.mockito:mockito-inline:5.2.0')
testImplementation project(':crossdc-producer')
- testImplementation group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
- testImplementation group: 'org.apache.solr', name: 'solr-test-framework', version: '8.11.2'
+ testImplementation group: 'org.apache.solr', name: 'solr-core', version: '8.11.2', {
+ exclude group: "org.apache.logging.log4j", module: "*"
+ exclude group: "org.slf4j", module: "*"
+ exclude group: "org.eclipse.jetty", module: "jetty-http"
+ exclude group: "org.eclipse.jetty", module: "jetty-server"
+ exclude group: "org.eclipse.jetty", module: "jetty-servlet"
+ }
+ testImplementation group: 'org.apache.solr', name: 'solr-test-framework', version: '8.11.2', {
+ exclude group: "org.apache.logging.log4j", module: "*"
+ exclude group: "org.slf4j", module: "*"
+ }
testImplementation 'org.apache.kafka:kafka_2.13:2.8.1'
testImplementation 'org.apache.kafka:kafka-streams:2.8.1'
testImplementation 'org.apache.kafka:kafka_2.13:2.8.1:test'
@@ -58,6 +68,8 @@ dependencies {
test {
jvmArgs '-Djava.security.egd=file:/dev/./urandom'
+ minHeapSize = "128m"
+ maxHeapSize = "512m"
}
tasks.withType(Tar){
diff --git a/crossdc-consumer/gradle.properties b/crossdc-consumer/gradle.properties
index 66975c6..21aee2d 100644
--- a/crossdc-consumer/gradle.properties
+++ b/crossdc-consumer/gradle.properties
@@ -1,2 +1,2 @@
group=org.apache.solr
-version=0.9-SNAPSHOT
\ No newline at end of file
+version=1.0-SNAPSHOT
\ No newline at end of file
diff --git a/crossdc-consumer/machinet.conf b/crossdc-consumer/machinet.conf
new file mode 100644
index 0000000..8831cd1
--- /dev/null
+++ b/crossdc-consumer/machinet.conf
@@ -0,0 +1,3 @@
+### Please DO NOT modify the contents of this file. For internal purpose only
+root=7badab89-0174-350c-a9c1-20ef5b1bf5a5
+rootId=ba95d78a-7c94-3571-9853-08775a97a3a0
\ No newline at end of file
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/BlockingQueue.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/BlockingQueue.java
new file mode 100644
index 0000000..bc414c8
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/BlockingQueue.java
@@ -0,0 +1,25 @@
+package org.apache.solr.crossdc.consumer;
+
+import java.util.concurrent.ArrayBlockingQueue;
+
+public class BlockingQueue<E> extends ArrayBlockingQueue<E> {
+
+ public BlockingQueue(int capacity) {
+ super(capacity);
+ }
+
+
+ @Override
+ public boolean offer(E r) {
+ //return super.offer(r);
+ try {
+ super.put(r);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ return true;
+ }
+
+
+}
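The new BlockingQueue gives the consumer's worker pool backpressure: a ThreadPoolExecutor calls offer() when handing off work and rejects the task if the queue is full, whereas delegating offer() to put() makes submission block until a worker frees a slot. A minimal, self-contained sketch of the same pattern (class name, capacity, and tasks are illustrative, not from this commit):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BackpressureDemo {
    // Same idea as the new BlockingQueue: offer() blocks via put().
    static class BlockingSubmitQueue<E> extends ArrayBlockingQueue<E> {
        BlockingSubmitQueue(int capacity) { super(capacity); }
        @Override public boolean offer(E e) {
            try {
                super.put(e);  // block until space is available
                return true;
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return false;  // the executor will then reject the task
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new BlockingSubmitQueue<>(2));
        for (int i = 0; i < 10; i++) {
            final int n = i;
            // With a plain ArrayBlockingQueue(2) this loop would hit
            // RejectedExecutionException; here submit() simply waits for a slot.
            executor.submit(() -> System.out.println("task " + n));
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}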
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 9ac506a..e8195ac 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -38,7 +38,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private final MetricRegistry metrics = SharedMetricRegistries.getOrCreate("metrics");
- private final KafkaConsumer<String,MirroredSolrRequest> consumer;
+ private final KafkaConsumer<String,MirroredSolrRequest> kafkaConsumer;
private final CountDownLatch startLatch;
KafkaMirroringSink kafkaMirroringSink;
@@ -48,22 +48,14 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private final CloudSolrClient solrClient;
- private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10) {
- @Override public boolean offer(Runnable r) {
- //return super.offer(r);
- try {
- super.put(r);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- }
- return true;
- }
+ private final ThreadPoolExecutor executor;
+
+
+ private PartitionManager partitionManager;
+
+ private BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
- };
- private final ThreadPoolExecutor executor;
- private final ConcurrentHashMap<TopicPartition,PartitionWork> partitionWorkMap = new ConcurrentHashMap<>();
/**
* @param conf The Kafka consumer configuration
@@ -95,22 +87,26 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
kafkaConsumerProps.putAll(conf.getAdditionalProperties());
int threads = conf.getInt(KafkaCrossDcConf.CONSUMER_PROCESSING_THREADS);
+
executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue, new ThreadFactory() {
- @Override public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setName("KafkaCrossDcConsumerWorker");
- return t;
- }
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setName("KafkaCrossDcConsumerWorker");
+ return t;
+ }
});
executor.prestartAllCoreThreads();
solrClient = createSolrClient(conf);
- messageProcessor = new SolrMessageProcessor(solrClient, resubmitRequest -> 0L);
+ messageProcessor = createSolrMessageProcessor();
- log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProps);
- consumer = createConsumer(kafkaConsumerProps);
+
+ log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProps);
+ kafkaConsumer = createKafkaConsumer(kafkaConsumerProps);
+ partitionManager = new PartitionManager(kafkaConsumer);
// Create producer for resubmitting failed requests
log.info("Creating Kafka resubmit producer");
this.kafkaMirroringSink = createKafkaMirroringSink(conf);
@@ -118,7 +114,11 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
}
- public KafkaConsumer<String,MirroredSolrRequest> createConsumer(Properties properties) {
+ protected SolrMessageProcessor createSolrMessageProcessor() {
+ return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L);
+ }
+
+ public KafkaConsumer<String,MirroredSolrRequest> createKafkaConsumer(Properties properties) {
return new KafkaConsumer<>(properties, new StringDeserializer(), new MirroredSolrRequestSerializer());
}
@@ -133,7 +133,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
try {
- consumer.subscribe(Arrays.asList((topicNames)));
+ kafkaConsumer.subscribe(Arrays.asList((topicNames)));
log.info("Consumer started");
startLatch.countDown();
@@ -144,7 +144,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
log.info("Closed kafka consumer. Exiting now.");
try {
- consumer.close();
+ kafkaConsumer.close();
} catch (Exception e) {
log.warn("Failed to close kafka consumer", e);
}
@@ -167,11 +167,14 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
boolean pollAndProcessRequests() {
log.trace("Entered pollAndProcessRequests loop");
try {
- for (TopicPartition partition : partitionWorkMap.keySet()) {
- checkForOffsetUpdates(partition);
+ try {
+ partitionManager.checkOffsetUpdates();
+ } catch (Throwable e) {
+ log.error("Error while checking offset updates, shutting down", e);
+ return false;
}
- ConsumerRecords<String,MirroredSolrRequest> records = consumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
+ ConsumerRecords<String,MirroredSolrRequest> records = kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
if (log.isTraceEnabled()) {
log.trace("poll return {} records", records.count());
@@ -184,14 +187,9 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String,MirroredSolrRequest>> partitionRecords = records.records(partition);
- PartitionWork partitionWork = partitionWorkMap.compute(partition, (k, v) -> {
- if (v == null) {
- return new PartitionWork();
- }
- return v;
- });
- WorkUnit workUnit = new WorkUnit();
- workUnit.nextOffset = getOffsetForPartition(partitionRecords);
+ PartitionManager.PartitionWork partitionWork = partitionManager.getPartitionWork(partition);
+ PartitionManager.WorkUnit workUnit = new PartitionManager.WorkUnit(partition);
+ workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
partitionWork.partitionQueue.add(workUnit);
try {
ModifiableSolrParams lastParams = null;
@@ -218,8 +216,8 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
lastParamsAsNamedList = null;
sendBatch(solrReqBatch, lastRecord, workUnit);
solrReqBatch = new UpdateRequest();
- workUnit = new WorkUnit();
- workUnit.nextOffset = getOffsetForPartition(partitionRecords);
+ workUnit = new PartitionManager.WorkUnit(partition);
+ workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
partitionWork.partitionQueue.add(workUnit);
}
@@ -247,8 +245,12 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
}
sendBatch(solrReqBatch, lastRecord, workUnit);
-
- checkForOffsetUpdates(partition);
+ try {
+ partitionManager.checkForOffsetUpdates(partition);
+ } catch (Throwable e) {
+ log.error("Error while checking offset updates, shutting down", e);
+ return false;
+ }
// handleItem sets the thread interrupt, let's exit if there has been an interrupt set
if (Thread.currentThread().isInterrupted()) {
@@ -263,15 +265,18 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
if (e instanceof ClassCastException || e instanceof SerializationException) { // TODO: optional
log.error("Non retryable error", e);
- break;
+ return false;
}
log.error("Exception occurred in Kafka consumer thread, stopping the Consumer.", e);
- break;
+ return false;
}
}
- for (TopicPartition partition : partitionWorkMap.keySet()) {
- checkForOffsetUpdates(partition);
+ try {
+ partitionManager.checkOffsetUpdates();
+ } catch (Throwable e) {
+ log.error("Error while checking offset updates, shutting down", e);
+ return false;
}
} catch (WakeupException e) {
@@ -289,53 +294,32 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
return true;
}
- public void sendBatch(UpdateRequest solrReqBatch, ConsumerRecord<String,MirroredSolrRequest> lastRecord, WorkUnit workUnit) {
+ public void sendBatch(UpdateRequest solrReqBatch, ConsumerRecord<String,MirroredSolrRequest> lastRecord, PartitionManager.WorkUnit workUnit) {
UpdateRequest finalSolrReqBatch = solrReqBatch;
Future<?> future = executor.submit(() -> {
- IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(new MirroredSolrRequest(finalSolrReqBatch));
try {
+ IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(new MirroredSolrRequest(finalSolrReqBatch));
+
processResult(lastRecord, result);
} catch (MirroringException e) {
// We don't really know what to do here
log.error("Mirroring exception occurred while resubmitting to Kafka. We are going to stop the consumer thread now.", e);
throw new RuntimeException(e);
+ } finally {
+ executor.submit(() -> {
+ try {
+ partitionManager.checkForOffsetUpdates(workUnit.partition);
+ } catch (Throwable e) {
+ // already logging in checkForOffsetUpdates
+ }
+ });
}
+
});
workUnit.workItems.add(future);
}
- private void checkForOffsetUpdates(TopicPartition partition) {
- PartitionWork work;
- while ((work = partitionWorkMap.get(partition)) != null) {
- WorkUnit workUnit = work.partitionQueue.peek();
- if (workUnit != null) {
- for (Future<?> future : workUnit.workItems) {
- if (!future.isDone()) {
- if (log.isTraceEnabled()) {
- log.trace("Future for update is not done topic={}", partition.topic());
- }
- return;
- }
- if (log.isTraceEnabled()) {
- log.trace("Future for update is done topic={}", partition.topic());
- }
- try {
- future.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- log.error("Exception resubmitting updates to Kafka, stopping the Consumer thread", e);
- work.partitionQueue.poll();
- throw new RuntimeException("Exception resubmitting updates to Kafka, stopping the Consumer thread", e);
- }
- work.partitionQueue.poll();
- }
-
- updateOffset(partition, workUnit.nextOffset);
- }
- }
- }
void processResult(ConsumerRecord<String,MirroredSolrRequest> record, IQueueHandler.Result<MirroredSolrRequest> result) throws MirroringException {
switch (result.status()) {
@@ -370,44 +354,13 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
}
}
- /**
- * Reset the local offset so that the consumer reads the records from Kafka again.
- *
- * @param partition The TopicPartition to reset the offset for
- * @param partitionRecords PartitionRecords for the specified partition
- */
- private void resetOffsetForPartition(TopicPartition partition, List<ConsumerRecord<String,MirroredSolrRequest>> partitionRecords) {
- if (log.isTraceEnabled()) {
- log.trace("Resetting offset to: {}", partitionRecords.get(0).offset());
- }
- long resetOffset = partitionRecords.get(0).offset();
- consumer.seek(partition, resetOffset);
- }
-
- /**
- * Logs and updates the commit point for the partition that has been processed.
- *
- * @param partition The TopicPartition to update the offset for
- * @param nextOffset The next offset to commit for this partition.
- */
- private void updateOffset(TopicPartition partition, long nextOffset) {
- if (log.isTraceEnabled()) {
- log.trace("Updated offset for topic={} partition={} to offset={}", partition.topic(), partition.partition(), nextOffset);
- }
-
- consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(nextOffset)));
- }
- private static long getOffsetForPartition(List<ConsumerRecord<String,MirroredSolrRequest>> partitionRecords) {
- long nextOffset = partitionRecords.get(partitionRecords.size() - 1).offset() + 1;
- return nextOffset;
- }
/**
* Shutdown the Kafka consumer by calling wakeup.
*/
public final void shutdown() {
- consumer.wakeup();
+ kafkaConsumer.wakeup();
log.info("Shutdown called on KafkaCrossDcConsumer");
try {
if (!executor.isShutdown()) {
@@ -431,12 +384,5 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
return new KafkaMirroringSink(conf);
}
- private static class PartitionWork {
- private final Queue<WorkUnit> partitionQueue = new LinkedList<>();
- }
- static class WorkUnit {
- Set<Future<?>> workItems = new HashSet<>();
- long nextOffset;
- }
}
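For context on the reworked offset handling: a committed Kafka offset names the next record to consume, not the last one processed, which is why getOffsetForPartition returns the final batch offset plus one. A small sketch of that convention (helper class and method names are illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.List;

class OffsetSketch {
    // Commit points at the NEXT record, so a restart resumes after the
    // last processed record instead of re-reading it.
    static <K, V> void commitBatch(KafkaConsumer<K, V> consumer,
                                   TopicPartition partition,
                                   List<ConsumerRecord<K, V>> batch) {
        long nextOffset = batch.get(batch.size() - 1).offset() + 1;
        consumer.commitSync(
                Collections.singletonMap(partition, new OffsetAndMetadata(nextOffset)));
    }
}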
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/PartitionManager.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/PartitionManager.java
new file mode 100644
index 0000000..62ae86e
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/PartitionManager.java
@@ -0,0 +1,129 @@
+package org.apache.solr.crossdc.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.*;
+import java.util.concurrent.*;
+
+
+public class PartitionManager {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ final ConcurrentHashMap<TopicPartition, PartitionWork> partitionWorkMap = new ConcurrentHashMap<>();
+ private final KafkaConsumer<String, MirroredSolrRequest> consumer;
+
+
+ static class PartitionWork {
+ final Queue<WorkUnit> partitionQueue = new LinkedList<>();
+ }
+
+ static class WorkUnit {
+ final TopicPartition partition;
+ Set<Future<?>> workItems = new HashSet<>();
+ long nextOffset;
+
+ public WorkUnit(TopicPartition partition) {
+ this.partition = partition;
+ }
+ }
+
+
+ PartitionManager(KafkaConsumer<String, MirroredSolrRequest> consumer) {
+ this.consumer = consumer;
+ }
+
+ public PartitionWork getPartitionWork(TopicPartition partition) {
+ return partitionWorkMap.compute(partition, (k, v) -> {
+ if (v == null) {
+ return new PartitionWork();
+ }
+ return v;
+ });
+ }
+
+
+ public void checkOffsetUpdates() throws Throwable {
+ for (TopicPartition partition : partitionWorkMap.keySet()) {
+ checkForOffsetUpdates(partition);
+ }
+ }
+
+ void checkForOffsetUpdates(TopicPartition partition) throws Throwable {
+ synchronized (partition) {
+ PartitionWork work;
+ if ((work = partitionWorkMap.get(partition)) != null) {
+ WorkUnit workUnit = work.partitionQueue.peek();
+ if (workUnit != null) {
+ boolean allFuturesDone = true;
+ for (Future<?> future : workUnit.workItems) {
+ if (!future.isDone()) {
+ if (log.isTraceEnabled()) {
+ log.trace("Future for update is not done topic={}", partition.topic());
+ }
+ allFuturesDone = false;
+ break;
+ }
+
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ log.error("Error updating offset for partition: {}", partition, e);
+ throw e;
+ } catch (ExecutionException e) {
+ log.error("Error updating offset for partition: {}", partition, e);
+ throw e.getCause();
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("Future for update is done topic={}", partition.topic());
+ }
+ }
+
+ if (allFuturesDone) {
+ work.partitionQueue.poll();
+ updateOffset(partition, workUnit.nextOffset);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Reset the local offset so that the consumer reads the records from Kafka again.
+ *
+ * @param partition The TopicPartition to reset the offset for
+ * @param partitionRecords PartitionRecords for the specified partition
+ */
+ private void resetOffsetForPartition(TopicPartition partition, List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+ if (log.isTraceEnabled()) {
+ log.trace("Resetting offset to: {}", partitionRecords.get(0).offset());
+ }
+ long resetOffset = partitionRecords.get(0).offset();
+ consumer.seek(partition, resetOffset);
+ }
+
+ /**
+ * Logs and updates the commit point for the partition that has been processed.
+ *
+ * @param partition The TopicPartition to update the offset for
+ * @param nextOffset The next offset to commit for this partition.
+ */
+ private void updateOffset(TopicPartition partition, long nextOffset) {
+ if (log.isTraceEnabled()) {
+ log.trace("Updated offset for topic={} partition={} to offset={}", partition.topic(), partition.partition(), nextOffset);
+ }
+
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(nextOffset)));
+ }
+
+ static long getOffsetForPartition(List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+ return partitionRecords.get(partitionRecords.size() - 1).offset() + 1;
+ }
+}
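Tying the pieces together: the poll loop enqueues one WorkUnit per dispatched batch, and the manager commits a unit's nextOffset only once every Future in the unit at the head of the queue is done, which keeps commits in arrival order even though batches complete on worker threads. A rough usage sketch (assumes it sits in the same package as PartitionManager; the Runnable stands in for real batch processing):

package org.apache.solr.crossdc.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.solr.crossdc.common.MirroredSolrRequest;

import java.util.List;
import java.util.concurrent.ExecutorService;

class PartitionManagerUsage {
    static void dispatch(PartitionManager partitionManager,
                         ExecutorService executor,
                         TopicPartition partition,
                         List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords,
                         Runnable processBatch) throws Throwable {
        PartitionManager.PartitionWork work = partitionManager.getPartitionWork(partition);
        PartitionManager.WorkUnit unit = new PartitionManager.WorkUnit(partition);
        unit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
        work.partitionQueue.add(unit);

        // Track the batch Future on the unit; unit.nextOffset is committed
        // only after every tracked Future is done, in queue order.
        unit.workItems.add(executor.submit(processBatch));

        partitionManager.checkForOffsetUpdates(partition);
    }
}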
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
index b1a428e..dcea1d0 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -198,7 +198,8 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
private void logRequest(SolrRequest request) {
if(request instanceof UpdateRequest) {
final StringBuilder rmsg = new StringBuilder(64);
- rmsg.append("Submitting update request");
+ String collection = request.getCollection();
+ rmsg.append("Submitting update request for collection=").append(collection != null ? collection : request.getParams().get("collection"));
if(((UpdateRequest) request).getDeleteById() != null) {
final int numDeleteByIds = ((UpdateRequest) request).getDeleteById().size();
metrics.counter("numDeleteByIds").inc(numDeleteByIds);
@@ -282,7 +283,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
*
* @param mirroredSolrRequest MirroredSolrRequest object that is being processed.
*/
- private void preventCircularMirroring(MirroredSolrRequest mirroredSolrRequest) {
+ void preventCircularMirroring(MirroredSolrRequest mirroredSolrRequest) {
if (mirroredSolrRequest.getSolrRequest() instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) mirroredSolrRequest.getSolrRequest();
ModifiableSolrParams params = updateRequest.getParams();
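The logRequest change covers both places an UpdateRequest can name its target collection. The fallback in isolation (a sketch using standard SolrJ accessors; like the original it assumes getParams() is non-null):

import org.apache.solr.client.solrj.SolrRequest;

class CollectionNameSketch {
    // Prefer the request-level collection, else the "collection" parameter.
    static String collectionOf(SolrRequest<?> request) {
        String collection = request.getCollection();
        return collection != null ? collection : request.getParams().get("collection");
    }
}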
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/NoOpResubmitBackoffPolicy.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/NoOpResubmitBackoffPolicy.java
new file mode 100644
index 0000000..d2bd804
--- /dev/null
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/NoOpResubmitBackoffPolicy.java
@@ -0,0 +1,11 @@
+package org.apache.solr.crossdc;
+
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
+
+public class NoOpResubmitBackoffPolicy implements ResubmitBackoffPolicy {
+
+ public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
+ return 0;
+ }
+}
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
index ebd102d..c3c7191 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
@@ -49,6 +49,8 @@ public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
if (cluster1 != null) {
cluster1.shutdown();
}
+ cluster1 = null;
+ processor = null;
}
public void testDocumentSanitization() {
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
index 9dc7073..5058c54 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
@@ -43,13 +43,6 @@ import static org.mockito.Mockito.*;
public class TestMessageProcessor {
static final String VERSION_FIELD = "_version_";
- static class NoOpResubmitBackoffPolicy implements ResubmitBackoffPolicy {
- @Override
- public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
- return 0;
- }
- }
-
@Mock
private CloudSolrClient solrClient;
private SolrMessageProcessor processor;
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
index 9ff2a8e..4d503c0 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
@@ -4,9 +4,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.crossdc.common.IQueueHandler;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.KafkaMirroringSink;
@@ -16,136 +18,331 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyList;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
public class KafkaCrossDcConsumerTest {
- private KafkaCrossDcConsumer kafkaCrossDcConsumer;
- private KafkaConsumer<String,MirroredSolrRequest> kafkaConsumerMock;
- private CloudSolrClient solrClientMock;
- private KafkaMirroringSink kafkaMirroringSinkMock;
-
- private SolrMessageProcessor messageProcessorMock;
-
- @Before public void setUp() {
- kafkaConsumerMock = mock(KafkaConsumer.class);
- solrClientMock = mock(CloudSolrClient.class);
- kafkaMirroringSinkMock = mock(KafkaMirroringSink.class);
- messageProcessorMock = mock(SolrMessageProcessor.class);
- KafkaCrossDcConf conf = testCrossDCConf();
- // Set necessary configurations
-
- kafkaCrossDcConsumer = new KafkaCrossDcConsumer(conf, new CountDownLatch(0)) {
- @Override public KafkaConsumer<String,MirroredSolrRequest> createConsumer(Properties properties) {
- return kafkaConsumerMock;
- }
-
- @Override protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
- return solrClientMock;
- }
-
- @Override protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
- return kafkaMirroringSinkMock;
- }
- };
- }
-
- private static KafkaCrossDcConf testCrossDCConf() {
- Map config = new HashMap<>();
- config.put(KafkaCrossDcConf.TOPIC_NAME, "topic1");
- config.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
- KafkaCrossDcConf conf = new KafkaCrossDcConf(config);
- return conf;
- }
-
- @After public void tearDown() {
- kafkaCrossDcConsumer.shutdown();
- }
-
- @Test public void testRunAndShutdown() throws Exception {
- // Define the expected behavior of the mocks and set up the test scenario
- when(kafkaConsumerMock.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
-
- ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor();
-
- // Run the test
- consumerThreadExecutor.submit(kafkaCrossDcConsumer);
-
- // Run the shutdown method
- kafkaCrossDcConsumer.shutdown();
-
- // Verify that the consumer was subscribed with the correct topic names
- verify(kafkaConsumerMock).subscribe(anyList());
-
- // Verify that the appropriate methods were called on the mocks
- verify(kafkaConsumerMock).wakeup();
- verify(solrClientMock).close();
-
- consumerThreadExecutor.shutdown();
- consumerThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
- }
-
- @Test public void testHandleFailedResubmit() throws Exception {
- // Set up the KafkaCrossDcConsumer
- KafkaCrossDcConf testConf = testCrossDCConf();
- KafkaCrossDcConsumer consumer = spy(new KafkaCrossDcConsumer(testConf, new CountDownLatch(0)));
- doNothing().when(consumer).sendBatch(any(UpdateRequest.class), any(ConsumerRecord.class), any(KafkaCrossDcConsumer.WorkUnit.class));
-
- // Set up the SolrMessageProcessor mock
- SolrMessageProcessor mockMessageProcessor = mock(SolrMessageProcessor.class);
- IQueueHandler.Result<MirroredSolrRequest> failedResubmitResult = new IQueueHandler.Result<>(IQueueHandler.ResultStatus.FAILED_RESUBMIT, null);
- when(mockMessageProcessor.handleItem(any(MirroredSolrRequest.class))).thenReturn(failedResubmitResult);
-
- // Mock the KafkaMirroringSink
- KafkaMirroringSink mockKafkaMirroringSink = mock(KafkaMirroringSink.class);
- doNothing().when(mockKafkaMirroringSink).submit(any(MirroredSolrRequest.class));
- consumer.kafkaMirroringSink = mockKafkaMirroringSink;
-
- // Call the method to test
- ConsumerRecord<String,MirroredSolrRequest> record = createSampleConsumerRecord();
- consumer.processResult(record, failedResubmitResult);
-
- // Verify that the KafkaMirroringSink.submit() method was called
- verify(consumer.kafkaMirroringSink, times(1)).submit(record.value());
- }
-
- private ConsumerRecord<String,MirroredSolrRequest> createSampleConsumerRecord() {
- return new ConsumerRecord<>("sample-topic", 0, 0, "key", createSampleMirroredSolrRequest());
- }
-
- private ConsumerRecords<String,MirroredSolrRequest> createSampleConsumerRecords() {
- TopicPartition topicPartition = new TopicPartition("sample-topic", 0);
- List<ConsumerRecord<String,MirroredSolrRequest>> recordsList = new ArrayList<>();
- recordsList.add(new ConsumerRecord<>("sample-topic", 0, 0, "key", createSampleMirroredSolrRequest()));
- return new ConsumerRecords<>(Collections.singletonMap(topicPartition, recordsList));
- }
-
- private MirroredSolrRequest createSampleMirroredSolrRequest() {
- // Create a sample MirroredSolrRequest for testing
- SolrInputDocument solrInputDocument = new SolrInputDocument();
- solrInputDocument.addField("id", "1");
- solrInputDocument.addField("title", "Sample title");
- solrInputDocument.addField("content", "Sample content");
- UpdateRequest updateRequest = new UpdateRequest();
- updateRequest.add(solrInputDocument);
- return new MirroredSolrRequest(updateRequest);
- }
+ private KafkaCrossDcConsumer kafkaCrossDcConsumer;
+ private KafkaConsumer<String, MirroredSolrRequest> kafkaConsumerMock;
+ private CloudSolrClient solrClientMock;
+ private KafkaMirroringSink kafkaMirroringSinkMock;
+
+ private SolrMessageProcessor messageProcessorMock;
+
+ private KafkaCrossDcConf conf;
+
+
+ @Before
+ public void setUp() {
+ kafkaConsumerMock = mock(KafkaConsumer.class);
+ solrClientMock = mock(CloudSolrClient.class);
+ kafkaMirroringSinkMock = mock(KafkaMirroringSink.class);
+ messageProcessorMock = mock(SolrMessageProcessor.class);
+ conf = testCrossDCConf();
+ // Set necessary configurations
+
+ kafkaCrossDcConsumer =
+ new KafkaCrossDcConsumer(conf, new CountDownLatch(0)) {
+ @Override
+ public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(
+ Properties properties) {
+ return kafkaConsumerMock;
+ }
+
+ @Override
+ public SolrMessageProcessor createSolrMessageProcessor() {
+ return messageProcessorMock;
+ }
+
+ @Override
+ protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
+ return solrClientMock;
+ }
+
+ @Override
+ protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
+ return kafkaMirroringSinkMock;
+ }
+ };
+ }
+
+ private static KafkaCrossDcConf testCrossDCConf() {
+ Map config = new HashMap<>();
+ config.put(KafkaCrossDcConf.TOPIC_NAME, "topic1");
+ config.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
+ return new KafkaCrossDcConf(config);
+ }
+
+ @After
+ public void tearDown() {
+ kafkaCrossDcConsumer.shutdown();
+ }
+
+ private ConsumerRecord<String, MirroredSolrRequest> createSampleConsumerRecord() {
+ return new ConsumerRecord<>("sample-topic", 0, 0, "key", createSampleMirroredSolrRequest());
+ }
+
+ private ConsumerRecords<String, MirroredSolrRequest> createSampleConsumerRecords() {
+ TopicPartition topicPartition = new TopicPartition("sample-topic", 0);
+ List<ConsumerRecord<String, MirroredSolrRequest>> recordsList = new ArrayList<>();
+ recordsList.add(
+ new ConsumerRecord<>(
+ "sample-topic", 0, 0, "key", createSampleMirroredSolrRequest()));
+ return new ConsumerRecords<>(Collections.singletonMap(topicPartition, recordsList));
+ }
+
+ private MirroredSolrRequest createSampleMirroredSolrRequest() {
+ // Create a sample MirroredSolrRequest for testing
+ SolrInputDocument solrInputDocument = new SolrInputDocument();
+ solrInputDocument.addField("id", "1");
+ solrInputDocument.addField("title", "Sample title");
+ solrInputDocument.addField("content", "Sample content");
+ UpdateRequest updateRequest = new UpdateRequest();
+ updateRequest.add(solrInputDocument);
+ return new MirroredSolrRequest(updateRequest);
+ }
+
+ /**
+ * Should create a KafkaCrossDcConsumer with the given configuration and startLatch
+ */
+ @Test
+ public void kafkaCrossDcConsumerCreationWithConfigurationAndStartLatch() {
+ KafkaCrossDcConf conf = testCrossDCConf();
+ CountDownLatch startLatch = new CountDownLatch(1);
+
+ KafkaCrossDcConsumer kafkaCrossDcConsumer = new KafkaCrossDcConsumer(conf, startLatch);
+
+ assertNotNull(kafkaCrossDcConsumer);
+ assertEquals(1, startLatch.getCount());
+ }
+
+ @Test
+ public void testRunAndShutdown() throws Exception {
+ // Define the expected behavior of the mocks and set up the test scenario
+
+ // Use a CountDownLatch to wait for the KafkaConsumer.subscribe method to be called
+ CountDownLatch subscribeLatch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ subscribeLatch.countDown();
+ return null;
+ }).when(kafkaConsumerMock).subscribe(anyList());
+
+ when(kafkaConsumerMock.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
+
+ ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor();
+
+
+ // Run the test
+ consumerThreadExecutor.submit(kafkaCrossDcConsumer);
+
+ // Wait for the KafkaConsumer.subscribe method to be called
+ assertTrue(subscribeLatch.await(10, TimeUnit.SECONDS));
+
+ // Run the shutdown method
+ kafkaCrossDcConsumer.shutdown();
+
+ // Verify that the consumer was subscribed with the correct topic names
+ verify(kafkaConsumerMock).subscribe(anyList());
+
+ // Verify that the appropriate methods were called on the mocks
+ verify(kafkaConsumerMock).wakeup();
+ verify(solrClientMock).close();
+
+ consumerThreadExecutor.shutdown();
+ consumerThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void testHandleFailedResubmit() throws Exception {
+ // Set up the KafkaCrossDcConsumer
+ KafkaCrossDcConf testConf = testCrossDCConf();
+ KafkaCrossDcConsumer consumer = spy(new KafkaCrossDcConsumer(testConf, new CountDownLatch(0)));
+ doNothing().when(consumer).sendBatch(any(UpdateRequest.class), any(ConsumerRecord.class), any(PartitionManager.WorkUnit.class));
+
+ // Set up the SolrMessageProcessor mock
+ SolrMessageProcessor mockMessageProcessor = mock(SolrMessageProcessor.class);
+ IQueueHandler.Result<MirroredSolrRequest> failedResubmitResult = new IQueueHandler.Result<>(IQueueHandler.ResultStatus.FAILED_RESUBMIT, null);
+ when(mockMessageProcessor.handleItem(any(MirroredSolrRequest.class))).thenReturn(failedResubmitResult);
+
+ // Mock the KafkaMirroringSink
+ KafkaMirroringSink mockKafkaMirroringSink = mock(KafkaMirroringSink.class);
+ doNothing().when(mockKafkaMirroringSink).submit(any(MirroredSolrRequest.class));
+ consumer.kafkaMirroringSink = mockKafkaMirroringSink;
+
+ // Call the method to test
+ ConsumerRecord<String, MirroredSolrRequest> record = createSampleConsumerRecord();
+ consumer.processResult(record, failedResubmitResult);
+
+ // Verify that the KafkaMirroringSink.submit() method was called
+ verify(consumer.kafkaMirroringSink, times(1)).submit(record.value());
+ }
+
+
+ @Test
+ public void testCreateKafkaCrossDcConsumer() {
+ KafkaCrossDcConsumer consumer = new KafkaCrossDcConsumer(conf, new CountDownLatch(1));
+ assertNotNull(consumer);
+ }
+
+ @Test
+ public void testHandleValidMirroredSolrRequest() {
+ KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
+ KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) {
+ @Override
+ public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(Properties properties) {
+ return mockConsumer;
+ }
+
+ @Override
+ public SolrMessageProcessor createSolrMessageProcessor() {
+ return messageProcessorMock;
+ }
+
+ @Override
+ protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
+ return kafkaMirroringSinkMock;
+ }
+ });
+
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "1");
+ UpdateRequest validRequest = new UpdateRequest();
+ validRequest.add(doc);
+ validRequest.setParams(new ModifiableSolrParams().add("commit", "true"));
+ // Create a valid MirroredSolrRequest
+ ConsumerRecord<String, MirroredSolrRequest> record = new ConsumerRecord<>("test-topic", 0, 0, "key", new MirroredSolrRequest(validRequest));
+ ConsumerRecords<String, MirroredSolrRequest> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("test-topic", 0), List.of(record)));
+
+ when(mockConsumer.poll(any())).thenReturn(records).thenThrow(new WakeupException());
+
+ spyConsumer.run();
+
+ // Verify that the valid MirroredSolrRequest was processed.
+ verify(spyConsumer, times(1)).sendBatch(argThat(updateRequest -> {
+ // Check if the UpdateRequest has the same content as the original validRequest
+ return updateRequest.getDocuments().equals(validRequest.getDocuments()) &&
+ updateRequest.getParams().equals(validRequest.getParams());
+ }), eq(record), any());
+ }
+
+ @Test
+ public void testHandleInvalidMirroredSolrRequest() {
+ KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
+ SolrMessageProcessor mockSolrMessageProcessor = mock(SolrMessageProcessor.class);
+ KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) {
+ @Override
+ public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(Properties properties) {
+ return mockConsumer;
+ }
+
+ @Override
+ public SolrMessageProcessor createSolrMessageProcessor() {
+ return mockSolrMessageProcessor;
+ }
+
+ @Override
+ protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
+ return kafkaMirroringSinkMock;
+ }
+ });
+ doReturn(mockConsumer).when(spyConsumer).createKafkaConsumer(any());
+
+ UpdateRequest invalidRequest = new UpdateRequest();
+ // no updates on request
+ invalidRequest.setParams(new ModifiableSolrParams().add("invalid_param", "invalid_value"));
+
+ ConsumerRecord<String, MirroredSolrRequest> record = new ConsumerRecord<>("test-topic", 0, 0, "key", new MirroredSolrRequest(invalidRequest));
+ ConsumerRecords<String, MirroredSolrRequest> records = new ConsumerRecords<>(Collections.singletonMap(new TopicPartition("test-topic", 0), List.of(record)));
+
+ when(mockConsumer.poll(any())).thenReturn(records).thenThrow(new WakeupException());
+
+ spyConsumer.run();
+
+ // Verify that the invalid MirroredSolrRequest was still processed.
+ verify(spyConsumer, times(1)).sendBatch(argThat(updateRequest -> {
+ // Check if the UpdateRequest has the same content as the original invalidRequest
+ return updateRequest.getDocuments() == null &&
+ updateRequest.getParams().equals(invalidRequest.getParams());
+ }), eq(record), any());
+ }
+
+ @Test
+ public void testHandleWakeupException() {
+ KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
+ KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) {
+ @Override
+ public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(Properties properties) {
+ return mockConsumer;
+ }
+
+ @Override
+ public SolrMessageProcessor createSolrMessageProcessor() {
+ return messageProcessorMock;
+ }
+
+ @Override
+ protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
+ return kafkaMirroringSinkMock;
+ }
+ });
+
+ when(mockConsumer.poll(any())).thenThrow(new WakeupException());
+
+ // Run the consumer in a separate thread to avoid blocking the test
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ executorService.submit(spyConsumer);
+
+ // Wait for a short period to allow the consumer to start and then trigger the shutdown
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ spyConsumer.shutdown();
+
+ // Verify that the WakeupException was caught and handled
+ verify(mockConsumer, atLeastOnce()).poll(any());
+ verify(mockConsumer, times(1)).wakeup();
+
+ // Shutdown the executor service
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Test
+ public void testShutdown() {
+ KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
+ KafkaCrossDcConsumer spyConsumer = spy(new KafkaCrossDcConsumer(conf, new CountDownLatch(1)) {
+ @Override
+ public KafkaConsumer<String, MirroredSolrRequest> createKafkaConsumer(Properties properties) {
+ return mockConsumer;
+ }
+
+ @Override
+ public SolrMessageProcessor createSolrMessageProcessor() {
+ return messageProcessorMock;
+ }
+
+ @Override
+ protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) {
+ return kafkaMirroringSinkMock;
+ }
+ });
+
+
+ spyConsumer.shutdown();
+
+ verify(mockConsumer, times(1)).wakeup();
+ }
}
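One pattern several of these tests lean on: stubbing poll() to return records once and then throw WakeupException lets run() complete a single cycle and exit cleanly, so the loop can be exercised synchronously. The trick in isolation (a Mockito sketch against a stripped-down stand-in loop, not the real consumer):

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Collections;

import static org.mockito.Mockito.*;

public class WakeupLoopSketch {
    // A stripped-down stand-in for the consumer's poll loop.
    static void runLoop(KafkaConsumer<String, String> consumer) {
        try {
            while (true) {
                consumer.poll(Duration.ofMillis(100)); // records would be processed here
            }
        } catch (WakeupException expected) {
            // normal shutdown path
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        @SuppressWarnings("unchecked")
        KafkaConsumer<String, String> mock = mock(KafkaConsumer.class);
        // One empty batch, then the wakeup: the loop runs exactly one full cycle.
        when(mock.poll(any(Duration.class)))
                .thenReturn(new ConsumerRecords<>(Collections.emptyMap()))
                .thenThrow(new WakeupException());

        runLoop(mock);
        verify(mock, times(2)).poll(any(Duration.class));
        verify(mock).close();
    }
}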
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/PartitionManagerTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/PartitionManagerTest.java
new file mode 100644
index 0000000..13472d3
--- /dev/null
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/PartitionManagerTest.java
@@ -0,0 +1,168 @@
+package org.apache.solr.crossdc.consumer;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class PartitionManagerTest {
+
+ /**
+ * Should return the existing PartitionWork when the partition is already in the
+ * partitionWorkMap
+ */
+ @Test
+ public void getPartitionWorkWhenPartitionInMap() {
+ KafkaConsumer<String, MirroredSolrRequest> consumer = mock(KafkaConsumer.class);
+ PartitionManager partitionManager = new PartitionManager(consumer);
+ TopicPartition partition = new TopicPartition("test-topic", 0);
+ PartitionManager.PartitionWork partitionWork = new PartitionManager.PartitionWork();
+ partitionManager.partitionWorkMap.put(partition, partitionWork);
+
+ PartitionManager.PartitionWork result = partitionManager.getPartitionWork(partition);
+
+ assertNotNull(result);
+ assertEquals(partitionWork, result);
+ }
+
+ /**
+ * Should create a new PartitionWork when the partition is not in the partitionWorkMap
+ */
+ @Test
+ public void getPartitionWorkWhenPartitionNotInMap() {
+ KafkaConsumer<String, MirroredSolrRequest> consumer = mock(KafkaConsumer.class);
+ PartitionManager partitionManager = new PartitionManager(consumer);
+ TopicPartition partition = new TopicPartition("test-topic", 0);
+
+ PartitionManager.PartitionWork partitionWork = partitionManager.getPartitionWork(partition);
+
+ assertNotNull(partitionWork);
+ assertTrue(partitionManager.partitionWorkMap.containsKey(partition));
+ assertEquals(partitionWork, partitionManager.partitionWorkMap.get(partition));
+ }
+
+ /**
+ * Should not update the offset when the future for update is not done
+ */
+ @Test
+ public void checkForOffsetUpdatesWhenFutureNotDone() throws Throwable {
+ KafkaConsumer<String, MirroredSolrRequest> consumer = mock(KafkaConsumer.class);
+ PartitionManager partitionManager = new PartitionManager(consumer);
+ TopicPartition partition = new TopicPartition("test-topic", 0);
+ PartitionManager.PartitionWork partitionWork = partitionManager.getPartitionWork(partition);
+ PartitionManager.WorkUnit workUnit = new PartitionManager.WorkUnit(partition);
+ Future<?> future = mock(Future.class);
+ when(future.isDone()).thenReturn(false);
+ workUnit.workItems.add(future);
+ partitionWork.partitionQueue.add(workUnit);
+
+ partitionManager.checkForOffsetUpdates(partition);
+
+ assertEquals(1, partitionWork.partitionQueue.size());
+ assertTrue(partitionWork.partitionQueue.contains(workUnit));
+ }
+
+ /**
+ * Should update the offset when the future for update is done
+ */
+ @Test
+ public void checkForOffsetUpdatesWhenFutureDone() throws Throwable {
+ KafkaConsumer<String, MirroredSolrRequest> consumer = mock(KafkaConsumer.class);
+ PartitionManager partitionManager = new PartitionManager(consumer);
+ TopicPartition partition = new TopicPartition("test-topic", 0);
+
+ PartitionManager.PartitionWork partitionWork = partitionManager.getPartitionWork(partition);
+ PartitionManager.WorkUnit workUnit = new PartitionManager.WorkUnit(partition);
+ partitionWork.partitionQueue.add(workUnit);
+
+ // Use a real Future instead of a mocked one
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Future<?> future = executor.submit(() -> {
+ // Simulate the task being completed
+ });
+
+ workUnit.workItems.add(future);
+
+ // Wait for the Future to complete
+ future.get(10, TimeUnit.SECONDS);
+
+ partitionManager.checkForOffsetUpdates(partition);
+
+ // Verify that the consumer.commitSync() method was called with the correct parameters
+ verify(consumer, times(1))
+ .commitSync(
+ Collections.singletonMap(
+ partition, new OffsetAndMetadata(workUnit.nextOffset)));
+
+ // Verify that the partitionQueue is empty after processing
+ assertTrue(partitionWork.partitionQueue.isEmpty());
+
+ // Shutdown the executor
+ executor.shutdown();
+ }
+
+ /**
+ * Should check for offset updates for all partitions in the partitionWorkMap
+ */
+ @Test
+ public void checkOffsetUpdatesForAllPartitions() throws Throwable {
+ // Create a mock KafkaConsumer
+ KafkaConsumer<String, MirroredSolrRequest> mockConsumer = mock(KafkaConsumer.class);
+
+ // Create a PartitionManager instance with the mock KafkaConsumer
+ PartitionManager partitionManager = new PartitionManager(mockConsumer);
+
+ // Create a few TopicPartitions
+ TopicPartition partition1 = new TopicPartition("topic1", 0);
+ TopicPartition partition2 = new TopicPartition("topic2", 0);
+
+ // Add some PartitionWork to the partitionWorkMap
+ PartitionManager.PartitionWork work1 = partitionManager.getPartitionWork(partition1);
+ PartitionManager.PartitionWork work2 = partitionManager.getPartitionWork(partition2);
+
+ // Create WorkUnits and add them to the PartitionWork
+ PartitionManager.WorkUnit workUnit1 = new PartitionManager.WorkUnit(partition1);
+ PartitionManager.WorkUnit workUnit2 = new PartitionManager.WorkUnit(partition2);
+
+ work1.partitionQueue.add(workUnit1);
+ work2.partitionQueue.add(workUnit2);
+
+ // Create mock Futures and add them to the WorkUnits
+ Future<?> mockFuture1 = mock(Future.class);
+ Future<?> mockFuture2 = mock(Future.class);
+
+ workUnit1.workItems.add(mockFuture1);
+ workUnit2.workItems.add(mockFuture2);
+
+ // Set the mock Futures to be done
+ when(mockFuture1.isDone()).thenReturn(true);
+ when(mockFuture2.isDone()).thenReturn(true);
+
+ // Call the checkOffsetUpdates method
+ partitionManager.checkOffsetUpdates();
+
+ // Verify that the futures were checked for completion
+ verify(mockFuture1, times(1)).isDone();
+ verify(mockFuture2, times(1)).isDone();
+
+
+ // Verify that the updateOffset method was called for each partition
+ verify(mockConsumer, times(1))
+ .commitSync(
+ Collections.singletonMap(
+ partition1, new OffsetAndMetadata(workUnit1.nextOffset)));
+ verify(mockConsumer, times(1))
+ .commitSync(
+ Collections.singletonMap(
+ partition2, new OffsetAndMetadata(workUnit2.nextOffset)));
+ }
+}
\ No newline at end of file
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessorTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessorTest.java
new file mode 100644
index 0000000..99cd3d4
--- /dev/null
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessorTest.java
@@ -0,0 +1,109 @@
+package org.apache.solr.crossdc.messageprocessor;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.SolrResponseBase;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.crossdc.common.IQueueHandler;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+public class SolrMessageProcessorTest {
+ private SolrMessageProcessor solrMessageProcessor;
+ private CloudSolrClient client;
+ private ResubmitBackoffPolicy resubmitBackoffPolicy;
+
+ @Before
+ public void setUp() {
+ client = mock(CloudSolrClient.class);
+ resubmitBackoffPolicy = mock(ResubmitBackoffPolicy.class);
+ solrMessageProcessor = new SolrMessageProcessor(client, resubmitBackoffPolicy);
+ }
+
+ /**
+ * Should handle MirroredSolrRequest and return a failed result with no retry
+ */
+ @Test
+ public void handleItemWithFailedResultNoRetry() throws SolrServerException, IOException {
+ MirroredSolrRequest mirroredSolrRequest = mock(MirroredSolrRequest.class);
+ SolrRequest solrRequest = mock(SolrRequest.class);
+ when(mirroredSolrRequest.getSolrRequest()).thenReturn(solrRequest);
+
+ SolrResponseBase solrResponseBase = mock(SolrResponseBase.class);
+ when(solrRequest.process(client)).thenReturn(solrResponseBase);
+ when(solrResponseBase.getResponse()).thenReturn(new NamedList<>());
+ when(solrResponseBase.getStatus()).thenReturn(ErrorCode.BAD_REQUEST.code);
+ when(client.request(any(SolrRequest.class))).thenReturn(new NamedList<>());
+
+ IQueueHandler.Result<MirroredSolrRequest> result = solrMessageProcessor.handleItem(mirroredSolrRequest);
+
+ assertEquals(IQueueHandler.ResultStatus.FAILED_RESUBMIT, result.status());
+ }
+
+ /**
+ * Should handle MirroredSolrRequest and return a failed result with resubmit
+ */
+ @Test
+ public void handleItemWithFailedResultResubmit() throws SolrServerException, IOException {
+ MirroredSolrRequest mirroredSolrRequest = mock(MirroredSolrRequest.class);
+ SolrRequest solrRequest = mock(SolrRequest.class);
+ when(mirroredSolrRequest.getSolrRequest()).thenReturn(solrRequest);
+ when(solrRequest.process(client))
+ .thenThrow(new SolrException(ErrorCode.SERVER_ERROR, "Server error"));
+
+ IQueueHandler.Result<MirroredSolrRequest> result = solrMessageProcessor.handleItem(mirroredSolrRequest);
+
+ assertEquals(IQueueHandler.ResultStatus.FAILED_RESUBMIT, result.status());
+ assertEquals(mirroredSolrRequest, result.newItem());
+ }
+
+ /**
+ * Should handle MirroredSolrRequest and return a successful result
+ */
+ @Test
+ public void handleItemWithSuccessfulResult() throws SolrServerException, IOException {
+ MirroredSolrRequest mirroredSolrRequest = mock(MirroredSolrRequest.class);
+ SolrRequest solrRequest = mock(SolrRequest.class);
+ SolrResponseBase solrResponse = mock(SolrResponseBase.class);
+
+ when(mirroredSolrRequest.getSolrRequest()).thenReturn(solrRequest);
+ when(solrRequest.process(client)).thenReturn(solrResponse);
+ when(solrResponse.getStatus()).thenReturn(0);
+
+ IQueueHandler.Result<MirroredSolrRequest> result = solrMessageProcessor.handleItem(mirroredSolrRequest);
+
+ assertEquals(IQueueHandler.ResultStatus.HANDLED, result.status());
+ assertNull(result.newItem());
+ }
+
+ /**
+ * Should connect to Solr if not connected and process the request
+ */
+ @Test
+ public void connectToSolrIfNeededAndProcessRequest() throws SolrServerException, IOException {
+ MirroredSolrRequest mirroredSolrRequest = mock(MirroredSolrRequest.class);
+ SolrRequest solrRequest = mock(SolrRequest.class);
+ SolrResponseBase solrResponse = mock(SolrResponseBase.class);
+
+ when(mirroredSolrRequest.getSolrRequest()).thenReturn(solrRequest);
+ when(solrRequest.process(client)).thenReturn(solrResponse);
+ when(solrResponse.getStatus()).thenReturn(0);
+
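+ // Beyond handling the item, the processor should connect the client before processing.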
+ IQueueHandler.Result<MirroredSolrRequest> result = solrMessageProcessor.handleItem(mirroredSolrRequest);
+
+ assertEquals(IQueueHandler.ResultStatus.HANDLED, result.status());
+ verify(client, times(1)).connect();
+ verify(solrRequest, times(1)).process(client);
+ }
+}
\ No newline at end of file
diff --git a/crossdc-producer/build.gradle b/crossdc-producer/build.gradle
index 9a6e5cd..f4aec1d 100644
--- a/crossdc-producer/build.gradle
+++ b/crossdc-producer/build.gradle
@@ -40,10 +40,10 @@ dependencies {
provided group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
- testImplementation 'org.slf4j:slf4j-api'
+ testImplementation 'org.slf4j:slf4j-api:2.0.5'
testImplementation 'org.hamcrest:hamcrest:2.2'
testImplementation 'junit:junit:4.13.2'
- testImplementation('org.mockito:mockito-inline:4.3.1')
+ testImplementation('org.mockito:mockito-inline:5.2.0')
testImplementation group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
testImplementation group: 'org.apache.solr', name: 'solr-test-framework', version: '8.11.2'
@@ -65,9 +65,11 @@ shadowJar {
jar.dependsOn(shadowJar)
artifacts {
- shadowJar;
+ shadowJar
}
test {
jvmArgs '-Djava.security.egd=file:/dev/./urandom'
-}
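+ // Cap the forked test JVM heap: 128m initial, 512m max.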
+ minHeapSize = "128m"
+ maxHeapSize = "512m"
+}
\ No newline at end of file
diff --git a/crossdc-producer/gradle.properties b/crossdc-producer/gradle.properties
index 66975c6..21aee2d 100644
--- a/crossdc-producer/gradle.properties
+++ b/crossdc-producer/gradle.properties
@@ -1,2 +1,2 @@
group=org.apache.solr
-version=0.9-SNAPSHOT
\ No newline at end of file
+version=1.0-SNAPSHOT
\ No newline at end of file
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
index cf25ebe..d57b547 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
@@ -16,10 +16,7 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.consumer.Consumer;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +28,6 @@ import java.util.Properties;
@ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
-@Ignore
@ThreadLeakLingering(linger = 5000) public class DeleteByQueryToIdTest extends
SolrTestCaseJ4 {
@@ -136,6 +132,11 @@ import java.util.Properties;
solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
solrCluster2.shutdown();
}
+
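+ // Null the static references so the stopped clusters and consumer can be garbage collected.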
+ solrCluster1 = null;
+ solrCluster2 = null;
+ kafkaCluster = null;
+ consumer = null;
}
@After
@@ -147,6 +148,7 @@ import java.util.Properties;
solrCluster2.getSolrClient().commit();
}
+ @Test
public void testDBQ() throws Exception {
CloudSolrClient client = solrCluster1.getSolrClient();
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
index 8cbdae2..6d2d543 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
@@ -161,6 +161,13 @@ import java.util.Properties;
if (zkTestServer2 != null) {
zkTestServer2.shutdown();
}
+
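+ // Release cluster, Kafka, and ZooKeeper test server references so they can be garbage collected.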
+ consumer = null;
+ solrCluster1 = null;
+ solrCluster2 = null;
+ kafkaCluster = null;
+ zkTestServer1 = null;
+ zkTestServer2 = null;
}
@After
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index c3bb2e5..5c19268 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -57,12 +57,12 @@ import static org.mockito.Mockito.spy;
static final String VERSION_FIELD = "_version_";
private static final int NUM_BROKERS = 1;
- public static EmbeddedKafkaCluster kafkaCluster;
+ public EmbeddedKafkaCluster kafkaCluster;
- protected static volatile MiniSolrCloudCluster solrCluster1;
- protected static volatile MiniSolrCloudCluster solrCluster2;
+ protected volatile MiniSolrCloudCluster solrCluster1;
+ protected volatile MiniSolrCloudCluster solrCluster2;
- protected static volatile Consumer consumer;
+ protected volatile Consumer consumer;
private static String TOPIC = "topic1";
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
index 8027aae..815a5c2 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
@@ -18,6 +18,7 @@ import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.consumer.Consumer;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
index 927c5ab..c44f14d 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
@@ -124,6 +124,11 @@ import java.util.*;
log.error("Exception stopping Kafka cluster", e);
}
+ solrCluster1 = null;
+ solrCluster2 = null;
+ kafkaCluster = null;
+ consumer = null;
+
}
@After
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index 991f67e..cf82cb7 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -153,6 +153,11 @@ import java.util.Properties;
log.error("Exception stopping Kafka cluster", e);
}
+ solrCluster1 = null;
+ solrCluster2 = null;
+ kafkaCluster = null;
+ consumer1 = null;
+ consumer2 = null;
}
@After
diff --git a/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
index d21e410..bd23393 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
@@ -8,26 +8,16 @@ import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.*;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
-import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrQueryRequestBase;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
-import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.update.processor.MirroringUpdateProcessor;
-import org.apache.solr.update.processor.RequestMirroringHandler;
-import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -36,142 +26,212 @@ import java.io.IOException;
public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
- private UpdateRequestProcessor next;
- private MirroringUpdateProcessor processor;
- private RequestMirroringHandler requestMirroringHandler;
- private AddUpdateCommand addUpdateCommand;
- private DeleteUpdateCommand deleteUpdateCommand;
- private SolrQueryRequestBase req;
- UpdateRequest requestMock;
- private UpdateRequestProcessor nextProcessor;
- private SolrCore core;
- private HttpSolrClient.Builder builder = Mockito.mock(HttpSolrClient.Builder.class);
- private HttpSolrClient client = Mockito.mock(HttpSolrClient.class);
- private CloudDescriptor cloudDesc;
-
- @Before public void setUp() throws Exception {
- super.setUp();
- addUpdateCommand = new AddUpdateCommand(req);
- addUpdateCommand.solrDoc = new SolrInputDocument();
- addUpdateCommand.solrDoc.addField("id", "test");
- req = Mockito.mock(SolrQueryRequestBase.class);
- Mockito.when(req.getParams()).thenReturn(new ModifiableSolrParams());
-
- requestMock = Mockito.mock(UpdateRequest.class);
- addUpdateCommand.setReq(req);
-
- nextProcessor = Mockito.mock(UpdateRequestProcessor.class);
-
- IndexSchema schema = Mockito.mock(IndexSchema.class);
- Mockito.when(req.getSchema()).thenReturn(schema);
-
- deleteUpdateCommand = new DeleteUpdateCommand(req);
- deleteUpdateCommand.query = "*:*";
-
- next = Mockito.mock(UpdateRequestProcessor.class);
- requestMirroringHandler = Mockito.mock(RequestMirroringHandler.class);
- processor = new MirroringUpdateProcessor(next, true, true, 1000L, new ModifiableSolrParams(), DistributedUpdateProcessor.DistribPhase.NONE,
- requestMirroringHandler) {
- UpdateRequest createMirrorRequest() {
- return requestMock;
- }
- };
-
- core = Mockito.mock(SolrCore.class);
- CoreDescriptor coreDesc = Mockito.mock(CoreDescriptor.class);
- cloudDesc = Mockito.mock(CloudDescriptor.class);
- CoreContainer coreContainer = Mockito.mock(CoreContainer.class);
- ZkController zkController = Mockito.mock(ZkController.class);
- ClusterState clusterState = Mockito.mock(ClusterState.class);
- DocCollection docCollection = Mockito.mock(DocCollection.class);
- DocRouter docRouter = Mockito.mock(DocRouter.class);
- Slice slice = Mockito.mock(Slice.class);
- ZkStateReader zkStateReader = Mockito.mock(ZkStateReader.class);
- Replica replica = Mockito.mock(Replica.class);
-
- Mockito.when(replica.getName()).thenReturn("replica1");
- Mockito.when(zkStateReader.getLeaderRetry(Mockito.any(), Mockito.any())).thenReturn(replica);
- Mockito.when(zkController.getZkStateReader()).thenReturn(zkStateReader);
- Mockito.when(coreDesc.getCloudDescriptor()).thenReturn(cloudDesc);
- Mockito.when(clusterState.getCollection(Mockito.any())).thenReturn(docCollection);
- Mockito.when(docCollection.getRouter()).thenReturn(docRouter);
- Mockito.when(docRouter.getTargetSlice(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(slice);
- Mockito.when(zkController.getClusterState()).thenReturn(clusterState);
- Mockito.when(coreContainer.getZkController()).thenReturn(zkController);
- Mockito.when(core.getCoreContainer()).thenReturn(coreContainer);
- Mockito.when(core.getCoreDescriptor()).thenReturn(coreDesc);
- Mockito.when(req.getCore()).thenReturn(core);
- }
-
- @Test public void testProcessAddWithinLimit() throws Exception {
- Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
- SolrInputDocument doc = new SolrInputDocument();
- doc.addField("id", "1");
- AddUpdateCommand cmd = new AddUpdateCommand(req);
- cmd.solrDoc = doc;
- cmd.commitWithin = 1000;
- cmd.overwrite = true;
- processor.processAdd(cmd);
- Mockito.verify(next).processAdd(cmd);
- Mockito.verify(requestMirroringHandler).mirror(requestMock);
- }
-
- @Test public void testProcessAddExceedsLimit() {
- AddUpdateCommand addUpdateCommand = new AddUpdateCommand(req);
- SolrInputDocument solrInputDocument = new SolrInputDocument();
- solrInputDocument.addField("id", "123");
- solrInputDocument.addField("large_field", "Test ".repeat(10000));
- addUpdateCommand.solrDoc = solrInputDocument;
-
- Mockito.when(req.getCore()).thenReturn(core);
- Mockito.when(req.getCore().getCoreDescriptor()).thenReturn(Mockito.mock(CoreDescriptor.class));
- Mockito.when(req.getCore().getCoreDescriptor().getCloudDescriptor()).thenReturn(Mockito.mock(CloudDescriptor.class));
- Mockito.when(req.getCore().getCoreContainer()).thenReturn(Mockito.mock(CoreContainer.class));
- Mockito.when(req.getCore().getCoreContainer().getZkController()).thenReturn(Mockito.mock(ZkController.class));
- Mockito.when(req.getCore().getCoreContainer().getZkController().getClusterState()).thenReturn(Mockito.mock(ClusterState.class));
-
- SolrParams mirrorParams = new ModifiableSolrParams();
- MirroringUpdateProcessor mirroringUpdateProcessorWithLimit = new MirroringUpdateProcessor(nextProcessor, true, false, // indexUnmirrorableDocs set to false
- 50000, mirrorParams, DistributedUpdateProcessor.DistribPhase.NONE, requestMirroringHandler);
-
- assertThrows(SolrException.class, () -> mirroringUpdateProcessorWithLimit.processAdd(addUpdateCommand));
- }
-
- @Test public void testProcessAddLeader() throws Exception {
- Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
- processor.processAdd(addUpdateCommand);
- Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(Mockito.any());
- }
-
- @Test public void testProcessAddNotLeader() throws Exception {
- Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica2");
- processor.processAdd(addUpdateCommand);
- Mockito.verify(requestMirroringHandler, Mockito.times(0)).mirror(Mockito.any());
- }
-
- @Test public void testProcessDelete() throws Exception {
- Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
- processor.processDelete(deleteUpdateCommand);
- Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(Mockito.any());
- }
-
- @Test public void testProcessDBQResults() throws Exception {
- Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
- Mockito.when(builder.build()).thenReturn(client);
- SolrInputDocument doc = new SolrInputDocument();
- doc.addField("id", "test");
- addUpdateCommand.solrDoc = doc;
- processor.processAdd(addUpdateCommand);
-
- SolrQuery query = new SolrQuery();
- query.setQuery("*:*");
- query.setRows(1000);
- query.setSort("id", SolrQuery.ORDER.asc);
-
- processor.processDelete(deleteUpdateCommand);
- }
-
- @Test public void testFinish() throws IOException {
- processor.finish();
- }
+ private UpdateRequestProcessor next;
+ private MirroringUpdateProcessor processor;
+ private RequestMirroringHandler requestMirroringHandler;
+ private AddUpdateCommand addUpdateCommand;
+ private DeleteUpdateCommand deleteUpdateCommand;
+ private SolrQueryRequestBase req;
+ UpdateRequest requestMock;
+ private UpdateRequestProcessor nextProcessor;
+ private SolrCore core;
+ private HttpSolrClient.Builder builder = Mockito.mock(HttpSolrClient.Builder.class);
+ private HttpSolrClient client = Mockito.mock(HttpSolrClient.class);
+ private CloudDescriptor cloudDesc;
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ req = Mockito.mock(SolrQueryRequestBase.class);
+ Mockito.when(req.getParams()).thenReturn(new ModifiableSolrParams());
+
+ // Mock the request before constructing the command so it is never built against a null req.
+ addUpdateCommand = new AddUpdateCommand(req);
+ addUpdateCommand.solrDoc = new SolrInputDocument();
+ addUpdateCommand.solrDoc.addField("id", "test");
+
+ requestMock = Mockito.mock(UpdateRequest.class);
+
+ nextProcessor = Mockito.mock(UpdateRequestProcessor.class);
+
+ IndexSchema schema = Mockito.mock(IndexSchema.class);
+ Mockito.when(req.getSchema()).thenReturn(schema);
+
+ deleteUpdateCommand = new DeleteUpdateCommand(req);
+ deleteUpdateCommand.query = "*:*";
+
+ next = Mockito.mock(UpdateRequestProcessor.class);
+ requestMirroringHandler = Mockito.mock(RequestMirroringHandler.class);
+ processor =
+ new MirroringUpdateProcessor(
+ next,
+ true,
+ true,
+ 1000L,
+ new ModifiableSolrParams(),
+ DistributedUpdateProcessor.DistribPhase.NONE,
+ requestMirroringHandler) {
+ UpdateRequest createMirrorRequest() {
+ return requestMock;
+ }
+ };
+
+ core = Mockito.mock(SolrCore.class);
+ CoreDescriptor coreDesc = Mockito.mock(CoreDescriptor.class);
+ cloudDesc = Mockito.mock(CloudDescriptor.class);
+ CoreContainer coreContainer = Mockito.mock(CoreContainer.class);
+ ZkController zkController = Mockito.mock(ZkController.class);
+ ClusterState clusterState = Mockito.mock(ClusterState.class);
+ DocCollection docCollection = Mockito.mock(DocCollection.class);
+ DocRouter docRouter = Mockito.mock(DocRouter.class);
+ Slice slice = Mockito.mock(Slice.class);
+ ZkStateReader zkStateReader = Mockito.mock(ZkStateReader.class);
+ Replica replica = Mockito.mock(Replica.class);
+
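+ // Wire a minimal SolrCloud mock graph (core -> container -> ZkController -> cluster state)
+ // with "replica1" as the shard leader, so the processor's leader checks can be exercised.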
+ Mockito.when(replica.getName()).thenReturn("replica1");
+ Mockito.when(zkStateReader.getLeaderRetry(Mockito.any(), Mockito.any()))
+ .thenReturn(replica);
+ Mockito.when(zkController.getZkStateReader()).thenReturn(zkStateReader);
+ Mockito.when(coreDesc.getCloudDescriptor()).thenReturn(cloudDesc);
+ Mockito.when(clusterState.getCollection(Mockito.any())).thenReturn(docCollection);
+ Mockito.when(docCollection.getRouter()).thenReturn(docRouter);
+ Mockito.when(
+ docRouter.getTargetSlice(
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any()))
+ .thenReturn(slice);
+ Mockito.when(zkController.getClusterState()).thenReturn(clusterState);
+ Mockito.when(coreContainer.getZkController()).thenReturn(zkController);
+ Mockito.when(core.getCoreContainer()).thenReturn(coreContainer);
+ Mockito.when(core.getCoreDescriptor()).thenReturn(coreDesc);
+ Mockito.when(req.getCore()).thenReturn(core);
+ }
+
+ /**
+ * Should process a delete command and mirror the delete-by-query when the distribPhase is
+ * NONE and deleteById is false
+ */
+ @Test
+ public void processDeleteWhenDistribPhaseIsNoneAndDeleteByIdIsFalse() {
+ try {
+ processor.processDelete(deleteUpdateCommand);
+ Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(requestMock);
+ } catch (Exception e) {
+ fail("IOException should not be thrown");
+ }
+ }
+
+ /**
+ * Should process add command and mirror the document when the document size is within the limit
+ * and the node is a leader
+ */
+ @Test
+ public void processAddWhenDocSizeWithinLimitAndNodeIsLeader() {
+ try {
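+ // "replica1" matches the mocked shard leader, so this node is the leader and must mirror.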
+ Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ processor.processAdd(addUpdateCommand);
+ Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(requestMock);
+ } catch (IOException e) {
+ fail("IOException should not be thrown");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Should process a delete command and mirror the delete-by-id when the node is the leader
+ * and deleteById is true
+ */
+ @Test
+ public void processDeleteWhenNodeIsLeaderAndDeleteByIdIsTrue() {
+ try {
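+ // Run as the leader and switch the shared command to delete-by-id via setId().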
+ Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ deleteUpdateCommand.setId("test");
+ processor.processDelete(deleteUpdateCommand);
+ Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(requestMock);
+ } catch (Exception e) {
+ fail("IOException should not be thrown");
+ }
+ }
+
+ @Test
+ public void testProcessAddWithinLimit() throws Exception {
+ Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "1");
+ AddUpdateCommand cmd = new AddUpdateCommand(req);
+ cmd.solrDoc = doc;
+ cmd.commitWithin = 1000;
+ cmd.overwrite = true;
+ processor.processAdd(cmd);
+ Mockito.verify(next).processAdd(cmd);
+ Mockito.verify(requestMirroringHandler).mirror(requestMock);
+ }
+
+ @Test
+ public void testProcessAddExceedsLimit() {
+ AddUpdateCommand addUpdateCommand = new AddUpdateCommand(req);
+ SolrInputDocument solrInputDocument = new SolrInputDocument();
+ solrInputDocument.addField("id", "123");
+ solrInputDocument.addField("large_field", "Test ".repeat(10000));
+ addUpdateCommand.solrDoc = solrInputDocument;
+
+ Mockito.when(req.getCore()).thenReturn(core);
+ Mockito.when(req.getCore().getCoreDescriptor())
+ .thenReturn(Mockito.mock(CoreDescriptor.class));
+ Mockito.when(req.getCore().getCoreDescriptor().getCloudDescriptor())
+ .thenReturn(Mockito.mock(CloudDescriptor.class));
+ Mockito.when(req.getCore().getCoreContainer())
+ .thenReturn(Mockito.mock(CoreContainer.class));
+ Mockito.when(req.getCore().getCoreContainer().getZkController())
+ .thenReturn(Mockito.mock(ZkController.class));
+ Mockito.when(req.getCore().getCoreContainer().getZkController().getClusterState())
+ .thenReturn(Mockito.mock(ClusterState.class));
+
+ SolrParams mirrorParams = new ModifiableSolrParams();
+ MirroringUpdateProcessor mirroringUpdateProcessorWithLimit = new MirroringUpdateProcessor(nextProcessor, true, false, // indexUnmirrorableDocs set to false
+ 50000, mirrorParams, DistributedUpdateProcessor.DistribPhase.NONE, requestMirroringHandler);
+
+ assertThrows(SolrException.class, () -> mirroringUpdateProcessorWithLimit.processAdd(addUpdateCommand));
+ }
+
+ @Test
+ public void testProcessAddLeader() throws Exception {
+ Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ processor.processAdd(addUpdateCommand);
+ Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(Mockito.any());
+ }
+
+ @Test
+ public void testProcessAddNotLeader() throws Exception {
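+ // "replica2" does not match the mocked leader "replica1", so the update must not be mirrored.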
+ Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica2");
+ processor.processAdd(addUpdateCommand);
+ Mockito.verify(requestMirroringHandler, Mockito.times(0)).mirror(Mockito.any());
+ }
+
+ @Test
+ public void testProcessDelete() throws Exception {
+ Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ processor.processDelete(deleteUpdateCommand);
+ Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(Mockito.any());
+ }
+
+ @Test
+ public void testProcessDBQResults() throws Exception {
+ Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ Mockito.when(builder.build()).thenReturn(client);
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "test");
+ addUpdateCommand.solrDoc = doc;
+ processor.processAdd(addUpdateCommand);
+
+ SolrQuery query = new SolrQuery();
+ query.setQuery("*:*");
+ query.setRows(1000);
+ query.setSort("id", SolrQuery.ORDER.asc);
+
+ processor.processDelete(deleteUpdateCommand);
+ }
+
+ @Test
+ public void testFinish() throws IOException {
+ processor.finish();
+ }
}
\ No newline at end of file
diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar
index e708b1c..943f0cb 100644
Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index aa991fc..f398c33 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip
+networkTimeout=10000
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/gradlew b/gradlew
index 81a9eed..65dcd68 100644
--- a/gradlew
+++ b/gradlew
@@ -1,7 +1,7 @@
-#!/usr/bin/env sh
+#!/bin/sh
#
-# Copyright 2015 the original author or authors.
+# Copyright © 2015-2021 the original authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,76 +17,113 @@
#
##############################################################################
-##
-## Gradle start up script for UN*X
-##
+#
+# Gradle start up script for POSIX generated by Gradle.
+#
+# Important for running:
+#
+# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is
+# noncompliant, but you have some other compliant shell such as ksh or
+# bash, then to run this script, type that shell name before the whole
+# command line, like:
+#
+# ksh Gradle
+#
+# Busybox and similar reduced shells will NOT work, because this script
+# requires all of these POSIX shell features:
+# * functions;
+# * expansions «$var», «${var}», «${var:-default}», «${var+SET}»,
+# «${var#prefix}», «${var%suffix}», and «$( cmd )»;
+# * compound commands having a testable exit status, especially «case»;
+# * various built-in commands including «command», «set», and «ulimit».
+#
+# Important for patching:
+#
+# (2) This script targets any POSIX shell, so it avoids extensions provided
+# by Bash, Ksh, etc; in particular arrays are avoided.
+#
+# The "traditional" practice of packing multiple parameters into a
+# space-separated string is a well documented source of bugs and security
+# problems, so this is (mostly) avoided, by progressively accumulating
+# options in "$@", and eventually passing that to Java.
+#
+# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS,
+# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly;
+# see the in-line comments for details.
+#
+# There are tweaks for specific operating systems such as AIX, CygWin,
+# Darwin, MinGW, and NonStop.
+#
+# (3) This script is generated from the Groovy template
+# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
+# within the Gradle project.
+#
+# You can find Gradle at https://github.com/gradle/gradle/.
+#
##############################################################################
# Attempt to set APP_HOME
+
# Resolve links: $0 may be a link
-PRG="$0"
-# Need this for relative symlinks.
-while [ -h "$PRG" ] ; do
- ls=`ls -ld "$PRG"`
- link=`expr "$ls" : '.*-> \(.*\)$'`
- if expr "$link" : '/.*' > /dev/null; then
- PRG="$link"
- else
- PRG=`dirname "$PRG"`"/$link"
- fi
+app_path=$0
+
+# Need this for daisy-chained symlinks.
+while
+ APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path
+ [ -h "$app_path" ]
+do
+ ls=$( ls -ld "$app_path" )
+ link=${ls#*' -> '}
+ case $link in #(
+ /*) app_path=$link ;; #(
+ *) app_path=$APP_HOME$link ;;
+ esac
done
-SAVED="`pwd`"
-cd "`dirname \"$PRG\"`/" >/dev/null
-APP_HOME="`pwd -P`"
-cd "$SAVED" >/dev/null
-APP_NAME="Gradle"
-APP_BASE_NAME=`basename "$0"`
+# This is normally unused
+# shellcheck disable=SC2034
+APP_BASE_NAME=${0##*/}
+APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Use the maximum available, or set MAX_FD != -1 to use that value.
-MAX_FD="maximum"
+MAX_FD=maximum
warn () {
echo "$*"
-}
+} >&2
die () {
echo
echo "$*"
echo
exit 1
-}
+} >&2
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
-case "`uname`" in
- CYGWIN* )
- cygwin=true
- ;;
- Darwin* )
- darwin=true
- ;;
- MINGW* )
- msys=true
- ;;
- NONSTOP* )
- nonstop=true
- ;;
+case "$( uname )" in #(
+ CYGWIN* ) cygwin=true ;; #(
+ Darwin* ) darwin=true ;; #(
+ MSYS* | MINGW* ) msys=true ;; #(
+ NONSTOP* ) nonstop=true ;;
esac
+CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+
+
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
- JAVACMD="$JAVA_HOME/jre/sh/java"
+ JAVACMD=$JAVA_HOME/jre/sh/java
else
- JAVACMD="$JAVA_HOME/bin/java"
+ JAVACMD=$JAVA_HOME/bin/java
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
@@ -95,112 +132,113 @@ Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
- JAVACMD="java"
+ JAVACMD=java
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
-# LUCENE-9471: workaround for gradle leaving junk temp. files behind.
-GRADLE_TEMPDIR="$APP_HOME/.gradle/tmp"
-mkdir -p "$GRADLE_TEMPDIR"
-if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
- GRADLE_TEMPDIR=`cygpath --path --mixed "$GRADLE_TEMPDIR"`
-fi
-DEFAULT_JVM_OPTS="$DEFAULT_JVM_OPTS \"-Djava.io.tmpdir=$GRADLE_TEMPDIR\""
-
-# Set APP_HOME and GRADLE_WRAPPER_JAR. Unlike lucene-solr, we aren't downloading and verifying the jar if it's not present
-if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
- APP_HOME=`cygpath --path --mixed "$APP_HOME"`
-fi
-GRADLE_WRAPPER_JAR="$APP_HOME/gradle/wrapper/gradle-wrapper.jar"
-
-CLASSPATH=$GRADLE_WRAPPER_JAR
-
-# Don't fork a daemon mode on initial run that generates local defaults.
-GRADLE_DAEMON_CTRL=
-if [ ! -e "$APP_HOME/gradle.properties" ]; then
- GRADLE_DAEMON_CTRL=--no-daemon
-fi
-
# Increase the maximum file descriptors if we can.
-if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
- MAX_FD_LIMIT=`ulimit -H -n`
- if [ $? -eq 0 ] ; then
- if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
- MAX_FD="$MAX_FD_LIMIT"
- fi
- ulimit -n $MAX_FD
- if [ $? -ne 0 ] ; then
- warn "Could not set maximum file descriptor limit: $MAX_FD"
- fi
- else
- warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
- fi
+if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
+ case $MAX_FD in #(
+ max*)
+ # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC3045
+ MAX_FD=$( ulimit -H -n ) ||
+ warn "Could not query maximum file descriptor limit"
+ esac
+ case $MAX_FD in #(
+ '' | soft) :;; #(
+ *)
+ # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
+ # shellcheck disable=SC3045
+ ulimit -n "$MAX_FD" ||
+ warn "Could not set maximum file descriptor limit to $MAX_FD"
+ esac
fi
-# For Darwin, add options to specify how the application appears in the dock
-if $darwin; then
- GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
-fi
+# Collect all arguments for the java command, stacking in reverse order:
+# * args from the command line
+# * the main class name
+# * -classpath
+# * -D...appname settings
+# * --module-path (only if needed)
+# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables.
# For Cygwin or MSYS, switch paths to Windows format before running java
-if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
- JAVACMD=`cygpath --unix "$JAVACMD"`
-
- # We build the pattern for arguments to be converted via cygpath
- ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
- SEP=""
- for dir in $ROOTDIRSRAW ; do
- ROOTDIRS="$ROOTDIRS$SEP$dir"
- SEP="|"
- done
- OURCYGPATTERN="(^($ROOTDIRS))"
- # Add a user-defined pattern to the cygpath arguments
- if [ "$GRADLE_CYGPATTERN" != "" ] ; then
- OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
- fi
+if "$cygwin" || "$msys" ; then
+ APP_HOME=$( cygpath --path --mixed "$APP_HOME" )
+ CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" )
+
+ JAVACMD=$( cygpath --unix "$JAVACMD" )
+
# Now convert the arguments - kludge to limit ourselves to /bin/sh
- i=0
- for arg in "$@" ; do
- CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
- CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
-
- if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
- eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
- else
- eval `echo args$i`="\"$arg\""
+ for arg do
+ if
+ case $arg in #(
+ -*) false ;; # don't mess with options #(
+ /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath
+ [ -e "$t" ] ;; #(
+ *) false ;;
+ esac
+ then
+ arg=$( cygpath --path --ignore --mixed "$arg" )
fi
- i=$((i+1))
+ # Roll the args list around exactly as many times as the number of
+ # args, so each arg winds up back in the position where it started, but
+ # possibly modified.
+ #
+ # NB: a `for` loop captures its iteration list before it begins, so
+ # changing the positional parameters here affects neither the number of
+ # iterations, nor the values presented in `arg`.
+ shift # remove old arg
+ set -- "$@" "$arg" # push replacement arg
done
- case $i in
- (0) set -- ;;
- (1) set -- "$args0" ;;
- (2) set -- "$args0" "$args1" ;;
- (3) set -- "$args0" "$args1" "$args2" ;;
- (4) set -- "$args0" "$args1" "$args2" "$args3" ;;
- (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
- (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
- (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
- (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
- (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
- esac
fi
-# Escape application args
-save () {
- for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
- echo " "
-}
-APP_ARGS=$(save "$@")
+# Collect all arguments for the java command;
+# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of
+# shell script including quotes and variable substitutions, so put them in
+# double quotes to make sure that they get re-expanded; and
+# * put everything else in single quotes, so that it's not re-expanded.
+
+set -- \
+ "-Dorg.gradle.appname=$APP_BASE_NAME" \
+ -classpath "$CLASSPATH" \
+ org.gradle.wrapper.GradleWrapperMain \
+ "$@"
+
+# Stop when "xargs" is not available.
+if ! command -v xargs >/dev/null 2>&1
+then
+ die "xargs is not available"
+fi
-# Collect all arguments for the java command, following the shell quoting and substitution rules
-eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain $GRADLE_DAEMON_CTRL "$APP_ARGS"
+# Use "xargs" to parse quoted args.
+#
+# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
+#
+# In Bash we could simply go:
+#
+# readarray ARGS < <( xargs -n1 <<<"$var" ) &&
+# set -- "${ARGS[@]}" "$@"
+#
+# but POSIX shell has neither arrays nor command substitution, so instead we
+# post-process each arg (as a line of input to sed) to backslash-escape any
+# character that might be a shell metacharacter, then use eval to reverse
+# that process (while maintaining the separation between arguments), and wrap
+# the whole thing up as a single "set" statement.
+#
+# This will of course break if any of these variables contains a newline or
+# an unmatched quote.
+#
-# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
-if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
- cd "$(dirname "$0")"
-fi
+eval "set -- $(
+ printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" |
+ xargs -n1 |
+ sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' |
+ tr '\n' ' '
+ )" '"$@"'
exec "$JAVACMD" "$@"
diff --git a/gradlew.bat b/gradlew.bat
index 8007957..93e3f59 100644
--- a/gradlew.bat
+++ b/gradlew.bat
@@ -14,7 +14,7 @@
@rem limitations under the License.
@rem
-@if "%DEBUG%" == "" @echo off
+@if "%DEBUG%"=="" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@@ -25,24 +25,23 @@
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
-if "%DIRNAME%" == "" set DIRNAME=.
+if "%DIRNAME%"=="" set DIRNAME=.
+@rem This is normally unused
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
+@rem Resolve any "." and ".." in APP_HOME to make it shorter.
+for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
+
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
-@rem LUCENE-9471: workaround for gradle leaving junk temp. files behind.
-SET GRADLE_TEMPDIR=%DIRNAME%\.gradle\tmp
-IF NOT EXIST "%GRADLE_TEMPDIR%" MKDIR "%GRADLE_TEMPDIR%"
-SET DEFAULT_JVM_OPTS=%DEFAULT_JVM_OPTS% "-Djava.io.tmpdir=%GRADLE_TEMPDIR%"
-
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
-if "%ERRORLEVEL%" == "0" goto init
+if %ERRORLEVEL% equ 0 goto execute
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
@@ -56,7 +55,7 @@ goto fail
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
-if exist "%JAVA_EXE%" goto init
+if exist "%JAVA_EXE%" goto execute
echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
@@ -66,42 +65,26 @@ echo location of your Java installation.
goto fail
-:init
-@rem Get command-line arguments, handling Windows variants
-
-if not "%OS%" == "Windows_NT" goto win9xME_args
-
-:win9xME_args
-@rem Slurp the command line arguments.
-set CMD_LINE_ARGS=
-set _SKIP=2
-
-:win9xME_args_slurp
-if "x%~1" == "x" goto execute
-
-set CMD_LINE_ARGS=%*
-
:execute
-
@rem Setup the command line
-set CLASSPATH=%GRADLE_WRAPPER_JAR%
-@rem Don't fork a daemon mode on initial run that generates local defaults.
-SET GRADLE_DAEMON_CTRL=
-IF NOT EXIST "%DIRNAME%\gradle.properties" SET GRADLE_DAEMON_CTRL=--no-daemon
+set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
+
@rem Execute Gradle
-"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %GRADLE_DAEMON_CTRL% %CMD_LINE_ARGS%
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end
@rem End local scope for the variables with windows NT shell
-if "%ERRORLEVEL%"=="0" goto mainEnd
+if %ERRORLEVEL% equ 0 goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
-if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
-exit /b 1
+set EXIT_CODE=%ERRORLEVEL%
+if %EXIT_CODE% equ 0 set EXIT_CODE=1
+if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
+exit /b %EXIT_CODE%
:mainEnd
if "%OS%"=="Windows_NT" endlocal