You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by an...@apache.org on 2022/05/16 20:29:49 UTC

[solr-sandbox] branch crossdc-wip updated: Integration tests for cross dc consumer with SolrCloud (#14)

This is an automated email from the ASF dual-hosted git repository.

anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new 9b18e46  Integration tests for cross dc consumer with SolrCloud (#14)
9b18e46 is described below

commit 9b18e46f92fb18a6537653c213fbf9afa987513a
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Mon May 16 15:29:44 2022 -0500

    Integration tests for cross dc consumer with SolrCloud (#14)
    
    * Support for integration tests with SolrCloud.
    * Add back in the EmbeddedKafkaCluster.
---
 .../org/apache/solr/crossdc/consumer/Consumer.java |   2 +
 .../configs/cloud-minimal/conf/schema.xml          |  54 ++++++++++
 .../configs/cloud-minimal/conf/solrconfig.xml      | 112 +++++++++++++++++++++
 crossdc-consumer/src/resources/log4j2.xml          |  42 ++++++++
 ...lrIntegrationTest.java => IntegrationTest.java} |  36 +++----
 .../solr/crossdc/SimpleSolrIntegrationTest.java    |   1 +
 .../solr/crossdc/SolrAndKafkaIntegrationTest.java  |  58 +++++++++++
 7 files changed, 285 insertions(+), 20 deletions(-)

diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index e99b09e..62e0a84 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -19,7 +19,9 @@ package org.apache.solr.crossdc.consumer;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.crossdc.KafkaMirroringSink;
 import org.apache.solr.crossdc.MirroringException;
diff --git a/crossdc-consumer/src/resources/configs/cloud-minimal/conf/schema.xml b/crossdc-consumer/src/resources/configs/cloud-minimal/conf/schema.xml
new file mode 100644
index 0000000..bc4676c
--- /dev/null
+++ b/crossdc-consumer/src/resources/configs/cloud-minimal/conf/schema.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+    <fieldType name="boolean" class="solr.BoolField" sortMissingLast="true"/>
+    <fieldType name="string" class="solr.StrField" docValues="true"/>
+    <fieldType name="int" class="org.apache.solr.schema.IntPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="long" class="org.apache.solr.schema.LongPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="float" class="org.apache.solr.schema.FloatPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="double" class="org.apache.solr.schema.DoublePointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="date" class="org.apache.solr.schema.DatePointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="text" class="solr.TextField">
+        <analyzer>
+            <tokenizer class="solr.StandardTokenizerFactory"/>
+            <filter class="solr.LowerCaseFilterFactory"/>
+        </analyzer>
+    </fieldType>
+
+    <!-- for versioning -->
+    <field name="_version_" type="long" indexed="true" stored="true"/>
+    <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+    <field name="id" type="string" indexed="true" stored="true"/>
+    <field name="text" type="text" indexed="true" stored="false"/>
+
+    <dynamicField name="*_b" type="boolean" indexed="true" stored="true"/>
+    <dynamicField name="*_s" type="string" indexed="true" stored="false"/>
+    <dynamicField name="*_t" type="text" indexed="true" stored="false"/>
+    <dynamicField name="*_i" type="int" indexed="false" stored="false"/>
+    <dynamicField name="*_l" type="long" indexed="false" stored="false"/>
+    <dynamicField name="*_f" type="float" indexed="false" stored="false"/>
+    <dynamicField name="*_d" type="double" indexed="false" stored="false"/>
+    <dynamicField name="*_dt" type="date" indexed="false" stored="false"/>
+
+    <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/crossdc-consumer/src/resources/configs/cloud-minimal/conf/solrconfig.xml b/crossdc-consumer/src/resources/configs/cloud-minimal/conf/solrconfig.xml
new file mode 100644
index 0000000..20caf3b
--- /dev/null
+++ b/crossdc-consumer/src/resources/configs/cloud-minimal/conf/solrconfig.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <indexConfig>
+    <mergePolicyFactory class="${mergePolicyFactory:org.apache.solr.index.TieredMergePolicyFactory}">
+      <int name="maxMergeAtOnce">${maxMergeAtOnce:10}</int>
+      <int name="segmentsPerTier">${segmentsPerTier:10}</int>
+      <double name="noCFSRatio">${noCFSRatio:.1}</double>
+    </mergePolicyFactory>
+
+    <useCompoundFile>${useCompoundFile:true}</useCompoundFile>
+
+    <ramBufferSizeMB>${ramBufferSizeMB:160}</ramBufferSizeMB>
+    <maxBufferedDocs>${maxBufferedDocs:250000}</maxBufferedDocs>     <!-- Force the common case to flush by doc count  -->
+    <!-- <ramPerThreadHardLimitMB>60</ramPerThreadHardLimitMB> -->
+
+    <!-- <mergeScheduler class="org.apache.lucene.index.ConcurrentMergeScheduler">
+      <int name="maxThreadCount">6</int>
+      <int name="maxMergeCount">8</int>
+      <bool name="ioThrottle">false</bool>
+    </mergeScheduler> -->
+
+    <writeLockTimeout>1000</writeLockTimeout>
+    <commitLockTimeout>10000</commitLockTimeout>
+
+    <!-- this sys property is not set by SolrTestCaseJ4 because almost all tests should
+         use the single process lockType for speed - but tests that explicitly need
+         to vary the lockType can set it as needed.
+    -->
+    <lockType>${lockType:single}</lockType>
+
+    <infoStream>${infostream:false}</infoStream>
+
+  </indexConfig>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <autoCommit>
+      <maxTime>${autoCommit.maxTime:60000}</maxTime>
+    </autoCommit>
+    <updateLog class="${ulog:solr.UpdateLog}" enable="${enable.update.log:true}"/>
+  </updateHandler>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+
+  </requestHandler>
+
+  <query>
+    <queryResultCache
+            enabled="${queryResultCache.enabled:false}"
+            class="${queryResultCache.class:solr.CaffeineCache}"
+            size="${queryResultCache.size:0}"
+            initialSize="${queryResultCache.initialSize:0}"
+            autowarmCount="${queryResultCache.autowarmCount:0}"/>
+      <documentCache
+              enabled="${documentCache.enabled:false}"
+              class="${documentCache.class:solr.CaffeineCache}"
+              size="${documentCache.size:0}"
+              initialSize="${documentCache.initialSize:0}"
+              autowarmCount="${documentCache.autowarmCount:0}"/>
+      <filterCache
+              enabled ="${filterCache.enabled:false}"
+              class="${filterCache.class:solr.CaffeineCache}"
+              size="${filterCache.size:1}"
+              initialSize="${filterCache.initialSize:1}"
+              autowarmCount="${filterCache.autowarmCount:0}"
+              async="${filterCache.async:false}"/>
+    <cache name="myPerSegmentCache"
+           enabled="${myPerSegmentCache.enabled:false}"
+           class="${myPerSegmentCache.class:solr.CaffeineCache}"
+           size="${myPerSegmentCache.size:0}"
+           initialSize="${myPerSegmentCache.initialSize:0}"
+           autowarmCount="${myPerSegmentCache.autowarmCount:0}"/>
+  </query>
+
+</config>
+
diff --git a/crossdc-consumer/src/resources/log4j2.xml b/crossdc-consumer/src/resources/log4j2.xml
new file mode 100644
index 0000000..96f69f1
--- /dev/null
+++ b/crossdc-consumer/src/resources/log4j2.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!-- We're configuring testing to be synchronous due to "logging pollution", see SOLR-13268 -->
+<Configuration>
+  <Appenders>
+    <Console name="STDERR" target="SYSTEM_ERR">
+      <PatternLayout>
+        <Pattern>
+          %maxLen{%-4r %-5p (%t) [%notEmpty{n:%X{node_name}}%notEmpty{ c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}%notEmpty{ t:%X{trace_id}}] %c{1.} %m%notEmpty{
+          =>%ex{short}}}{10240}%n
+        </Pattern>
+      </PatternLayout>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <!-- Use <AsyncLogger/<AsyncRoot and <Logger/<Root for asynchronous logging or synchronous logging respectively -->
+    <Logger name="org.apache.zookeeper" level="WARN"/>
+    <Logger name="org.apache.hadoop" level="WARN"/>
+    <Logger name="org.apache.directory" level="WARN"/>
+    <Logger name="org.apache.solr.hadoop" level="INFO"/>
+    <Logger name="org.eclipse.jetty" level="INFO"/>
+
+    <Root level="INFO">
+      <AppenderRef ref="STDERR"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
similarity index 74%
copy from crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
copy to crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
index 547fdca..695ebab 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
@@ -1,7 +1,6 @@
 package org.apache.solr.crossdc;
 
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
@@ -9,48 +8,40 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import static org.mockito.Mockito.spy;
 
-public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
+public class IntegrationTest extends SolrCloudTestCase {
   static final String VERSION_FIELD = "_version_";
 
-  private static final int NUM_BROKERS = 1;
-
   protected static volatile MiniSolrCloudCluster cluster1;
-
+  protected static volatile MiniSolrCloudCluster cluster2;
   private static SolrMessageProcessor processor;
 
   private static ResubmitBackoffPolicy backoffPolicy = spy(new TestMessageProcessor.NoOpResubmitBackoffPolicy());
-  private static CloudSolrClient cloudClient1;
 
   @BeforeClass
   public static void setupIntegrationTest() throws Exception {
 
     cluster1 =
         new SolrCloudTestCase.Builder(2, createTempDir())
-            .addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath())
+            .addConfig("conf", getFile("src/resources/configs/cloud-minimal/conf").toPath())
             .configure();
 
-    String collection = "collection1";
-    cloudClient1 = cluster1.getSolrClient();
-
-    processor = new SolrMessageProcessor(cloudClient1, backoffPolicy);
-
-    CollectionAdminRequest.Create create =
-        CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
-    cloudClient1.request(create);
-    cluster1.waitForActiveCollection(collection, 1, 1);
-
-    cloudClient1.setDefaultCollection(collection);
+    processor = new SolrMessageProcessor(cluster1.getSolrClient(), backoffPolicy);
   }
 
   @AfterClass
   public static void tearDownIntegrationTest() throws Exception {
-    if (cluster1 != null) {
+    if (cluster != null) {
       cluster1.shutdown();
     }
   }
@@ -76,7 +67,7 @@ public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
     request.deleteById("2", 10L);
 
     request.setParam("shouldMirror", "true");
-
+    // The response is irrelevant, but it will fail because mocked server returns null when processing
     processor.handleItem(new MirroredSolrRequest(request));
 
     // After processing, check that all version fields are stripped
@@ -92,4 +83,9 @@ public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
       }
     }
   }
+
+  @Test
+  public void TestMethod() {
+
+  }
 }
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
index 547fdca..dec570c 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
@@ -1,3 +1,4 @@
+    }
 package org.apache.solr.crossdc;
 
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index d2bbe7f..147f2be 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -1,5 +1,6 @@
 package org.apache.solr.crossdc;
 
+<<<<<<< refs/remotes/markrmiller/itwip
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -16,6 +17,7 @@ import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
 import org.apache.solr.crossdc.consumer.Consumer;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
@@ -23,12 +25,29 @@ import org.slf4j.LoggerFactory;
 
 import java.lang.invoke.MethodHandles;
 import java.util.Properties;
+=======
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.util.Map;
+>>>>>>> Add back in the EmbeddedKafkaCluster.
 
 import static org.mockito.Mockito.spy;
 
 @ThreadLeakFilters(
     defaultFilters = true,
     filters = { SolrIgnoredThreadsFilter.class, QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class})
+<<<<<<< refs/remotes/markrmiller/itwip
 @ThreadLeakAction(ThreadLeakAction.Action.INTERRUPT)
 public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
 
@@ -47,11 +66,23 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
   private static String TOPIC = "topic1";
   
   private static String COLLECTION = "collection1";
+=======
+public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
+  static final String VERSION_FIELD = "_version_";
+
+  private static final int NUM_BROKERS = 1;
+  public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+  protected static volatile MiniSolrCloudCluster cluster1;
+  protected static volatile MiniSolrCloudCluster cluster2;
+  private static SolrMessageProcessor processor;
+>>>>>>> Add back in the EmbeddedKafkaCluster.
 
   private static ResubmitBackoffPolicy backoffPolicy = spy(new TestMessageProcessor.NoOpResubmitBackoffPolicy());
 
   @BeforeClass
   public static void setupIntegrationTest() throws Exception {
+<<<<<<< refs/remotes/markrmiller/itwip
     Properties config = new Properties();
     config.put("unclean.leader.election.enable", "true");
     config.put("enable.partition.eof", "false");
@@ -82,10 +113,22 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
 
     consumer.start(bootstrapServers, solrCluster1.getZkServer().getZkAddress(), TOPIC, false, 0);
 
+=======
+
+    CLUSTER.start();
+
+    cluster1 =
+        new Builder(2, createTempDir())
+            .addConfig("conf", getFile("src/resources/configs/cloud-minimal/conf").toPath())
+            .configure();
+
+    processor = new SolrMessageProcessor(cluster1.getSolrClient(), backoffPolicy);
+>>>>>>> Add back in the EmbeddedKafkaCluster.
   }
 
   @AfterClass
   public static void tearDownIntegrationTest() throws Exception {
+<<<<<<< refs/remotes/markrmiller/itwip
     consumer.shutdown();
 
     try {
@@ -130,5 +173,20 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
     System.out.println("Closed producer");
 
     Thread.sleep(10000);
+=======
+
+    CLUSTER.stop();
+
+    if (cluster1 != null) {
+      cluster1.shutdown();
+    }
+    if (cluster2 != null) {
+      cluster2.shutdown();
+    }
+  }
+
+  public void test() {
+
+>>>>>>> Add back in the EmbeddedKafkaCluster.
   }
 }