You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2022/09/14 22:33:41 UTC

[solr-sandbox] branch crossdc-wip updated: Config override test and cleanup. (#39)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new 6c68048  Config override test and cleanup. (#39)
6c68048 is described below

commit 6c6804828fc016ef871f3b3f4f1e1d00178e45bc
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Wed Sep 14 17:33:36 2022 -0500

    Config override test and cleanup. (#39)
    
    * Flush producer on close to prevent losing any pending updates.
    
    * Add a config override test and some cleanup.
---
 .../solr/crossdc/common/KafkaCrossDcConf.java      | 16 +++++
 .../org/apache/solr/crossdc/consumer/Consumer.java | 80 ++++++++++------------
 .../MirroringUpdateRequestProcessorFactory.java    | 21 ++----
 .../solr/crossdc/ZkConfigIntegrationTest.java      |  9 +--
 4 files changed, 65 insertions(+), 61 deletions(-)

diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index 9a14a1b..85feb75 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -228,6 +228,22 @@ public class KafkaCrossDcConf extends CrossDcConf {
     return additional;
   }
 
+  /**
+   * Merges configuration read from ZooKeeper into {@code properties} without overriding
+   * values that are already set there.
+   *
+   * <p>For each key in {@link KafkaCrossDcConf#CONFIG_PROPERTIES}, the ZooKeeper value is used
+   * when {@code properties} has no value, or only a blank string, for that key. Every other
+   * ZooKeeper entry is copied only if {@code properties} has no value for its key.
+   *
+   * @param properties mutable target map; updated in place
+   * @param zkProps properties loaded from ZooKeeper; not modified
+   */
+  public static void readZkProps(Map<String,Object> properties, Properties zkProps) {
+    for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
+      String key = configKey.getKey();
+      Object current = properties.get(key);
+      // Only a String can be blank; any other non-null value counts as already set
+      // (the previous unconditional cast threw ClassCastException in that case).
+      if (current == null || (current instanceof String && ((String) current).isBlank())) {
+        // May store null when ZooKeeper has no value for the key either, as before.
+        properties.put(key, zkProps.getProperty(key));
+      }
+    }
+    // Copy the remaining ZooKeeper entries. The previous code iterated over
+    // new Properties(zkProps), which only installs zkProps as the *defaults* table: that
+    // object has no entries of its own, so forEach visited nothing and every property
+    // outside CONFIG_PROPERTIES was silently dropped.
+    for (String key : zkProps.stringPropertyNames()) {
+      // putIfAbsent also fills keys that are present but mapped to null.
+      properties.putIfAbsent(key, zkProps.getProperty(key));
+    }
+  }
+
   @Override public String toString() {
     StringBuilder sb = new StringBuilder(128);
     for (ConfigProperty configProperty : CONFIG_PROPERTIES) {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index b3cb2b3..122205f 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -46,41 +46,18 @@ public class Consumer {
     private Server server;
     private CrossDcConsumer crossDcConsumer;
 
-    public void start(Map<String, Object> properties) {
 
-
-        //server = new Server();
-        //ServerConnector connector = new ServerConnector(server);
-        //connector.setPort(port);
-        //server.setConnectors(new Connector[] {connector})
-        KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
-        crossDcConsumer = getCrossDcConsumer(conf);
-
-        // Start consumer thread
-
-        log.info("Starting CrossDC Consumer {}", conf);
-
-        /**
-         * ExecutorService to manage the cross-dc consumer threads.
-         */
-        ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor();
-        consumerThreadExecutor.submit(crossDcConsumer);
-
-        // Register shutdown hook
-        Thread shutdownHook = new Thread(() -> System.out.println("Shutting down consumers!"));
-        Runtime.getRuntime().addShutdownHook(shutdownHook);
+    public void start() {
+        start(new HashMap<>());
     }
 
-    private CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf) {
-        return new KafkaCrossDcConsumer(conf);
-    }
-
-    public static void main(String[] args) {
-
-        Map<String,Object> properties = new HashMap<>();
+    public void start(Map<String,Object> properties ) {
 
         for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
-            properties.put(configKey.getKey(), System.getProperty(configKey.getKey()));
+            String val = System.getProperty(configKey.getKey());
+            if (val != null) {
+                properties.put(configKey.getKey(), val);
+            }
         }
 
         String zkConnectString = (String) properties.get(KafkaCrossDcConf.ZK_CONNECT_STRING);
@@ -98,18 +75,7 @@ public class Consumer {
                     Properties zkProps = new Properties();
                     zkProps.load(new ByteArrayInputStream(data));
 
-                    Map<Object, Object> zkPropsUnproccessed = new HashMap<>(zkProps);
-
-                    for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
-                        if (properties.get(configKey.getKey()) == null || ((String)properties.get(configKey.getKey())).isBlank()) {
-                            properties.put(configKey.getKey(), (String) zkProps.getProperty(
-                                configKey.getKey()));
-                            zkPropsUnproccessed.remove(configKey.getKey());
-                        }
-                    }
-                    zkPropsUnproccessed.forEach((key, val) -> {
-                        properties.put((String) key, (String) val);
-                    });
+                    KafkaCrossDcConf.readZkProps(properties, zkProps);
                 }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -129,8 +95,36 @@ public class Consumer {
             throw new IllegalArgumentException("topicName not specified for Consumer");
         }
 
+        //server = new Server();
+        //ServerConnector connector = new ServerConnector(server);
+        //connector.setPort(port);
+        //server.setConnectors(new Connector[] {connector})
+        KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
+        crossDcConsumer = getCrossDcConsumer(conf);
+
+        // Start consumer thread
+
+        log.info("Starting CrossDC Consumer {}", conf);
+
+        /**
+         * ExecutorService to manage the cross-dc consumer threads.
+         */
+        ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor();
+        consumerThreadExecutor.submit(crossDcConsumer);
+
+        // Register shutdown hook
+        Thread shutdownHook = new Thread(() -> System.out.println("Shutting down consumers!"));
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+    }
+
+    private CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf) {
+        return new KafkaCrossDcConsumer(conf);
+    }
+
+    public static void main(String[] args) {
+
         Consumer consumer = new Consumer();
-        consumer.start(properties);
+        consumer.start();
     }
 
     public final void shutdown() {
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 160cb46..207f0a6 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -84,7 +84,10 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
         }
 
         for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
-            properties.put(configKey.getKey(), args._getStr(configKey.getKey(), null));
+            String val = args._getStr(configKey.getKey(), null);
+            if (val != null) {
+                properties.put(configKey.getKey(), val);
+            }
         }
     }
 
@@ -144,19 +147,9 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
 
                 zkProps = new Properties();
                 zkProps.load(new ByteArrayInputStream(data));
-                Properties zkPropsUnproccessed = new Properties(zkProps);
-
-                for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
-                    if (properties.get(configKey.getKey()) == null || ((String)properties.get(configKey.getKey())).isBlank()) {
-                        properties.put(configKey.getKey(), (String) zkProps.getProperty(
-                            configKey.getKey()));
-                        zkPropsUnproccessed.remove(configKey.getKey());
-                    }
-                }
-                zkPropsUnproccessed.forEach((key, val) -> {
-                    properties.put((String) key, (String) val);
-                });
-             }
+
+                KafkaCrossDcConf.readZkProps(properties, zkProps);
+            }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             log.error("Interrupted looking for CrossDC configuration in Zookeeper", e);
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index d496e7c..d1979e1 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -81,9 +81,12 @@ import java.util.Properties;
     solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
         getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
 
-    props.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+    props.setProperty(KafkaCrossDcConf.TOPIC_NAME, "bad_topic");
     props.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, kafkaCluster.bootstrapServers());
 
+    System.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+
+
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     props.store(baos, "");
     byte[] data = baos.toByteArray();
@@ -112,10 +115,8 @@ import java.util.Properties;
     log.info("bootstrapServers={}", bootstrapServers);
 
     Map<String, Object> properties = new HashMap<>();
-    properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
     properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
     properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
-    properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
     consumer.start(properties);
 
   }
@@ -165,7 +166,7 @@ import java.util.Properties;
 
     QueryResponse results = null;
     boolean foundUpdates = false;
-    for (int i = 0; i < 50; i++) {
+    for (int i = 0; i < 100; i++) {
       solrCluster2.getSolrClient().commit(COLLECTION);
       solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
       results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));