You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by an...@apache.org on 2022/05/16 20:29:49 UTC
[solr-sandbox] branch crossdc-wip updated: Integration tests for cross dc consumer with SolrCloud (#14)
This is an automated email from the ASF dual-hosted git repository.
anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new 9b18e46 Integration tests for cross dc consumer with SolrCloud (#14)
9b18e46 is described below
commit 9b18e46f92fb18a6537653c213fbf9afa987513a
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Mon May 16 15:29:44 2022 -0500
Integration tests for cross dc consumer with SolrCloud (#14)
* Support for integration tests with SolrCloud.
* Add back in the EmbeddedKafkaCluster.
---
.../org/apache/solr/crossdc/consumer/Consumer.java | 2 +
.../configs/cloud-minimal/conf/schema.xml | 54 ++++++++++
.../configs/cloud-minimal/conf/solrconfig.xml | 112 +++++++++++++++++++++
crossdc-consumer/src/resources/log4j2.xml | 42 ++++++++
...lrIntegrationTest.java => IntegrationTest.java} | 36 +++----
.../solr/crossdc/SimpleSolrIntegrationTest.java | 1 +
.../solr/crossdc/SolrAndKafkaIntegrationTest.java | 58 +++++++++++
7 files changed, 285 insertions(+), 20 deletions(-)
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index e99b09e..62e0a84 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -19,7 +19,9 @@ package org.apache.solr.crossdc.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.crossdc.KafkaMirroringSink;
import org.apache.solr.crossdc.MirroringException;
diff --git a/crossdc-consumer/src/resources/configs/cloud-minimal/conf/schema.xml b/crossdc-consumer/src/resources/configs/cloud-minimal/conf/schema.xml
new file mode 100644
index 0000000..bc4676c
--- /dev/null
+++ b/crossdc-consumer/src/resources/configs/cloud-minimal/conf/schema.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+ <fieldType name="boolean" class="solr.BoolField" sortMissingLast="true"/>
+ <fieldType name="string" class="solr.StrField" docValues="true"/>
+ <fieldType name="int" class="org.apache.solr.schema.IntPointField" docValues="true" omitNorms="true"
+ positionIncrementGap="0"/>
+ <fieldType name="long" class="org.apache.solr.schema.LongPointField" docValues="true" omitNorms="true"
+ positionIncrementGap="0"/>
+ <fieldType name="float" class="org.apache.solr.schema.FloatPointField" docValues="true" omitNorms="true"
+ positionIncrementGap="0"/>
+ <fieldType name="double" class="org.apache.solr.schema.DoublePointField" docValues="true" omitNorms="true"
+ positionIncrementGap="0"/>
+ <fieldType name="date" class="org.apache.solr.schema.DatePointField" docValues="true" omitNorms="true"
+ positionIncrementGap="0"/>
+ <fieldType name="text" class="solr.TextField">
+ <analyzer>
+ <tokenizer class="solr.StandardTokenizerFactory"/>
+ <filter class="solr.LowerCaseFilterFactory"/>
+ </analyzer>
+ </fieldType>
+
+ <!-- for versioning -->
+ <field name="_version_" type="long" indexed="true" stored="true"/>
+ <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+ <field name="id" type="string" indexed="true" stored="true"/>
+ <field name="text" type="text" indexed="true" stored="false"/>
+
+ <dynamicField name="*_b" type="boolean" indexed="true" stored="true"/>
+ <dynamicField name="*_s" type="string" indexed="true" stored="false"/>
+ <dynamicField name="*_t" type="text" indexed="true" stored="false"/>
+ <dynamicField name="*_i" type="int" indexed="false" stored="false"/>
+ <dynamicField name="*_l" type="long" indexed="false" stored="false"/>
+ <dynamicField name="*_f" type="float" indexed="false" stored="false"/>
+ <dynamicField name="*_d" type="double" indexed="false" stored="false"/>
+ <dynamicField name="*_dt" type="date" indexed="false" stored="false"/>
+
+ <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/crossdc-consumer/src/resources/configs/cloud-minimal/conf/solrconfig.xml b/crossdc-consumer/src/resources/configs/cloud-minimal/conf/solrconfig.xml
new file mode 100644
index 0000000..20caf3b
--- /dev/null
+++ b/crossdc-consumer/src/resources/configs/cloud-minimal/conf/solrconfig.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+ <dataDir>${solr.data.dir:}</dataDir>
+
+ <directoryFactory name="DirectoryFactory"
+ class="${directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+ <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+ <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+ <indexConfig>
+ <mergePolicyFactory class="${mergePolicyFactory:org.apache.solr.index.TieredMergePolicyFactory}">
+ <int name="maxMergeAtOnce">${maxMergeAtOnce:10}</int>
+ <int name="segmentsPerTier">${segmentsPerTier:10}</int>
+ <double name="noCFSRatio">${noCFSRatio:.1}</double>
+ </mergePolicyFactory>
+
+ <useCompoundFile>${useCompoundFile:true}</useCompoundFile>
+
+ <ramBufferSizeMB>${ramBufferSizeMB:160}</ramBufferSizeMB>
+ <maxBufferedDocs>${maxBufferedDocs:250000}</maxBufferedDocs> <!-- Force the common case to flush by doc count -->
+ <!-- <ramPerThreadHardLimitMB>60</ramPerThreadHardLimitMB> -->
+
+ <!-- <mergeScheduler class="org.apache.lucene.index.ConcurrentMergeScheduler">
+ <int name="maxThreadCount">6</int>
+ <int name="maxMergeCount">8</int>
+ <bool name="ioThrottle">false</bool>
+ </mergeScheduler> -->
+
+ <writeLockTimeout>1000</writeLockTimeout>
+ <commitLockTimeout>10000</commitLockTimeout>
+
+ <!-- this sys property is not set by SolrTestCaseJ4 because almost all tests should
+ use the single process lockType for speed - but tests that explicitly need
+ to vary the lockType canset it as needed.
+ -->
+ <lockType>${lockType:single}</lockType>
+
+ <infoStream>${infostream:false}</infoStream>
+
+ </indexConfig>
+
+ <updateHandler class="solr.DirectUpdateHandler2">
+ <commitWithin>
+ <softCommit>${commitwithin.softcommit:true}</softCommit>
+ </commitWithin>
+ <autoCommit>
+ <maxTime>${autoCommit.maxTime:60000}</maxTime>
+ </autoCommit>
+ <updateLog class="${ulog:solr.UpdateLog}" enable="${enable.update.log:true}"/>
+ </updateHandler>
+
+ <requestHandler name="/select" class="solr.SearchHandler">
+ <lst name="defaults">
+ <str name="echoParams">explicit</str>
+ <str name="indent">true</str>
+ <str name="df">text</str>
+ </lst>
+
+ </requestHandler>
+
+ <query>
+ <queryResultCache
+ enabled="${queryResultCache.enabled:false}"
+ class="${queryResultCache.class:solr.CaffeineCache}"
+ size="${queryResultCache.size:0}"
+ initialSize="${queryResultCache.initialSize:0}"
+ autowarmCount="${queryResultCache.autowarmCount:0}"/>
+ <documentCache
+ enabled="${documentCache.enabled:false}"
+ class="${documentCache.class:solr.CaffeineCache}"
+ size="${documentCache.size:0}"
+ initialSize="${documentCache.initialSize:0}"
+ autowarmCount="${documentCache.autowarmCount:0}"/>
+ <filterCache
+ enabled ="${filterCache.enabled:false}"
+ class="${filterCache.class:solr.CaffeineCache}"
+ size="${filterCache.size:1}"
+ initialSize="${filterCache.initialSize:1}"
+ autowarmCount="${filterCache.autowarmCount:0}"
+ async="${filterCache.async:false}"/>
+ <cache name="myPerSegmentCache"
+ enabled="${myPerSegmentCache.enabled:false}"
+ class="${myPerSegmentCache.class:solr.CaffeineCache}"
+ size="${myPerSegmentCache.size:0}"
+ initialSize="${myPerSegmentCache.initialSize:0}"
+ autowarmCount="${myPerSegmentCache.autowarmCount:0}"/>
+ </query>
+
+</config>
+
diff --git a/crossdc-consumer/src/resources/log4j2.xml b/crossdc-consumer/src/resources/log4j2.xml
new file mode 100644
index 0000000..96f69f1
--- /dev/null
+++ b/crossdc-consumer/src/resources/log4j2.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<!-- We're configuring testing to be synchronous due to "logging polution", see SOLR-13268 -->
+<Configuration>
+ <Appenders>
+ <Console name="STDERR" target="SYSTEM_ERR">
+ <PatternLayout>
+ <Pattern>
+ %maxLen{%-4r %-5p (%t) [%notEmpty{n:%X{node_name}}%notEmpty{ c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}%notEmpty{ t:%X{trace_id}}] %c{1.} %m%notEmpty{
+ =>%ex{short}}}{10240}%n
+ </Pattern>
+ </PatternLayout>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <!-- Use <AsyncLogger/<AsyncRoot and <Logger/<Root for asynchronous logging or synchonous logging respectively -->
+ <Logger name="org.apache.zookeeper" level="WARN"/>
+ <Logger name="org.apache.hadoop" level="WARN"/>
+ <Logger name="org.apache.directory" level="WARN"/>
+ <Logger name="org.apache.solr.hadoop" level="INFO"/>
+ <Logger name="org.eclipse.jetty" level="INFO"/>
+
+ <Root level="INFO">
+ <AppenderRef ref="STDERR"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
similarity index 74%
copy from crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
copy to crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
index 547fdca..695ebab 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
@@ -1,7 +1,6 @@
package org.apache.solr.crossdc;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
@@ -9,48 +8,40 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import static org.mockito.Mockito.spy;
-public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
+public class IntegrationTest extends SolrCloudTestCase {
static final String VERSION_FIELD = "_version_";
- private static final int NUM_BROKERS = 1;
-
protected static volatile MiniSolrCloudCluster cluster1;
-
+ protected static volatile MiniSolrCloudCluster cluster2;
private static SolrMessageProcessor processor;
private static ResubmitBackoffPolicy backoffPolicy = spy(new TestMessageProcessor.NoOpResubmitBackoffPolicy());
- private static CloudSolrClient cloudClient1;
@BeforeClass
public static void setupIntegrationTest() throws Exception {
cluster1 =
new SolrCloudTestCase.Builder(2, createTempDir())
- .addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath())
+ .addConfig("conf", getFile("src/resources/configs/cloud-minimal/conf").toPath())
.configure();
- String collection = "collection1";
- cloudClient1 = cluster1.getSolrClient();
-
- processor = new SolrMessageProcessor(cloudClient1, backoffPolicy);
-
- CollectionAdminRequest.Create create =
- CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
- cloudClient1.request(create);
- cluster1.waitForActiveCollection(collection, 1, 1);
-
- cloudClient1.setDefaultCollection(collection);
+ processor = new SolrMessageProcessor(cluster1.getSolrClient(), backoffPolicy);
}
@AfterClass
public static void tearDownIntegrationTest() throws Exception {
- if (cluster1 != null) {
+ if (cluster != null) {
cluster1.shutdown();
}
}
@@ -76,7 +67,7 @@ public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
request.deleteById("2", 10L);
request.setParam("shouldMirror", "true");
-
+ // The response is irrelevant, but it will fail because mocked server returns null when processing
processor.handleItem(new MirroredSolrRequest(request));
// After processing, check that all version fields are stripped
@@ -92,4 +83,9 @@ public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
}
}
}
+
+ @Test
+ public void TestMethod() {
+
+ }
}
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
index 547fdca..dec570c 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
@@ -1,3 +1,4 @@
+ }
package org.apache.solr.crossdc;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index d2bbe7f..147f2be 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -1,5 +1,6 @@
package org.apache.solr.crossdc;
+<<<<<<< refs/remotes/markrmiller/itwip
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -16,6 +17,7 @@ import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
import org.apache.solr.crossdc.consumer.Consumer;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
@@ -23,12 +25,29 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.Properties;
+=======
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.util.Map;
+>>>>>>> Add back in the EmbeddedKafkaCluster.
import static org.mockito.Mockito.spy;
@ThreadLeakFilters(
defaultFilters = true,
filters = { SolrIgnoredThreadsFilter.class, QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class})
+<<<<<<< refs/remotes/markrmiller/itwip
@ThreadLeakAction(ThreadLeakAction.Action.INTERRUPT)
public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
@@ -47,11 +66,23 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
private static String TOPIC = "topic1";
private static String COLLECTION = "collection1";
+=======
+public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
+ static final String VERSION_FIELD = "_version_";
+
+ private static final int NUM_BROKERS = 1;
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+ protected static volatile MiniSolrCloudCluster cluster1;
+ protected static volatile MiniSolrCloudCluster cluster2;
+ private static SolrMessageProcessor processor;
+>>>>>>> Add back in the EmbeddedKafkaCluster.
private static ResubmitBackoffPolicy backoffPolicy = spy(new TestMessageProcessor.NoOpResubmitBackoffPolicy());
@BeforeClass
public static void setupIntegrationTest() throws Exception {
+<<<<<<< refs/remotes/markrmiller/itwip
Properties config = new Properties();
config.put("unclean.leader.election.enable", "true");
config.put("enable.partition.eof", "false");
@@ -82,10 +113,22 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
consumer.start(bootstrapServers, solrCluster1.getZkServer().getZkAddress(), TOPIC, false, 0);
+=======
+
+ CLUSTER.start();
+
+ cluster1 =
+ new Builder(2, createTempDir())
+ .addConfig("conf", getFile("src/resources/configs/cloud-minimal/conf").toPath())
+ .configure();
+
+ processor = new SolrMessageProcessor(cluster1.getSolrClient(), backoffPolicy);
+>>>>>>> Add back in the EmbeddedKafkaCluster.
}
@AfterClass
public static void tearDownIntegrationTest() throws Exception {
+<<<<<<< refs/remotes/markrmiller/itwip
consumer.shutdown();
try {
@@ -130,5 +173,20 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
System.out.println("Closed producer");
Thread.sleep(10000);
+=======
+
+ CLUSTER.stop();
+
+ if (cluster1 != null) {
+ cluster1.shutdown();
+ }
+ if (cluster2 != null) {
+ cluster2.shutdown();
+ }
+ }
+
+ public void test() {
+
+>>>>>>> Add back in the EmbeddedKafkaCluster.
}
}