You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2023/08/24 21:57:24 UTC

[solr-sandbox] branch main updated: Allow using collection properties to configure crossdc updatehandler. (#65)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new ec350a9  Allow using collection properties to configure crossdc updatehandler. (#65)
ec350a9 is described below

commit ec350a9f0b9d4d1828ef1336f14cf096aa80dc54
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Thu Aug 24 16:57:20 2023 -0500

    Allow using collection properties to configure crossdc updatehandler. (#65)
---
 crossdc-consumer/build.gradle                      |  4 ++--
 .../MirroringUpdateRequestProcessorFactory.java    | 25 ++++++++++++++++++----
 ...SolrAndKafkaMultiCollectionIntegrationTest.java | 22 ++++++++++++++++++-
 3 files changed, 44 insertions(+), 7 deletions(-)

diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index cca42fc..67668d0 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -60,8 +60,8 @@ dependencies {
         exclude group: "org.apache.logging.log4j", module: "*"
         exclude group: "org.slf4j", module: "*"
     }
-    testImplementation 'org.apache.kafka:kafka_2.13:2.8.1'
-    testImplementation 'org.apache.kafka:kafka-streams:2.8.1'
+    implementation 'org.apache.kafka:kafka_2.13:2.8.1'
+    implementation 'org.apache.kafka:kafka-streams:2.8.1'
     testImplementation 'org.apache.kafka:kafka_2.13:2.8.1:test'
     testImplementation 'org.apache.kafka:kafka-streams:2.8.1:test'
 }
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 4f571cf..87a4883 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -17,6 +17,7 @@
 package org.apache.solr.update.processor;
 
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CollectionProperties;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
@@ -127,10 +128,6 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
     public void inform(SolrCore core) {
         log.info("KafkaRequestMirroringHandler inform enabled={}", this.enabled);
 
-        if (!enabled) {
-            return;
-        }
-
         log.info("Producer startup config properties before adding additional properties from Zookeeper={}", properties);
 
         Properties zkProps = null;
@@ -152,6 +149,22 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
 
                 KafkaCrossDcConf.readZkProps(properties, zkProps);
             }
+            CollectionProperties cp = new CollectionProperties(core.getCoreContainer().getZkController().getZkClient());
+             Map<String,String> collectionProperties = cp.getCollectionProperties(core.getCoreDescriptor().getCollectionName());
+            for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
+                String val = collectionProperties.get("crossdc." + configKey.getKey());
+                if (val != null && !val.isBlank()) {
+                    properties.put(configKey.getKey(), val);
+                }
+            }
+            String enabledVal = collectionProperties.get("crossdc.enabled");
+            if (enabledVal != null) {
+                if (Boolean.parseBoolean(enabledVal.toString())) {
+                    this.enabled = true;
+                } else {
+                    this.enabled = false;
+                }
+            }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             log.error("Interrupted looking for CrossDC configuration in Zookeeper", e);
@@ -161,6 +174,10 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
             throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception looking for CrossDC configuration in Zookeeper", e);
         }
 
+        if (!enabled) {
+            return;
+        }
+
         if (properties.get(BOOTSTRAP_SERVERS) == null) {
             log.error(
                 "boostrapServers not specified for producer in CrossDC configuration props={} zkProps={}",
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
index 815a5c2..bd1aaa5 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
@@ -14,6 +14,7 @@ import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.CollectionProperties;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.consumer.Consumer;
@@ -72,7 +73,10 @@ import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE
 
     kafkaCluster.createTopic(TOPIC, 1, 1);
 
-    System.setProperty("topicName", TOPIC);
+    // in this test we will count on collection properties for topicName and enabled=true
+    System.setProperty("enabled", "false");
+    // System.setProperty("topicName", TOPIC);
+
     System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
     System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
 
@@ -96,6 +100,14 @@ import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE
 
     solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
 
+    // Update the collection property "enabled" to true
+    CollectionProperties cp = new CollectionProperties(solrCluster1.getZkClient());
+    cp.setCollectionProperty(COLLECTION, "crossdc.enabled", "true");
+    cp.setCollectionProperty(COLLECTION, "crossdc.topicName", TOPIC);
+    // Reloading the collection
+    CollectionAdminRequest.Reload reloadRequest = CollectionAdminRequest.reloadCollection(COLLECTION);
+    reloadRequest.process(solrCluster1.getSolrClient());
+
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
@@ -169,6 +181,14 @@ import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE
       solrCluster2.getSolrClient().request(create);
       solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
 
+      // Update the collection property "enabled" to true
+      CollectionProperties cp = new CollectionProperties(solrCluster1.getZkClient());
+      cp.setCollectionProperty(ALT_COLLECTION, "crossdc.enabled", "true");
+      cp.setCollectionProperty(ALT_COLLECTION, "crossdc.topicName", TOPIC);
+      // Reloading the collection
+      CollectionAdminRequest.Reload reloadRequest = CollectionAdminRequest.reloadCollection(ALT_COLLECTION);
+      reloadRequest.process(solrCluster1.getSolrClient());
+
 
       CloudSolrClient client = solrCluster1.getSolrClient();