You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2023/08/24 21:57:24 UTC
[solr-sandbox] branch main updated: Allow using collection properties to configure crossdc updatehandler. (#65)
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new ec350a9 Allow using collection properties to configure crossdc updatehandler. (#65)
ec350a9 is described below
commit ec350a9f0b9d4d1828ef1336f14cf096aa80dc54
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Thu Aug 24 16:57:20 2023 -0500
Allow using collection properties to configure crossdc updatehandler. (#65)
---
crossdc-consumer/build.gradle | 4 ++--
.../MirroringUpdateRequestProcessorFactory.java | 25 ++++++++++++++++++----
...SolrAndKafkaMultiCollectionIntegrationTest.java | 22 ++++++++++++++++++-
3 files changed, 44 insertions(+), 7 deletions(-)
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index cca42fc..67668d0 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -60,8 +60,8 @@ dependencies {
exclude group: "org.apache.logging.log4j", module: "*"
exclude group: "org.slf4j", module: "*"
}
- testImplementation 'org.apache.kafka:kafka_2.13:2.8.1'
- testImplementation 'org.apache.kafka:kafka-streams:2.8.1'
+ implementation 'org.apache.kafka:kafka_2.13:2.8.1'
+ implementation 'org.apache.kafka:kafka-streams:2.8.1'
testImplementation 'org.apache.kafka:kafka_2.13:2.8.1:test'
testImplementation 'org.apache.kafka:kafka-streams:2.8.1:test'
}
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 4f571cf..87a4883 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -17,6 +17,7 @@
package org.apache.solr.update.processor;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CollectionProperties;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
@@ -127,10 +128,6 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
public void inform(SolrCore core) {
log.info("KafkaRequestMirroringHandler inform enabled={}", this.enabled);
- if (!enabled) {
- return;
- }
-
log.info("Producer startup config properties before adding additional properties from Zookeeper={}", properties);
Properties zkProps = null;
@@ -152,6 +149,22 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
KafkaCrossDcConf.readZkProps(properties, zkProps);
}
+ CollectionProperties cp = new CollectionProperties(core.getCoreContainer().getZkController().getZkClient());
+ Map<String,String> collectionProperties = cp.getCollectionProperties(core.getCoreDescriptor().getCollectionName());
+ for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
+ String val = collectionProperties.get("crossdc." + configKey.getKey());
+ if (val != null && !val.isBlank()) {
+ properties.put(configKey.getKey(), val);
+ }
+ }
+ String enabledVal = collectionProperties.get("crossdc.enabled");
+ if (enabledVal != null) {
+ if (Boolean.parseBoolean(enabledVal.toString())) {
+ this.enabled = true;
+ } else {
+ this.enabled = false;
+ }
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Interrupted looking for CrossDC configuration in Zookeeper", e);
@@ -161,6 +174,10 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception looking for CrossDC configuration in Zookeeper", e);
}
+ if (!enabled) {
+ return;
+ }
+
if (properties.get(BOOTSTRAP_SERVERS) == null) {
log.error(
"boostrapServers not specified for producer in CrossDC configuration props={} zkProps={}",
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
index 815a5c2..bd1aaa5 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
@@ -14,6 +14,7 @@ import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.CollectionProperties;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.consumer.Consumer;
@@ -72,7 +73,10 @@ import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE
kafkaCluster.createTopic(TOPIC, 1, 1);
- System.setProperty("topicName", TOPIC);
+ // in this test we will count on collection properties for topicName and enabled=true
+ System.setProperty("enabled", "false");
+ // System.setProperty("topicName", TOPIC);
+
System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
@@ -96,6 +100,14 @@ import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE
solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+ // Update the collection property "enabled" to true
+ CollectionProperties cp = new CollectionProperties(solrCluster1.getZkClient());
+ cp.setCollectionProperty(COLLECTION, "crossdc.enabled", "true");
+ cp.setCollectionProperty(COLLECTION, "crossdc.topicName", TOPIC);
+ // Reloading the collection
+ CollectionAdminRequest.Reload reloadRequest = CollectionAdminRequest.reloadCollection(COLLECTION);
+ reloadRequest.process(solrCluster1.getSolrClient());
+
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
@@ -169,6 +181,14 @@ import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE
solrCluster2.getSolrClient().request(create);
solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+ // Update the collection property "enabled" to true
+ CollectionProperties cp = new CollectionProperties(solrCluster1.getZkClient());
+ cp.setCollectionProperty(ALT_COLLECTION, "crossdc.enabled", "true");
+ cp.setCollectionProperty(ALT_COLLECTION, "crossdc.topicName", TOPIC);
+ // Reloading the collection
+ CollectionAdminRequest.Reload reloadRequest = CollectionAdminRequest.reloadCollection(ALT_COLLECTION);
+ reloadRequest.process(solrCluster1.getSolrClient());
+
CloudSolrClient client = solrCluster1.getSolrClient();