You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by an...@apache.org on 2022/06/08 07:21:37 UTC
[solr-sandbox] branch crossdc-wip updated: Allow central config from ZK and build updates. (#21)
This is an automated email from the ASF dual-hosted git repository.
anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new 990eec6 Allow central config from ZK and build updates. (#21)
990eec6 is described below
commit 990eec6f83bb439917cafbc2a703a4bdad805712
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Wed Jun 8 02:21:33 2022 -0500
Allow central config from ZK and build updates. (#21)
---
.gitignore | 2 +
crossdc-commons/build.gradle | 7 -
crossdc-commons/gradle.properties | 2 +
.../apache/solr/crossdc/common/CrossDcConf.java | 2 +
crossdc-consumer/build.gradle | 7 -
crossdc-consumer/gradle.properties | 2 +
.../org/apache/solr/crossdc/consumer/Consumer.java | 68 +++++++-
crossdc-producer/build.gradle | 6 -
crossdc-producer/gradle.properties | 2 +
.../MirroringUpdateRequestProcessorFactory.java | 52 ++++--
.../solr/crossdc/ZkConfigIntegrationTest.java | 180 +++++++++++++++++++++
.../configs/cloud-minimal/conf/solrconfig.xml | 3 +-
gradle/wrapper/gradle-wrapper.properties | 2 +-
13 files changed, 294 insertions(+), 41 deletions(-)
diff --git a/.gitignore b/.gitignore
index 4daa5b8..2318ff7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,3 +15,5 @@
# Ignore Gradle build output directory
build
out
+
+logs
diff --git a/crossdc-commons/build.gradle b/crossdc-commons/build.gradle
index 26e1893..550586e 100644
--- a/crossdc-commons/build.gradle
+++ b/crossdc-commons/build.gradle
@@ -21,11 +21,8 @@ plugins {
description = 'Cross-DC Commons package'
-version '1.0-SNAPSHOT'
-
repositories {
mavenCentral()
- jcenter()
}
configurations {
@@ -42,10 +39,6 @@ dependencies {
implementation 'com.google.guava:guava:14.0'
}
-subprojects {
- group "org.apache.solr"
-}
-
jar.enabled = false
shadowJar {
diff --git a/crossdc-commons/gradle.properties b/crossdc-commons/gradle.properties
new file mode 100644
index 0000000..0df7afe
--- /dev/null
+++ b/crossdc-commons/gradle.properties
@@ -0,0 +1,2 @@
+group=org.apache.solr
+version=0.1-SNAPSHOT
\ No newline at end of file
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
index b1f3d24..26dbc13 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
@@ -17,5 +17,7 @@
package org.apache.solr.crossdc.common;
+/**
+ * Abstract base for Cross-DC configuration objects.
+ */
public abstract class CrossDcConf {
+ /** ZooKeeper node path from which the shared Cross-DC properties are read. */
+ public static final String CROSSDC_PROPERTIES = "/crossdc.properties";
+
+ /** Implemented by concrete configurations; presumably names the cluster this configuration belongs to - TODO confirm. */
public abstract String getClusterName();
}
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index e49e8e0..f0ebcfc 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -20,11 +20,8 @@ plugins {
description = 'Cross-DC Consumer package'
-version '1.0-SNAPSHOT'
-
repositories {
mavenCentral()
- jcenter()
}
application {
@@ -57,10 +54,6 @@ dependencies {
testImplementation 'org.apache.kafka:kafka-streams:2.8.1:test'
}
-subprojects {
- group "org.apache.solr"
-}
-
test {
jvmArgs '-Djava.security.egd=file:/dev/./urandom'
}
\ No newline at end of file
diff --git a/crossdc-consumer/gradle.properties b/crossdc-consumer/gradle.properties
new file mode 100644
index 0000000..0df7afe
--- /dev/null
+++ b/crossdc-consumer/gradle.properties
@@ -0,0 +1,2 @@
+group=org.apache.solr
+version=0.1-SNAPSHOT
\ No newline at end of file
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index 5bc7f6b..feba638 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -16,6 +16,9 @@
*/
package org.apache.solr.crossdc.consumer;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.crossdc.common.CrossDcConf;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
import org.eclipse.jetty.server.Connector;
@@ -24,12 +27,15 @@ import org.eclipse.jetty.server.ServerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
import java.lang.invoke.MethodHandles;
+import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// Cross-DC Consumer main class
public class Consumer {
+ public static final String DEFAULT_PORT = "8090";
private static boolean enabled = true;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -56,10 +62,11 @@ public class Consumer {
this.topicName = topicName;
- server = new Server();
- ServerConnector connector = new ServerConnector(server);
- connector.setPort(port);
- server.setConnectors(new Connector[] {connector});
+ //server = new Server();
+ //ServerConnector connector = new ServerConnector(server);
+ //connector.setPort(port);
+ //server.setConnectors(new Connector[] {connector})
+
crossDcConsumer = getCrossDcConsumer(bootstrapServers, zkConnectString, topicName, enableDataEncryption);
// Start consumer thread
@@ -82,14 +89,59 @@ public class Consumer {
}
public static void main(String[] args) {
+
+ String zkConnectString = System.getProperty("zkConnectString");
+ if (zkConnectString == null || zkConnectString.isBlank()) {
+ throw new IllegalArgumentException("zkConnectString not specified for producer");
+ }
+
String bootstrapServers = System.getProperty("bootstrapServers");
- boolean enableDataEncryption = Boolean.getBoolean("enableDataEncryption");
+ // boolean enableDataEncryption = Boolean.getBoolean("enableDataEncryption");
String topicName = System.getProperty("topicName");
- String zkConnectString = System.getProperty("zkConnectString");
- String port = System.getProperty("port", "8090");
+ String port = System.getProperty("port");
+
+
+ try (SolrZkClient client = new SolrZkClient(zkConnectString, 15000)) {
+
+ try {
+ if ((topicName == null || topicName.isBlank())
+ || (bootstrapServers == null || bootstrapServers.isBlank()) || (port == null || port.isBlank()) && client
+ .exists(CrossDcConf.CROSSDC_PROPERTIES, true)) {
+ byte[] data = client.getData("/crossdc.properties", null, null, true);
+ Properties props = new Properties();
+ props.load(new ByteArrayInputStream(data));
+
+ if (topicName == null || topicName.isBlank()) {
+ topicName = props.getProperty("topicName");
+ }
+ if (bootstrapServers == null || bootstrapServers.isBlank()) {
+ bootstrapServers = props.getProperty("bootstrapServers");
+ }
+ if (port == null || port.isBlank()) {
+ port = props.getProperty("port");
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ }
+
+ if (port == null) {
+ port = DEFAULT_PORT;
+ }
+
+ if (bootstrapServers == null || bootstrapServers.isBlank()) {
+ throw new IllegalArgumentException("boostrapServers not specified for producer");
+ }
+ if (topicName == null || topicName.isBlank()) {
+ throw new IllegalArgumentException("topicName not specified for producer");
+ }
Consumer consumer = new Consumer();
- consumer.start(bootstrapServers, zkConnectString, topicName, enableDataEncryption, Integer.parseInt(port));
+ consumer.start(bootstrapServers, zkConnectString, topicName, false, Integer.parseInt(port));
}
public void shutdown() {
diff --git a/crossdc-producer/build.gradle b/crossdc-producer/build.gradle
index b636c88..ad505dc 100644
--- a/crossdc-producer/build.gradle
+++ b/crossdc-producer/build.gradle
@@ -21,8 +21,6 @@ plugins {
description = 'Cross-DC Producer package'
-version '1.0-SNAPSHOT'
-
repositories {
mavenCentral()
}
@@ -58,10 +56,6 @@ dependencies {
testImplementation 'org.apache.kafka:kafka-clients:2.8.1:test'
}
-subprojects {
- group "org.apache.solr"
-}
-
jar.enabled = false
shadowJar {
diff --git a/crossdc-producer/gradle.properties b/crossdc-producer/gradle.properties
new file mode 100644
index 0000000..0df7afe
--- /dev/null
+++ b/crossdc-producer/gradle.properties
@@ -0,0 +1,2 @@
+group=org.apache.solr
+version=0.1-SNAPSHOT
\ No newline at end of file
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 2d91531..45d5e6b 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -23,16 +23,20 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.crossdc.common.CrossDcConf;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.KafkaMirroringSink;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.util.plugin.SolrCoreAware;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.Properties;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.*;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
@@ -60,11 +64,15 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
/** This is instantiated in inform(SolrCore) and then shared by all processor instances - visible for testing */
volatile KafkaRequestMirroringHandler mirroringHandler;
+ private String topicName;
+ private String bootstrapServers;
+ /**
+ * Captures the optional {@code topicName} and {@code bootstrapServers} settings from the
+ * processor's configuration arguments; each is left {@code null} when not configured.
+ *
+ * @param args the configuration arguments supplied for this factory
+ */
@Override
public void init(final NamedList args) {
-
super.init(args);
+
+ topicName = args._getStr("topicName", null);
+ bootstrapServers = args._getStr("bootstrapServers", null);
}
private class Closer {
@@ -87,20 +95,42 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
@Override
public void inform(SolrCore core) {
log.info("KafkaRequestMirroringHandler inform");
- // load the request mirroring sink class and instantiate.
- // mirroringHandler = core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(), KafkaRequestMirroringHandler.class);
+ try {
+ // Fall back to the central ZooKeeper properties only when a setting is still missing, and
+ // only read the node when it exists. The grouping is explicit because '&&' binds tighter
+ // than '||'; without it a missing topicName skipped the exists() check entirely.
+ boolean missingConfig = (topicName == null || topicName.isBlank())
+ || (bootstrapServers == null || bootstrapServers.isBlank());
+ if (missingConfig && core.getCoreContainer().getZkController()
+ .getZkClient().exists(CrossDcConf.CROSSDC_PROPERTIES, true)) {
+ byte[] data = core.getCoreContainer().getZkController().getZkClient().getData(CrossDcConf.CROSSDC_PROPERTIES, null, null, true);
+ Properties props = new Properties();
+ props.load(new ByteArrayInputStream(data));
+
+ // Values configured on the processor win; ZooKeeper values only fill the gaps.
+ if (topicName == null || topicName.isBlank()) {
+ topicName = props.getProperty("topicName");
+ }
+ if (bootstrapServers == null || bootstrapServers.isBlank()) {
+ bootstrapServers = props.getProperty("bootstrapServers");
+ }
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag before translating the exception.
+ Thread.currentThread().interrupt();
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
- // TODO: Setup Kafka properly
- final String topicName = System.getProperty("topicName");
- if (topicName == null) {
+ if (bootstrapServers == null || bootstrapServers.isBlank()) {
+ throw new IllegalArgumentException("bootstrapServers not specified for producer");
+ }
+
+ if (topicName == null || topicName.isBlank()) {
throw new IllegalArgumentException("topicName not specified for producer");
}
- final String boostrapServers = System.getProperty("bootstrapServers");
- if (boostrapServers == null) {
- throw new IllegalArgumentException("boostrapServers not specified for producer");
- }
- KafkaCrossDcConf conf = new KafkaCrossDcConf(boostrapServers, topicName, false, null);
+
+ log.info("bootstrapServers={} topicName={}", bootstrapServers, topicName);
+
+ // load the request mirroring sink class and instantiate.
+ // mirroringHandler = core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(), KafkaRequestMirroringHandler.class);
+
+ KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, false, null);
KafkaMirroringSink sink = new KafkaMirroringSink(conf);
Closer closer = new Closer(sink);
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
new file mode 100644
index 0000000..a1bd0e4
--- /dev/null
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -0,0 +1,180 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
+import org.apache.solr.crossdc.consumer.Consumer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+
+@ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
+ QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
+@ThreadLeakLingering(linger = 5000) public class ZkConfigIntegrationTest extends
+ SolrTestCaseJ4 {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ static final String VERSION_FIELD = "_version_";
+
+ private static final int NUM_BROKERS = 1;
+ public static EmbeddedKafkaCluster kafkaCluster;
+
+ protected static volatile MiniSolrCloudCluster solrCluster1;
+ protected static volatile MiniSolrCloudCluster solrCluster2;
+
+ protected static volatile Consumer consumer = new Consumer();
+
+ private static String TOPIC = "topic1";
+
+ private static String COLLECTION = "collection1";
+
+ /**
+ * Starts an embedded Kafka cluster and two Solr clusters, writes the same
+ * /crossdc.properties node (topicName + bootstrapServers) into each Solr cluster's ZooKeeper,
+ * creates the test collection on both, and starts the consumer against cluster 2.
+ */
+ @BeforeClass
+ public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
+
+ // Broker settings passed to the embedded Kafka cluster.
+ Properties config = new Properties();
+ config.put("unclean.leader.election.enable", "true");
+ config.put("enable.partition.eof", "false");
+
+ // Report the brokers by IP address instead of the "localhost" host name.
+ kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
+ public String bootstrapServers() {
+ return super.bootstrapServers().replaceAll("localhost", "127.0.0.1");
+ }
+ };
+ kafkaCluster.start();
+
+ kafkaCluster.createTopic(TOPIC, 1, 1);
+
+ // The system properties are deliberately not set: the settings must come from ZooKeeper.
+ // System.setProperty("topicName", null);
+ // System.setProperty("bootstrapServers", null);
+
+ Properties props = new Properties();
+
+ solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+ getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+ props.setProperty("topicName", TOPIC);
+ props.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+
+ // Serialize the properties and publish them before the collection is created; presumably the
+ // mirroring processor factory reads this node when the collection's core is loaded.
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ props.store(baos, "");
+ byte[] data = baos.toByteArray();
+ solrCluster1.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
+
+ CollectionAdminRequest.Create create =
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+ solrCluster1.getSolrClient().request(create);
+ solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+
+ solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+
+ // Second cluster: same config set, same ZooKeeper properties, same collection.
+ solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+ getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+ solrCluster2.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
+
+ CollectionAdminRequest.Create create2 =
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+ solrCluster2.getSolrClient().request(create2);
+ solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
+
+ solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+
+ String bootstrapServers = kafkaCluster.bootstrapServers();
+ log.info("bootstrapServers={}", bootstrapServers);
+
+ // The consumer is pointed at cluster 2's ZooKeeper; data encryption is off and the port is 0.
+ consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, false, 0);
+
+ }
+
+ /**
+ * Shuts down the consumer, the Kafka cluster and both Solr clusters, in that order.
+ */
+ @AfterClass
+ public static void afterSolrAndKafkaIntegrationTest() throws Exception {
+ // NOTE(review): presumably clears tracked objects so they are not reported as leaks - confirm this is intended.
+ ObjectReleaseTracker.clear();
+
+ consumer.shutdown();
+
+ // A failure to stop Kafka is logged rather than rethrown so the Solr clusters below are still shut down.
+ try {
+ kafkaCluster.stop();
+ } catch (Exception e) {
+ log.error("Exception stopping Kafka cluster", e);
+ }
+
+ // Dump each cluster's ZooKeeper layout to stdout before shutting the cluster down.
+ if (solrCluster1 != null) {
+ solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
+ solrCluster1.shutdown();
+ }
+ if (solrCluster2 != null) {
+ solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
+ solrCluster2.shutdown();
+ }
+ }
+
+ /**
+ * Deletes all documents from both clusters after each test and commits the deletions.
+ */
+ @After
+ public void tearDown() throws Exception {
+ // NOTE(review): super.tearDown() runs before the cluster cleanup below - confirm this order is intended.
+ super.tearDown();
+ solrCluster1.getSolrClient().deleteByQuery("*:*");
+ solrCluster2.getSolrClient().deleteByQuery("*:*");
+ solrCluster1.getSolrClient().commit();
+ solrCluster2.getSolrClient().commit();
+ }
+
+ /**
+ * Adds one document to cluster 1 and polls cluster 2 until exactly one document is visible
+ * there, which only works if the Kafka settings were picked up from the ZooKeeper properties
+ * node written during class setup.
+ */
+ public void testConfigFromZkPickedUp() throws Exception {
+ Thread.sleep(10000); // TODO why?
+
+ CloudSolrClient client = solrCluster1.getSolrClient();
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", String.valueOf(System.currentTimeMillis()));
+ doc.addField("text", "some test");
+
+ client.add(doc);
+
+ client.commit(COLLECTION);
+
+ System.out.println("Sent producer record");
+
+ QueryResponse results = null;
+ boolean foundUpdates = false;
+ // Poll for up to 50 attempts, 500 ms apart, and stop as soon as the document has arrived.
+ for (int i = 0; i < 50; i++) {
+ solrCluster2.getSolrClient().commit(COLLECTION);
+ solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+ results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+ if (results.getResults().getNumFound() == 1) {
+ foundUpdates = true;
+ // Without this break the loop kept committing and querying for all remaining iterations.
+ break;
+ }
+ Thread.sleep(500);
+ }
+
+ System.out.println("Closed producer");
+
+ assertTrue("results=" + results, foundUpdates);
+ System.out.println("Rest: " + results);
+
+ }
+
+}
diff --git a/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
index 6e057ab..6917e48 100644
--- a/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
+++ b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
@@ -110,7 +110,8 @@
<updateRequestProcessorChain name="mirrorUpdateChain" default="true">
<processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
-
+ <str name="bootstrapServers">${bootstrapServers:}</str>
+ <str name="topicName">${topicName:}</str>
</processor>
<processor class="solr.LogUpdateProcessorFactory" />
<processor class="solr.RunUpdateProcessorFactory" />
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index ffed3a2..aa991fc 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists