Posted to commits@solr.apache.org by an...@apache.org on 2022/08/10 21:03:27 UTC

[solr-sandbox] branch crossdc-wip updated: Reindex Test, cover a few more test cases, additional config options. (#32)

This is an automated email from the ASF dual-hosted git repository.

anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new 28b1d36  Reindex Test, cover a few more test cases, additional config options. (#32)
28b1d36 is described below

commit 28b1d36bd4e85063e1d98106736b95a438aa17e5
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Wed Aug 10 16:03:23 2022 -0500

    Reindex Test, cover a few more test cases, additional config options. (#32)
    
    * Reindex Test, cover a few more test cases, additional config options.
    
    Adds a basic test focused on reindexing, covers a few more test cases: more replicas and shards, different shard count on primary and secondary dc, allows consumer group to be configured, allows crossdc zk prop file location to be configured.
    
    * Remove TODO, some minor fixes, doc.
    
    * Add maxPolledRecords and some logging cleanup.
---
 CROSSDC.md                                         | 191 +++++++++++++++++++++
 .../apache/solr/crossdc/common/CrossDcConf.java    |   1 +
 .../solr/crossdc/common/KafkaCrossDcConf.java      |  22 ++-
 .../solr/crossdc/common/KafkaMirroringSink.java    |   5 +-
 .../common/MirroredSolrRequestSerializer.java      |   5 +-
 .../org/apache/solr/crossdc/consumer/Consumer.java |  71 +++++---
 .../crossdc/consumer/KafkaCrossDcConsumer.java     |   4 +-
 .../processor/KafkaRequestMirroringHandler.java    |   2 +-
 .../update/processor/MirroringUpdateProcessor.java |   2 +-
 .../MirroringUpdateRequestProcessorFactory.java    |   8 +-
 .../apache/solr/crossdc/DeleteByQueryToIdTest.java |   2 +-
 .../solr/crossdc/RetryQueueIntegrationTest.java    |   2 +-
 .../solr/crossdc/SolrAndKafkaIntegrationTest.java  |   2 +-
 ...ationTest.java => SolrAndKafkaReindexTest.java} | 170 +++++++++++-------
 .../solr/crossdc/ZkConfigIntegrationTest.java      |   2 +-
 15 files changed, 391 insertions(+), 98 deletions(-)

diff --git a/CROSSDC.md b/CROSSDC.md
index e69de29..228ef31 100644
--- a/CROSSDC.md
+++ b/CROSSDC.md
@@ -0,0 +1,191 @@
+# Solr Cross DC: Getting Started
+
+**A simple cross-data-center fail-over solution for Apache Solr.**
+
+
+
+[TOC]
+
+## Overview
+
+The design for this feature involves three key components:
+
+- An UpdateProcessor plugin for Solr to forward updates from the primary data center.
+- An update request consumer application to receive updates in the backup data center.
+- A distributed queue to connect the above two.
+
+The UpdateProcessor plugin is called the CrossDC Producer, the consumer application is called the CrossDC Consumer, and the supported distributed queue application is Apache Kafka.
+
+## Getting Started
+
+To use Solr Cross DC, you must complete the following steps:
+
+- Start up or obtain access to an Apache Kafka cluster to provide the distributed queue between data centers.
+- Install the CrossDC Solr plugin on each node in your Solr cluster (in both your primary and backup data centers): place the jar in the correct location, configure solrconfig.xml to reference the new UpdateProcessor, and then configure the plugin for the Kafka cluster.
+- Install the CrossDC consumer application in the backup data center and configure it for the Kafka cluster and the Solr cluster it will send consumed updates to.
+
+The Solr UpdateProcessor plugin will intercept updates when the node acts as the leader and then put those updates onto the distributed queue. The CrossDC Consumer application will poll the distributed queue and forward updates on to the configured Solr cluster upon receiving the update requests.
+
+### Configuration and Startup
+
+The current configuration options are minimal. Further configuration options will be added over time. At this early stage, some may also change.
+
+#### Installing and Configuring the Cross DC Producer Solr Plug-In
+
+1. Configure the sharedLib directory in solr.xml (e.g. sharedLib=lib) and place the CrossDC producer plug-in jar file into the specified folder. It's not advisable to use the per-SolrCore instance directory lib folder, as you would have to duplicate the plug-in many times and manage it when creating new collections or adding replicas or shards.
+
+
+**solr.xml**
+
+   ```xml
+   <solr>
+     <str name="sharedLib">${solr.sharedLib:}</str>
+   ```
+
+
+
+2. Configure the new UpdateProcessor in solrconfig.xml
+
+   **NOTE:** `The following is not the recommended configuration approach in production; see the information on central configuration below!`
+
+
+
+**solrconfig.xml**
+
+   ```xml
+   <updateRequestProcessorChain  name="mirrorUpdateChain" default="true">
+   
+     <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+       <str name="bootstrapServers">${bootstrapServers:}</str>
+       <str name="topicName">${topicName:}</str>
+     </processor>
+   
+     <processor class="solr.LogUpdateProcessorFactory" />
+     <processor class="solr.RunUpdateProcessorFactory" />
+   </updateRequestProcessorChain>
+   ```
+
+Notice that this update chain has been declared to be the default chain used.
+
+
+
+##### Configuration Properties
+
+There are two configuration properties. You can specify them directly, or use the above notation to allow them to be specified via system properties (generally configured for Solr in the bin/solr.in.sh file).
+
+   ```
+   bootstrapServers
+   ```
+
+The list of servers used to connect to the Kafka cluster, see https://kafka.apache.org/28/documentation.html#producerconfigs_bootstrap.servers
+
+   ```
+   topicName 
+   ```
+
+The Kafka topicName used to indicate which Kafka queue the Solr updates will be pushed onto.
+
+
+
+3. Add an external version constraint UpdateProcessor to the update chain added to solrconfig.xml to allow user-provided update versions (as opposed to the two Solr clusters using the independently managed built-in versioning).
+
+   https://solr.apache.org/guide/8_11/update-request-processors.html#general-use-updateprocessorfactories
+
+   https://solr.apache.org/docs/8_1_1/solr-core/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.html
+
+
+4. Start or restart the Solr cluster(s).
+
+
+
+#### Installing and Configuring the CrossDC Consumer Application
+
+1. Uncompress the distribution tar or zip file for the CrossDC Consumer into an appropriate install location on a node in the receiving data center.
+2. You can start the Consumer process via the included shell start script at bin/crossdc-consumer.
+3. You can configure the CrossDC Consumer via Java system properties passed in the CROSSDC_CONSUMER_OPTS environment variable, e.g. CROSSDC_CONSUMER_OPTS="-DbootstrapServers=127.0.0.1:9092 -DzkConnectString=127.0.0.1:2181 -DtopicName=crossdc" bin/crossdc-consumer
+
+The required configuration properties are:
+
+
+   *bootstrapServers* - the list of servers used to connect to the Kafka cluster https://kafka.apache.org/28/documentation.html#producerconfigs_bootstrap.servers
+
+   *topicName* - the Kafka topicName used to indicate which Kafka queue the Solr updates will be pushed to.
+
+   *zkConnectString* - the Zookeeper connection string used by Solr to connect to its Zookeeper cluster in the backup data center
+
+Additional configuration properties:
+
+   *groupId* - the group id to give Kafka for the consumer; defaults to *SolrCrossDCConsumer* if not specified.
+
+#### Central Configuration Option
+
+You can optionally manage the configuration centrally in Solr's Zookeeper cluster by placing a properties file called *crossdc.properties* in the root Solr Zookeeper znode, e.g. */solr/crossdc.properties*.  This allows you to update the configuration in a central location rather than in each solrconfig.xml on each Solr node, and it lets new Solr nodes or Consumers come up without requiring additional configuration.
+
+
+
+Both *bootstrapServers* and *topicName* properties can be put in this file, in which case you would not have to specify any Kafka configuration in the solrconfig.xml for the CrossDC Producer Solr plugin. Likewise, for the CrossDC Consumer application, you would only have to set *zkConnectString* for the local Solr cluster. Note that the two components will be looking in the Zookeeper clusters in their respective data center locations.
+
+You can override the properties file location and znode name in Zookeeper using the system property *zkCrossDcPropsPath=/path/to/props_file_name.properties*
+
+#### Making the Cross DC UpdateProcessor Optional in a Common solrconfig.xml
+
+The simplest and least invasive way to control whether the Cross DC UpdateProcessor is on or off for a node is to configure the update chain it's used in to be the default chain or not via Solr's system property configuration syntax.  This syntax takes the form of ${*system_property_name*} and will be substituted with the value of that system property when the configuration is parsed. You can specify a default value using the following syntax: ${*system_property_name*:*default_value*}. Y [...]
+
+*Having a separate updateRequestProcessorChain avoids a lot of additional constraints you have to deal with or consider, now or in the future, when compared to forcing all Cross DC and non-Cross DC use down a single, required, common updateRequestProcessorChain.*
+
+Further, any application consuming the configuration with no concern for enabling Cross DC will not be artificially limited in its ability to define, manage and use updateRequestProcessorChains.
+
+The following would allow a system property to safely and non-invasively enable or disable Cross DC for a node:
+
+
+```xml
+<updateRequestProcessorChain  name="crossdcUpdateChain" default="${crossdcEnabled:false}">
+  <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+    <bool name="enabled">${enabled:false}</bool>
+  </processor>
+  <processor class="solr.LogUpdateProcessorFactory" />
+  <processor class="solr.RunUpdateProcessorFactory" />
+</updateRequestProcessorChain>
+```
+
+
+
+The above configuration would default to Cross DC being disabled with minimal impact to any non-Cross DC use, and Cross DC could be enabled by starting Solr with the system property crossdcEnabled=true.
+
+The last chain to declare it's the default wins, so you can put this at the bottom of almost any existing solrconfig.xml to create an optional Cross DC path without having to audit, understand, adapt, or test existing non-Cross DC paths as other options call for.
+
+The above is the simplest and least obtrusive way to manage an on/off switch for Cross DC.
+
+**Note:** If your configuration already makes use of update handlers and/or updates independently specifying different updateRequestProcessorChains, your solution may end up a bit more sophisticated.
+
+
+
+For situations where you do want to control and enforce a single updateRequestProcessorChain path for every consumer of the solrconfig.xml, it's enough to simply use the *enabled* attribute, turning the processor into a NOOP in the chain.
+
+
+
+```xml
+<updateRequestProcessorChain  name="crossdcUpdateChain">
+  <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+    <bool name="enabled">${enabled:false}</bool>
+  </processor>
+  <processor class="solr.LogUpdateProcessorFactory" />
+  <processor class="solr.RunUpdateProcessorFactory" />
+</updateRequestProcessorChain>
+```
+
+
+
+## Limitations
+
+- Delete-By-Query is not officially supported.
+
+    - Work-In-Progress: An inefficient option that issues multiple delete-by-id requests using the results of a given standard query.
+
+    - Simply forwarding a real Delete-By-Query could also be reasonable if it is not strictly reliant on not being reordered with other requests.
+
+
+
+## Additional Notes
+
+In these early days, it may help to reference the *cluster.sh* script located in the root of the CrossDC repository. This script is a helpful developer tool for manual testing and it will download Solr and Kafka and then configure both for Cross DC.
\ No newline at end of file
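
The lookup order described in CROSSDC.md (an explicitly supplied value first, then the central crossdc.properties file) can be sketched in isolation as follows. This is a minimal sketch under stated assumptions, not the code from this commit: the class name `CrossDcPropsSketch`, the `load` and `resolve` helpers, and the sample host names are hypothetical, and the file contents are read from an in-memory string rather than from Zookeeper.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sketch: resolve a setting from an explicit value first,
// then from the contents of a crossdc.properties file.
class CrossDcPropsSketch {

    // Sample file contents; host names and values are made up for illustration.
    static final String SAMPLE_FILE =
        "bootstrapServers=kafka1.example.com:9092,kafka2.example.com:9092\n"
        + "topicName=crossdc\n";

    // Parse properties-file text into a Properties object.
    static Properties load(String contents) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(contents));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // An explicit, non-blank value wins; otherwise fall back to the file.
    // Returns null when neither source provides the setting.
    static String resolve(String name, String explicitValue, Properties props) {
        if (explicitValue == null || explicitValue.isBlank()) {
            return props.getProperty(name);
        }
        return explicitValue;
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE_FILE);
        // Pass -DtopicName=... on the command line to override the file value.
        System.out.println(resolve("topicName", System.getProperty("topicName"), props));
        System.out.println(resolve("bootstrapServers", System.getProperty("bootstrapServers"), props));
    }
}
```

With this ordering, a value set on an individual node always takes precedence over the shared file, so the central file acts as a cluster-wide default.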
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
index 26dbc13..5b67f91 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
@@ -18,6 +18,7 @@ package org.apache.solr.crossdc.common;
 
 public abstract class CrossDcConf {
     public static final String CROSSDC_PROPERTIES = "/crossdc.properties";
+    public static final String ZK_CROSSDC_PROPS_PATH = "zkCrossDcPropsPath";
 
     public abstract String getClusterName();
 }
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index c6d1586..a273b43 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -18,18 +18,23 @@ package org.apache.solr.crossdc.common;
 
 public class KafkaCrossDcConf extends CrossDcConf {
     private final String topicName;
+
+    private final String groupId;
     private final boolean enableDataEncryption;
     private final String bootstrapServers;
     private long slowSubmitThresholdInMillis;
     private int numOfRetries = 5;
     private final String solrZkConnectString;
 
+    private final int maxPollRecords;
 
-    public KafkaCrossDcConf(String bootstrapServers, String topicName, boolean enableDataEncryption, String solrZkConnectString) {
+    public KafkaCrossDcConf(String bootstrapServers, String topicName, String groupId, int maxPollRecords, boolean enableDataEncryption, String solrZkConnectString) {
         this.bootstrapServers = bootstrapServers;
         this.topicName = topicName;
         this.enableDataEncryption = enableDataEncryption;
         this.solrZkConnectString = solrZkConnectString;
+        this.groupId = groupId;
+        this.maxPollRecords = maxPollRecords;
     }
     public String getTopicName() {
         return topicName;
@@ -58,20 +63,29 @@ public class KafkaCrossDcConf extends CrossDcConf {
         return null;
     }
 
-  public String getBootStrapServers() {
-        return bootstrapServers;
+    public String getGroupId() {
+        return groupId;
   }
 
+    public int getMaxPollRecords() {
+        return maxPollRecords;
+    }
+
+    public String getBootStrapServers() {
+        return bootstrapServers;
+    }
+
     @Override
     public String toString() {
         return String.format("KafkaCrossDcConf{" +
                 "topicName='%s', " +
+                "groupId='%s', " +
                 "enableDataEncryption='%b', " +
                 "bootstrapServers='%s', " +
                 "slowSubmitThresholdInMillis='%d', " +
                 "numOfRetries='%d', " +
                 "solrZkConnectString='%s'}",
-                topicName, enableDataEncryption, bootstrapServers,
+                topicName, groupId, enableDataEncryption, bootstrapServers,
                 slowSubmitThresholdInMillis, numOfRetries, solrZkConnectString);
     }
 }
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index 75cf09d..592d34d 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -56,7 +56,10 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
         try {
 
             producer.send(new ProducerRecord(conf.getTopicName(), request), (metadata, exception) -> {
-                log.info("Producer finished sending metadata={}, exception={}", metadata, exception);
+                if (log.isDebugEnabled()) {
+                    log.debug("Producer finished sending metadata={}, exception={}", metadata,
+                        exception);
+                }
             });
             producer.flush(); // TODO: remove
 
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
index 30f82b2..856f1c9 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
@@ -62,7 +62,10 @@ public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrReq
         try {
             solrRequest = (Map) codec.unmarshal(bais);
 
-            log.info("Deserialized class={} solrRequest={}", solrRequest.getClass().getName(), solrRequest);
+            if (log.isTraceEnabled()) {
+                log.trace("Deserialized class={} solrRequest={}", solrRequest.getClass().getName(),
+                    solrRequest);
+            }
 
 
         } catch (Exception e) {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index b69a8a2..d6472fb 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -36,6 +36,14 @@ import java.util.concurrent.Executors;
 // Cross-DC Consumer main class
 public class Consumer {
     public static final String DEFAULT_PORT = "8090";
+    public static final String TOPIC_NAME = "topicName";
+    public static final String GROUP_ID = "groupId";
+    public static final String PORT = "port";
+    public static final String BOOTSTRAP_SERVERS = "bootstrapServers";
+    private static final String DEFAULT_GROUP_ID = "SolrCrossDCConsumer";
+    private static final String MAX_POLL_RECORDS = "maxPollRecords";
+    public static final String DEFAULT_MAX_POLL_RECORDS = "500";
+
     private static boolean enabled = true;
 
     private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -48,8 +56,9 @@ public class Consumer {
     private Server server;
     CrossDcConsumer crossDcConsumer;
     private String topicName;
+    private int maxPollRecords;
 
-    public void start(String bootstrapServers, String zkConnectString, String topicName, boolean enableDataEncryption, int port) {
+    public void start(String bootstrapServers, String zkConnectString, String topicName, String groupId, int maxPollRecords, boolean enableDataEncryption, int port) {
         if (bootstrapServers == null) {
             throw new IllegalArgumentException("bootstrapServers config was not passed at startup");
         }
@@ -60,18 +69,23 @@ public class Consumer {
             throw new IllegalArgumentException("topicName config was not passed at startup");
         }
 
+        if (maxPollRecords == -1) {
+            maxPollRecords = Integer.parseInt(DEFAULT_MAX_POLL_RECORDS);
+        }
+
         this.topicName = topicName;
+        this.maxPollRecords = maxPollRecords;
 
         //server = new Server();
         //ServerConnector connector = new ServerConnector(server);
         //connector.setPort(port);
         //server.setConnectors(new Connector[] {connector})
 
-        crossDcConsumer = getCrossDcConsumer(bootstrapServers, zkConnectString, topicName, enableDataEncryption);
+        crossDcConsumer = getCrossDcConsumer(bootstrapServers, zkConnectString, topicName, groupId, maxPollRecords, enableDataEncryption);
 
         // Start consumer thread
 
-        log.info("Starting CrossDC Consumer bootstrapServers={}, zkConnectString={}, topicName={}, enableDataEncryption={}", bootstrapServers, zkConnectString, topicName, enableDataEncryption);
+        log.info("Starting CrossDC Consumer bootstrapServers={}, zkConnectString={}, topicName={}, groupId={}, enableDataEncryption={}", bootstrapServers, zkConnectString, topicName, groupId, enableDataEncryption);
 
         consumerThreadExecutor = Executors.newSingleThreadExecutor();
         consumerThreadExecutor.submit(crossDcConsumer);
@@ -81,10 +95,10 @@ public class Consumer {
         Runtime.getRuntime().addShutdownHook(shutdownHook);
     }
 
-    private CrossDcConsumer getCrossDcConsumer(String bootstrapServers, String zkConnectString, String topicName,
+    private CrossDcConsumer getCrossDcConsumer(String bootstrapServers, String zkConnectString, String topicName, String groupId, int maxPollRecords,
         boolean enableDataEncryption) {
 
-        KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, enableDataEncryption, zkConnectString);
+        KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, groupId, maxPollRecords, enableDataEncryption, zkConnectString);
         return new KafkaCrossDcConsumer(conf);
     }
 
@@ -95,31 +109,29 @@ public class Consumer {
             throw new IllegalArgumentException("zkConnectString not specified for producer");
         }
 
-        String bootstrapServers = System.getProperty("bootstrapServers");
+        String bootstrapServers = System.getProperty(BOOTSTRAP_SERVERS);
         // boolean enableDataEncryption = Boolean.getBoolean("enableDataEncryption");
-        String topicName = System.getProperty("topicName");
-        String port = System.getProperty("port");
+        String topicName = System.getProperty(TOPIC_NAME);
+        String port = System.getProperty(PORT);
+        String groupId = System.getProperty(GROUP_ID, "");
+        String maxPollRecords = System.getProperty(MAX_POLL_RECORDS);
 
 
         try (SolrZkClient client = new SolrZkClient(zkConnectString, 15000)) {
 
             try {
-                if ((topicName == null || topicName.isBlank())
-                    || (bootstrapServers == null || bootstrapServers.isBlank()) || (port == null || port.isBlank()) && client
-                    .exists(CrossDcConf.CROSSDC_PROPERTIES, true)) {
-                    byte[] data = client.getData("/crossdc.properties", null, null, true);
+                if (((topicName == null || topicName.isBlank()) || (groupId == null || groupId.isBlank())
+                    || (bootstrapServers == null || bootstrapServers.isBlank()) || (port == null || port.isBlank()) || (maxPollRecords == null || maxPollRecords.isBlank())) && client
+                    .exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
+                    byte[] data = client.getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), null, null, true);
                     Properties props = new Properties();
                     props.load(new ByteArrayInputStream(data));
 
-                    if (topicName == null || topicName.isBlank()) {
-                        topicName = props.getProperty("topicName");
-                    }
-                    if (bootstrapServers == null || bootstrapServers.isBlank()) {
-                        bootstrapServers = props.getProperty("bootstrapServers");
-                    }
-                    if (port == null || port.isBlank()) {
-                        port = props.getProperty("port");
-                    }
+                    topicName = getConfig(TOPIC_NAME, topicName, props);
+                    bootstrapServers = getConfig(BOOTSTRAP_SERVERS, bootstrapServers, props);
+                    port = getConfig(PORT, port, props);
+                    groupId = getConfig(GROUP_ID, groupId, props);
+                    maxPollRecords = getConfig(MAX_POLL_RECORDS, maxPollRecords, props);
                 }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -140,8 +152,23 @@ public class Consumer {
             throw new IllegalArgumentException("topicName not specified for producer");
         }
 
+        if (groupId.isBlank()) {
+            groupId = DEFAULT_GROUP_ID;
+        }
+
+        if (maxPollRecords == null || maxPollRecords.isBlank()) {
+            maxPollRecords = DEFAULT_MAX_POLL_RECORDS;
+        }
+
         Consumer consumer = new Consumer();
-        consumer.start(bootstrapServers, zkConnectString, topicName, false, Integer.parseInt(port));
+        consumer.start(bootstrapServers, zkConnectString, topicName, groupId, Integer.parseInt(maxPollRecords), false, Integer.parseInt(port));
+    }
+
+    private static String getConfig(String configName, String configValue, Properties props) {
+        if (configValue == null || configValue.isBlank()) {
+            configValue = props.getProperty(configName);
+        }
+        return configValue;
     }
 
     public void shutdown() {
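
The fallback in `main` above appears intended to read the properties file only when some setting is missing and the znode exists. In Java, `&&` binds more tightly than `||`, so a condition of that shape needs its `||` terms grouped explicitly. The following is a minimal, self-contained sketch of the difference; the names `PrecedenceSketch`, `ungrouped`, and `grouped` are hypothetical, and plain booleans stand in for the real checks.

```java
// Hypothetical sketch: shows how `a || b && c` differs from `(a || b) && c`.
class PrecedenceSketch {

    // Parsed by Java as: topicMissing || (portMissing && fileExists)
    static boolean ungrouped(boolean topicMissing, boolean portMissing, boolean fileExists) {
        return topicMissing || portMissing && fileExists;
    }

    // True only if something is missing AND the file exists.
    static boolean grouped(boolean topicMissing, boolean portMissing, boolean fileExists) {
        return (topicMissing || portMissing) && fileExists;
    }

    public static void main(String[] args) {
        // Case: topicName is missing and the properties file does not exist.
        System.out.println(ungrouped(true, false, false)); // prints "true"
        System.out.println(grouped(true, false, false));   // prints "false"
    }
}
```

In the ungrouped form, the existence check guards only the last term, so a read of a missing file can still be attempted.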
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index a399fbc..927a3df 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -49,7 +49,9 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
 
     kafkaConsumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers());
 
-    kafkaConsumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1"); // TODO
+    kafkaConsumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, conf.getGroupId());
+
+    kafkaConsumerProp.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getMaxPollRecords());
 
     kafkaConsumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
index 34bc29b..df26c43 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
@@ -34,7 +34,7 @@ public class KafkaRequestMirroringHandler implements RequestMirroringHandler {
     final KafkaMirroringSink sink;
 
     public KafkaRequestMirroringHandler(KafkaMirroringSink sink) {
-        log.info("create KafkaRequestMirroringHandler");
+        log.debug("create KafkaRequestMirroringHandler");
         this.sink = sink;
     }
 
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index fecd445..02282b6 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -221,7 +221,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
   }
 
   public void processCommit(CommitUpdateCommand cmd) throws IOException {
-    log.info("process commit cmd={}", cmd);
+    log.debug("process commit cmd={}", cmd);
     if (next != null) next.processCommit(cmd);
   }
 
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index bca9656..2757ee3 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -111,8 +111,8 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
 
         try {
             if (((topicName == null || topicName.isBlank()) || (bootstrapServers == null || bootstrapServers.isBlank())) && core.getCoreContainer().getZkController()
-                .getZkClient().exists(CrossDcConf.CROSSDC_PROPERTIES, true)) {
-                byte[] data = core.getCoreContainer().getZkController().getZkClient().getData("/crossdc.properties", null, null, true);
+                .getZkClient().exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
+                byte[] data = core.getCoreContainer().getZkController().getZkClient().getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), null, null, true);
 
                 if (data == null) {
                     log.error("crossdc.properties file in Zookeeper has no data");
@@ -153,7 +153,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
         // load the request mirroring sink class and instantiate.
        // mirroringHandler = core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(), KafkaRequestMirroringHandler.class);
 
-        KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, false, null);
+        KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, "", -1, false, null);
         KafkaMirroringSink sink = new KafkaMirroringSink(conf);
 
         Closer closer = new Closer(sink);
@@ -214,7 +214,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
         if (log.isTraceEnabled()) {
             log.trace("Create MirroringUpdateProcessor with mirroredParams={}", mirroredParams);
         }
-        log.info("Create MirroringUpdateProcessor");
+
         return new MirroringUpdateProcessor(next, doMirroring, mirroredParams,
                 DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null);
     }
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
index 0f5c823..9a5ba80 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
@@ -104,7 +104,7 @@ import java.util.Properties;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, false, 0);
+    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, false, 0);
 
   }
 
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
index 2fb6c64..ec727ea 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
@@ -106,7 +106,7 @@ import java.util.Properties;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, false, 0);
+    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, false, 0);
   }
 
   private static MiniSolrCloudCluster startCluster(MiniSolrCloudCluster solrCluster, ZkTestServer zkTestServer, Path baseDir) throws Exception {
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 52ddedf..fb0bb48 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -99,7 +99,7 @@ import static org.mockito.Mockito.spy;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, false, 0);
+    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, false, 0);
 
   }
 
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
similarity index 57%
copy from crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
copy to crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
index 52ddedf..cd2c0f2 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
@@ -1,12 +1,7 @@
 package org.apache.solr.crossdc;
 
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.lucene.util.QuickPatchThreadsFilter;
 import org.apache.solr.SolrIgnoredThreadsFilter;
@@ -15,14 +10,11 @@ import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.util.ObjectReleaseTracker;
-import org.apache.solr.crossdc.common.MirroredSolrRequest;
-import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
 import org.apache.solr.crossdc.consumer.Consumer;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -32,13 +24,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 
-import static org.mockito.Mockito.spy;
-
 @ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
     QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
-@ThreadLeakLingering(linger = 5000) public class SolrAndKafkaIntegrationTest extends
+@ThreadLeakLingering(linger = 5000) public class SolrAndKafkaReindexTest extends
     SolrTestCaseJ4 {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -76,30 +68,30 @@ import static org.mockito.Mockito.spy;
     System.setProperty("topicName", TOPIC);
     System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
 
-    solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+    solrCluster1 = new SolrCloudTestCase.Builder(3, createTempDir()).addConfig("conf",
         getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
 
     CollectionAdminRequest.Create create =
-        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 3, 2).setMaxShardsPerNode(10);
     solrCluster1.getSolrClient().request(create);
-    solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+    solrCluster1.waitForActiveCollection(COLLECTION, 3, 6);
 
     solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
 
-    solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+    solrCluster2 = new SolrCloudTestCase.Builder(3, createTempDir()).addConfig("conf",
         getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
 
     CollectionAdminRequest.Create create2 =
-        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 3).setMaxShardsPerNode(10);
     solrCluster2.getSolrClient().request(create2);
-    solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
+    solrCluster2.waitForActiveCollection(COLLECTION, 2, 6);
 
     solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
 
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, false, 0);
+    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, false, 0);
 
   }
 
@@ -110,7 +102,9 @@ import static org.mockito.Mockito.spy;
     consumer.shutdown();
 
     try {
-      kafkaCluster.stop();
+      if (kafkaCluster != null) {
+        kafkaCluster.stop();
+      }
     } catch (Exception e) {
       log.error("Exception stopping Kafka cluster", e);
     }
@@ -136,15 +130,8 @@ import static org.mockito.Mockito.spy;
 
   public void testFullCloudToCloud() throws Exception {
     CloudSolrClient client = solrCluster1.getSolrClient();
-    SolrInputDocument doc = new SolrInputDocument();
-    doc.addField("id", String.valueOf(System.currentTimeMillis()));
-    doc.addField("text", "some test");
-
-    client.add(doc);
 
-    client.commit(COLLECTION);
-
-    System.out.println("Sent producer record");
+    addDocs(client, "first");
 
     QueryResponse results = null;
     boolean foundUpdates = false;
@@ -152,7 +139,29 @@ import static org.mockito.Mockito.spy;
       solrCluster2.getSolrClient().commit(COLLECTION);
       solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
       results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
-      if (results.getResults().getNumFound() == 1) {
+      if (results.getResults().getNumFound() == 7) {
+        foundUpdates = true;
+      } else {
+        Thread.sleep(100);
+      }
+    }
+
+    assertTrue("results=" + results, foundUpdates);
+
+    QueryResponse results1 = solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("first"));
+    QueryResponse results2 = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("first"));
+
+    assertEquals("results=" + results1, 7, results1.getResults().getNumFound());
+    assertEquals("results=" + results2, 7, results2.getResults().getNumFound());
+
+    addDocs(client, "second");
+
+    foundUpdates = false;
+    for (int i = 0; i < 50; i++) {
+      solrCluster2.getSolrClient().commit(COLLECTION);
+      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      if (results.getResults().getNumFound() == 7) {
         foundUpdates = true;
       } else {
         Thread.sleep(100);
@@ -164,50 +173,93 @@ import static org.mockito.Mockito.spy;
     assertTrue("results=" + results, foundUpdates);
     System.out.println("Rest: " + results);
 
-  }
+    results1 = solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("second"));
+    results2 = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("second"));
 
-  public void testProducerToCloud() throws Exception {
-    Properties properties = new Properties();
-    properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
-    properties.put("acks", "all");
-    properties.put("retries", 0);
-    properties.put("batch.size", 1);
-    properties.put("buffer.memory", 33554432);
-    properties.put("linger.ms", 1);
-    properties.put("key.serializer", StringSerializer.class.getName());
-    properties.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
-    Producer<String, MirroredSolrRequest> producer = new KafkaProducer(properties);
-    UpdateRequest updateRequest = new UpdateRequest();
-    updateRequest.setParam("shouldMirror", "true");
-    updateRequest.add("id", String.valueOf(System.currentTimeMillis()), "text", "test");
-    updateRequest.add("id", String.valueOf(System.currentTimeMillis() + 22), "text", "test2");
-    updateRequest.setParam("collection", COLLECTION);
-    MirroredSolrRequest mirroredSolrRequest = new MirroredSolrRequest(updateRequest);
-    System.out.println("About to send producer record");
-    producer.send(new ProducerRecord(TOPIC, mirroredSolrRequest), (metadata, exception) -> {
-      log.info("Producer finished sending metadata={}, exception={}", metadata, exception);
-    });
-    producer.flush();
-
-    System.out.println("Sent producer record");
-
-    solrCluster2.getSolrClient().commit(COLLECTION);
+    assertEquals("results=" + results1, 7, results1.getResults().getNumFound());
+    assertEquals("results=" + results2, 7, results2.getResults().getNumFound());
 
-    QueryResponse results = null;
-    boolean foundUpdates = false;
+    addDocs(client, "third");
+
+    foundUpdates = false;
     for (int i = 0; i < 50; i++) {
       solrCluster2.getSolrClient().commit(COLLECTION);
+      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
       results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
-      if (results.getResults().getNumFound() == 2) {
+      if (results.getResults().getNumFound() == 7) {
         foundUpdates = true;
       } else {
         Thread.sleep(100);
       }
     }
 
+    System.out.println("Closed producer");
+
     assertTrue("results=" + results, foundUpdates);
     System.out.println("Rest: " + results);
 
-    producer.close();
+    results1 = solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("third"));
+    results2 = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("third"));
+
+    assertEquals("results=" + results1, 7, results1.getResults().getNumFound());
+    assertEquals("results=" + results2, 7, results2.getResults().getNumFound());
+
+
+
+  }
+
+  private void addDocs(CloudSolrClient client, String tag) throws SolrServerException, IOException {
+    String id1 = "1";
+    String id2 = "2";
+    String id3 = "3";
+    String id4 = "4";
+    String id5 = "5";
+    String id6 = "6";
+    String id7 = "7";
+
+    SolrInputDocument doc1 = new SolrInputDocument();
+    doc1.addField("id", id1);
+    doc1.addField("text", "some test one " + tag);
+
+    SolrInputDocument doc2 = new SolrInputDocument();
+    doc2.addField("id", id2);
+    doc2.addField("text", "some test two " + tag);
+
+    List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(2);
+    docs.add(doc1);
+    docs.add(doc2);
+
+    client.add(docs);
+
+    client.commit(COLLECTION);
+
+    SolrInputDocument doc3 = new SolrInputDocument();
+    doc3.addField("id", id3);
+    doc3.addField("text", "some test three " + tag);
+
+    SolrInputDocument doc4 = new SolrInputDocument();
+    doc4.addField("id", id4);
+    doc4.addField("text", "some test four " + tag);
+
+    SolrInputDocument doc5 = new SolrInputDocument();
+    doc5.addField("id", id5);
+    doc5.addField("text", "some test five " + tag);
+
+    SolrInputDocument doc6 = new SolrInputDocument();
+    doc6.addField("id", id6);
+    doc6.addField("text", "some test six " + tag);
+
+    SolrInputDocument doc7 = new SolrInputDocument();
+    doc7.addField("id", id7);
+    doc7.addField("text", "some test seven " + tag);
+
+    client.add(doc3);
+    client.add(doc4);
+    client.add(doc5);
+    client.add(doc6);
+    client.add(doc7);
+
+    client.commit(COLLECTION);
   }
+
 }
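
Note on the polling loops in testFullCloudToCloud in SolrAndKafkaReindexTest: all three loops wait for a `*:*` count of 7 on solrCluster2. Because addDocs reuses ids 1 through 7 on every call, that count is already 7 once the first round has been mirrored, so the second and third loops appear to succeed on their first iteration rather than wait for the reindexed documents. A minimal sketch of a shared helper that polls an arbitrary count is shown here. The class and method names (PollUntil, pollUntil) are hypothetical and not part of this commit, and the attempt count and sleep interval are parameters rather than the fixed 50 and 100 used in the test.

```java
import java.util.function.LongSupplier;

/** Hypothetical test helper, not part of the commit. */
public final class PollUntil {

  private PollUntil() {}

  /**
   * Calls {@code actual} up to {@code attempts} times, sleeping
   * {@code sleepMs} between calls, and returns true as soon as the
   * supplied value equals {@code expected}. Returns false if no call
   * produced the expected value.
   */
  public static boolean pollUntil(LongSupplier actual, long expected,
                                  int attempts, long sleepMs)
      throws InterruptedException {
    for (int i = 0; i < attempts; i++) {
      if (actual.getAsLong() == expected) {
        return true;
      }
      // Sleep only when another attempt remains.
      if (i < attempts - 1) {
        Thread.sleep(sleepMs);
      }
    }
    return false;
  }
}
```

In the test, the supplier could wrap a tag query on solrCluster2 (for example a query for "second") and return getResults().getNumFound(), so that each round waits for the reindexed text rather than the total count. The SolrJ query call throws checked exceptions, so the lambda would need to catch and rethrow them as unchecked.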
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index f2c5fc4..8d4cd13 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -108,7 +108,7 @@ import java.util.Properties;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, false, 0);
+    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, false, 0);
 
   }