You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2022/09/14 22:33:41 UTC
[solr-sandbox] branch crossdc-wip updated: Config override test and cleanup. (#39)
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new 6c68048 Config override test and cleanup. (#39)
6c68048 is described below
commit 6c6804828fc016ef871f3b3f4f1e1d00178e45bc
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Wed Sep 14 17:33:36 2022 -0500
Config override test and cleanup. (#39)
* Flush producer on close to prevent losing any pending updates.
* Add a config override test and some cleanup.
---
.../solr/crossdc/common/KafkaCrossDcConf.java | 16 +++++
.../org/apache/solr/crossdc/consumer/Consumer.java | 80 ++++++++++------------
.../MirroringUpdateRequestProcessorFactory.java | 21 ++----
.../solr/crossdc/ZkConfigIntegrationTest.java | 9 +--
4 files changed, 65 insertions(+), 61 deletions(-)
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index 9a14a1b..85feb75 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -228,6 +228,22 @@ public class KafkaCrossDcConf extends CrossDcConf {
return additional;
}
+ public static void readZkProps(Map<String,Object> properties, Properties zkProps) {
+ Properties zkPropsUnproccessed = new Properties(zkProps);
+ for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
+ if (properties.get(configKey.getKey()) == null || ((String)properties.get(configKey.getKey())).isBlank()) {
+ properties.put(configKey.getKey(), (String) zkProps.getProperty(
+ configKey.getKey()));
+ zkPropsUnproccessed.remove(configKey.getKey());
+ }
+ }
+ zkPropsUnproccessed.forEach((key, val) -> {
+ if (properties.get(key) == null) {
+ properties.put((String) key, (String) val);
+ }
+ });
+ }
+
@Override public String toString() {
StringBuilder sb = new StringBuilder(128);
for (ConfigProperty configProperty : CONFIG_PROPERTIES) {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index b3cb2b3..122205f 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -46,41 +46,18 @@ public class Consumer {
private Server server;
private CrossDcConsumer crossDcConsumer;
- public void start(Map<String, Object> properties) {
-
- //server = new Server();
- //ServerConnector connector = new ServerConnector(server);
- //connector.setPort(port);
- //server.setConnectors(new Connector[] {connector})
- KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
- crossDcConsumer = getCrossDcConsumer(conf);
-
- // Start consumer thread
-
- log.info("Starting CrossDC Consumer {}", conf);
-
- /**
- * ExecutorService to manage the cross-dc consumer threads.
- */
- ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor();
- consumerThreadExecutor.submit(crossDcConsumer);
-
- // Register shutdown hook
- Thread shutdownHook = new Thread(() -> System.out.println("Shutting down consumers!"));
- Runtime.getRuntime().addShutdownHook(shutdownHook);
+ public void start() {
+ start(new HashMap<>());
}
- private CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf) {
- return new KafkaCrossDcConsumer(conf);
- }
-
- public static void main(String[] args) {
-
- Map<String,Object> properties = new HashMap<>();
+ public void start(Map<String,Object> properties ) {
for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
- properties.put(configKey.getKey(), System.getProperty(configKey.getKey()));
+ String val = System.getProperty(configKey.getKey());
+ if (val != null) {
+ properties.put(configKey.getKey(), val);
+ }
}
String zkConnectString = (String) properties.get(KafkaCrossDcConf.ZK_CONNECT_STRING);
@@ -98,18 +75,7 @@ public class Consumer {
Properties zkProps = new Properties();
zkProps.load(new ByteArrayInputStream(data));
- Map<Object, Object> zkPropsUnproccessed = new HashMap<>(zkProps);
-
- for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
- if (properties.get(configKey.getKey()) == null || ((String)properties.get(configKey.getKey())).isBlank()) {
- properties.put(configKey.getKey(), (String) zkProps.getProperty(
- configKey.getKey()));
- zkPropsUnproccessed.remove(configKey.getKey());
- }
- }
- zkPropsUnproccessed.forEach((key, val) -> {
- properties.put((String) key, (String) val);
- });
+ KafkaCrossDcConf.readZkProps(properties, zkProps);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -129,8 +95,36 @@ public class Consumer {
throw new IllegalArgumentException("topicName not specified for Consumer");
}
+ //server = new Server();
+ //ServerConnector connector = new ServerConnector(server);
+ //connector.setPort(port);
+ //server.setConnectors(new Connector[] {connector})
+ KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
+ crossDcConsumer = getCrossDcConsumer(conf);
+
+ // Start consumer thread
+
+ log.info("Starting CrossDC Consumer {}", conf);
+
+ /**
+ * ExecutorService to manage the cross-dc consumer threads.
+ */
+ ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor();
+ consumerThreadExecutor.submit(crossDcConsumer);
+
+ // Register shutdown hook
+ Thread shutdownHook = new Thread(() -> System.out.println("Shutting down consumers!"));
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+ }
+
+ private CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf) {
+ return new KafkaCrossDcConsumer(conf);
+ }
+
+ public static void main(String[] args) {
+
Consumer consumer = new Consumer();
- consumer.start(properties);
+ consumer.start();
}
public final void shutdown() {
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 160cb46..207f0a6 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -84,7 +84,10 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
}
for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
- properties.put(configKey.getKey(), args._getStr(configKey.getKey(), null));
+ String val = args._getStr(configKey.getKey(), null);
+ if (val != null) {
+ properties.put(configKey.getKey(), val);
+ }
}
}
@@ -144,19 +147,9 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
zkProps = new Properties();
zkProps.load(new ByteArrayInputStream(data));
- Properties zkPropsUnproccessed = new Properties(zkProps);
-
- for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
- if (properties.get(configKey.getKey()) == null || ((String)properties.get(configKey.getKey())).isBlank()) {
- properties.put(configKey.getKey(), (String) zkProps.getProperty(
- configKey.getKey()));
- zkPropsUnproccessed.remove(configKey.getKey());
- }
- }
- zkPropsUnproccessed.forEach((key, val) -> {
- properties.put((String) key, (String) val);
- });
- }
+
+ KafkaCrossDcConf.readZkProps(properties, zkProps);
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Interrupted looking for CrossDC configuration in Zookeeper", e);
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index d496e7c..d1979e1 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -81,9 +81,12 @@ import java.util.Properties;
solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
- props.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+ props.setProperty(KafkaCrossDcConf.TOPIC_NAME, "bad_topic");
props.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, kafkaCluster.bootstrapServers());
+ System.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+
+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
props.store(baos, "");
byte[] data = baos.toByteArray();
@@ -112,10 +115,8 @@ import java.util.Properties;
log.info("bootstrapServers={}", bootstrapServers);
Map<String, Object> properties = new HashMap<>();
- properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
- properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
consumer.start(properties);
}
@@ -165,7 +166,7 @@ import java.util.Properties;
QueryResponse results = null;
boolean foundUpdates = false;
- for (int i = 0; i < 50; i++) {
+ for (int i = 0; i < 100; i++) {
solrCluster2.getSolrClient().commit(COLLECTION);
solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));