Posted to commits@solr.apache.org by ma...@apache.org on 2022/11/16 06:09:35 UTC

[solr-sandbox] branch main updated: Merge crossdc-wip into main. (#50)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new e68c00b  Merge crossdc-wip into main. (#50)
e68c00b is described below

commit e68c00beeaa70902000d7bc947f9bcda448aa0d8
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Wed Nov 16 00:09:30 2022 -0600

    Merge crossdc-wip into main. (#50)
    
    * WIP for Cross DC consumer  (#5)
    
    * CrossDc Consumer wip
    
    * Temp commit
    
    * wip commit
    
    * Draft commit, cleanup code
    
    * WIP commit
    
    * Fix tests, WIP commit
    
    * Refactor MessageProcessor and some cleanup.
    
    Co-authored-by: Anshum Gupta <an...@apache.org>
    
    * Support for integration tests with SolrCloud. (#6)
    
    I've had a few little issues that I've been chasing, so to get something in, I pulled out kafka and some other items and set up a minimal set of changes to use SolrCloud in tests, and I have a simple test that runs the same message processor test but with a real cloud instance instead of a mock. Hang on a moment and I'll get kafka and the rest back in.
    
    * Add back in the EmbeddedKafkaCluster. (#7)
    
    * WIP for UpdateRequestProcessor and other producer related files (#8)
    
    * IntegrationTest WIP (#9)
    
    * Fix a compile error, add solr-core as a dependency as the URP requires it (#10)
    
    * Move cross dc producer code into separate module (#11)
    
    * Move cross dc producer code into a separate module
    
    * Fix package name in javadocs
    
    * Create a commons module for crossdc (#12)
    
    * Integration tests for cross dc consumer with SolrCloud (#14)
    
    * Support for integration tests with SolrCloud.
    * Add back in the EmbeddedKafkaCluster.
    
    * Working out remaining issues via manual testing. (#15)
    
    * Tie up
    
    * Fix a variety of issues.
    
    * Work out SolrAndKafkaIntegrationTest (#16)
    
    Each of the 2 tests still needs to be run separately - need to fix up running both in same run.
    
    * Test Passing (#17)
    
    * Producer only needs one of the many runtime dependencies. (#18)
    
    * Experiment with Producer in crossdc package. (#19)
    
    * Simplify build artifacts. (#20)
    
    * Revert "Experiment with Producer in crossdc package. (#19)"
    
    This reverts commit 9007aa6b8d51384ab5f146ebdce013633bae4660.
    
    * Simplify build artifacts and allow for future shading if necessary with commons and producer only creating a single uber jar artifact.
    
    * Allow central config from ZK and build updates. (#21)
    
    * Docs and enable flag. (#22)
    
    * Add explicit error logging around CrossDC config. (#23)
    
    * Work around access limitations (what you can do here seems to vary by Java version) (#24)
    
    * Fix startup checks (#26)
    
    This change corrects the `if` statements in the consumer start, so that it
    actually checks the config being passed.
    
    Previously it just checked the `bootstrapServers` three times.
    
    * KafkaCrossDcConf: Add a human readable toString (#27)
    
    This change adds a human readable `toString` function to `KafkaCrossDcConf`.
    
    * Update Solr version, build improvement for some envs, include missing solrj dep (#28)
    
    * Cleanup some of the testing issues. (#25)
    
    * Consumer: Add initial metrics (#29)
    
    This change adds initial support for Dropwizard Metrics to the Consumer.
    With this we could allow users to send metrics to a number of reporting
    backends.
    
    * Inefficient DBQ (#31)
    
    * Finish setting up the retry queue based on existing implementation. (#30)
    
    * Reindex Test, cover a few more test cases, additional config options. (#32)
    
    * Reindex Test, cover a few more test cases, additional config options.
    
    Adds a basic test focused on reindexing, covers a few more test cases: more replicas and shards, different shard count on primary and secondary dc, allows consumer group to be configured, allows crossdc zk prop file location to be configured.
    
    * Remove TODO, some minor fixes, doc.
    
    * Add maxPolledRecords and some logging cleanup.
    
    * Config improvements and queue bug fixes. (#34)
    
    * Configuration improvements, cleanup, minor fixes. (#35)
    
    * Explicit SSL Config (#36)
    
    * SSL config passed on to Kafka and a variety of general cleanup. (#37)
    
    * Flush producer on close to prevent losing any pending updates. (#38)
    
    * Config override test and cleanup. (#39)
    
    * Flush producer on close to prevent losing any pending updates.
    
    * Add a config override test and some cleanup.
    
    * Cleanup Kafka config class. (#40)
    
    * Improve test to cover a bit more. (#41)
    
    * Beef up config override test, additional config logging. (#42)
    
    * Allow empty string as a config property value. (#43)
    
    * Check docSize against max before mirroring (#45)
    
    * Check docSize against max before mirroring
    
    Prior to this commit, the MirroringUpdateProcessor had no check to
    ensure that docs weren't running afoul of the batch size limit set at
    the Kafka level.
    
    This commit changes this to ensure that docs exceeding this limit are
    not mirrored.  These offending docs may still be indexed, based on the
    value of the URP's "indexUnmirrorableDocs" config property (which
    defaults to 'false' if not set).
    
    * Fix log message printed when mirroring skipped for large doc (#46)
    
    * Check batch-size against "max" before mirroring (#48)
    
    Prior to this commit, MirroringUpdateProcessor checked that individual
    documents didn't exceed the mirroring batch-size limit on their own, but
    no check existed to ensure that the entire list of docs doesn't exceed
    the limit.
    
    This commit adds that check (in MirroringUpdateProcessor.finish()).  If
    the configured max batch-size is exceeded, MUP will log out an error
    (which includes the IDs of all documents in the batch).
    
    * Redact properties before string-ifying or logging (#49)
    
    Prior to this commit, certain log messages during Producer/Consumer
    startup contained potentially sensitive information, such as SSL
    passwords, etc.
    
    This commit amends the string-ification of KafkaCrossDcConf and other
    Map/Properties types to "redact" properties whose key/name contains a
    block-list of values.  This list currently consists of "password" and
    "credentials": any property name containing these substrings (ignoring
    case) will be redacted before logging.
    
    * Remove a couple wip testing tools.
    
    Co-authored-by: Anshum Gupta <an...@apache.org>
    Co-authored-by: Patrik Greco <pg...@apple.com>
    Co-authored-by: Jason Gerlowski <ge...@apache.org>
---
 .gitignore                                         |   4 +
 CROSSDC.md                                         | 199 +++++++++++
 build.gradle                                       |   8 +
 cluster-stop.sh                                    |  23 ++
 cluster.sh                                         | 111 ++++++
 crossdc-commons/README.md                          |   5 +
 {crossdc-consumer => crossdc-commons}/build.gradle |  33 +-
 crossdc-commons/gradle.properties                  |   2 +
 .../apache/solr/crossdc/common/ConfigProperty.java |  74 ++++
 .../apache/solr/crossdc/common/CrossDcConf.java    |  10 +-
 .../solr/crossdc/common/CrossDcConstants.java      |  10 +-
 .../apache/solr/crossdc/common/IQueueHandler.java  |  74 ++++
 .../solr/crossdc/common/KafkaCrossDcConf.java      | 262 +++++++++++++++
 .../solr/crossdc/common/KafkaMirroringSink.java    | 144 ++++++++
 .../solr/crossdc/common/MirroredSolrRequest.java   |  99 ++++++
 .../common/MirroredSolrRequestSerializer.java      | 161 +++++++++
 .../solr/crossdc/common/MirroringException.java    |  26 +-
 .../solr/crossdc/common/RequestMirroringSink.java  |  15 +-
 .../solr/crossdc/common/ResubmitBackoffPolicy.java |   9 +-
 .../common/SensitivePropRedactionUtils.java        |  66 ++++
 .../solr/crossdc/common/SolrExceptionUtil.java     |  17 +-
 crossdc-consumer/build.gradle                      |  44 ++-
 crossdc-consumer/gradle.properties                 |   2 +
 crossdc-consumer/gradlew                           |   0
 .../org/apache/solr/crossdc/consumer/Consumer.java | 149 +++++++++
 .../crossdc/consumer/KafkaCrossDcConsumer.java     | 269 +++++++++++++++
 .../solr/crossdc/helpers/SendDummyUpdates.java     |  51 +++
 .../messageprocessor/MessageProcessor.java}        |  20 +-
 .../messageprocessor/SolrMessageProcessor.java     | 351 +++++++++++++++++++
 .../configs/cloud-minimal/conf/schema.xml          |  54 +++
 .../configs/cloud-minimal/conf/solrconfig.xml      | 112 +++++++
 crossdc-consumer/src/resources/log4j2.xml          |  42 +++
 .../solr/crossdc/SimpleSolrIntegrationTest.java    |  91 +++++
 .../apache/solr/crossdc/TestMessageProcessor.java  | 132 ++++++++
 .../configs/cloud-minimal/conf/schema.xml          |  54 +++
 .../configs/cloud-minimal/conf/solrconfig.xml      | 112 +++++++
 crossdc-consumer/src/test/resources/log4j2.xml     |  67 ++++
 crossdc-producer/build.gradle                      |  74 ++++
 crossdc-producer/gradle.properties                 |   2 +
 .../processor/KafkaRequestMirroringHandler.java    |  54 +++
 .../solr/update/processor/MirroringException.java  |  26 +-
 .../update/processor/MirroringUpdateProcessor.java | 372 +++++++++++++++++++++
 .../MirroringUpdateRequestProcessorFactory.java    | 265 +++++++++++++++
 .../update/processor/RequestMirroringHandler.java  |  11 +-
 .../apache/solr/crossdc/DeleteByQueryToIdTest.java | 197 +++++++++++
 .../solr/crossdc/RetryQueueIntegrationTest.java    | 228 +++++++++++++
 .../solr/crossdc/SolrAndKafkaIntegrationTest.java  | 298 +++++++++++++++++
 .../solr/crossdc/SolrAndKafkaReindexTest.java      | 271 +++++++++++++++
 .../SolrKafkaTestsIgnoredThreadsFilter.java        |  37 +-
 .../solr/crossdc/ZkConfigIntegrationTest.java      | 228 +++++++++++++
 .../configs/cloud-minimal/conf/schema.xml          |  54 +++
 .../configs/cloud-minimal/conf/solrconfig.xml      | 122 +++++++
 crossdc-producer/src/test/resources/log4j2.xml     |  74 ++++
 gradle/wrapper/gradle-wrapper.properties           |   2 +-
 gradlew                                            |   0
 log4j2.xml                                         |  67 ++++
 manual-test.sh                                     |  23 ++
 settings.gradle                                    |   6 +-
 version.props                                      |   0
 59 files changed, 5237 insertions(+), 76 deletions(-)

diff --git a/.gitignore b/.gitignore
index 7013eda..1070024 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,3 +14,7 @@
 
 # Ignore Gradle build output directory
 build
+out
+cluster
+
+logs
diff --git a/CROSSDC.md b/CROSSDC.md
new file mode 100644
index 0000000..5c528f4
--- /dev/null
+++ b/CROSSDC.md
@@ -0,0 +1,199 @@
+# Solr Cross DC: Getting Started
+
+**A simple cross-data-center fail-over solution for Apache Solr.**
+
+
+
+[TOC]
+
+## Overview
+
+The design for this feature involves three key components:
+
+- An UpdateProcessor plugin for Solr that forwards updates from the primary data center.
+- An update request consumer application to receive updates in the backup data center.
+- A distributed queue to connect the above two.
+
+The UpdateProcessor plugin is called the CrossDC Producer, the consumer application is called the CrossDC Consumer, and the supported distributed queue application is Apache Kafka.
+
+## Getting Started
+
+To use Solr Cross DC, you must complete the following steps:
+
+- Startup or obtain access to an Apache Kafka cluster to provide the distributed queue between data centers.
+- Install the CrossDC Solr plugin on each of the nodes in your Solr cluster (in both the primary and backup data centers): place the jar in the correct location, configure solrconfig.xml to reference the new UpdateProcessor, and then configure it for the Kafka cluster.
+- Install the CrossDC consumer application in the backup data center and configure it for the Kafka cluster and the Solr cluster it will send consumed updates to.
+
+The Solr UpdateProcessor plugin will intercept updates when the node acts as the leader and then put those updates onto the distributed queue. The CrossDC Consumer application will poll the distributed queue and forward the updates it receives to the configured Solr cluster.
+
+### Configuration and Startup
+
+The current configuration options are minimal. Further configuration options will be added over time, and at this early stage some may also change.
+
+#### Installing and Configuring the Cross DC Producer Solr Plug-In
+
+1. Configure the sharedLib directory in solr.xml (e.g., sharedLib=lib) and place the CrossDC producer plug-in jar file into the specified folder. Using the per-SolrCore instance directory lib folder is not advisable, as you would have to duplicate the plug-in many times and manage those copies whenever you create new collections or add replicas or shards.
+
+
+**solr.xml**
+
+   ```xml
+   <solr>
+     <str name="sharedLib">${solr.sharedLib:}</str>
+   ```
+
+
+
+2. Configure the new UpdateProcessor in solrconfig.xml
+
+   **NOTE:** The following is not the recommended configuration approach in production; see the information on central configuration below.
+
+
+
+**solrconfig.xml**
+
+   ```xml
+   <updateRequestProcessorChain  name="mirrorUpdateChain" default="true">
+   
+     <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+       <str name="bootstrapServers">${bootstrapServers:}</str>
+       <str name="topicName">${topicName:}</str>
+     </processor>
+   
+     <processor class="solr.LogUpdateProcessorFactory" />
+     <processor class="solr.RunUpdateProcessorFactory" />
+   </updateRequestProcessorChain>
+   ```
+
+Notice that this update chain has been declared as the default chain.
+
+
+
+##### Configuration Properties
+
+There are two configuration properties. You can specify them directly, or use the above notation to allow them to be specified via system properties (generally configured for Solr in the bin/solr.in.sh file).
+
+   ```
+   bootstrapServers
+   ```
+
+The list of servers used to connect to the Kafka cluster; see https://kafka.apache.org/28/documentation.html#producerconfigs_bootstrap.servers
+
+   ```
+   topicName 
+   ```
+
+The Kafka topicName used to indicate which Kafka queue the Solr updates will be pushed onto.
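+
+For example, one way to supply both properties is to append them to SOLR_OPTS in bin/solr.in.sh (the host and topic values here are only placeholders; this mirrors what the cluster.sh helper script in this repository does):
+
+   ```
+   SOLR_OPTS="$SOLR_OPTS -DbootstrapServers=127.0.0.1:9092 -DtopicName=crossdc"
+   ```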
+
+
+
+3. Add an external version constraint UpdateProcessor to the update chain in solrconfig.xml to allow user-provided update versions (rather than relying on each Solr cluster's independently managed built-in versioning).
+
+   https://solr.apache.org/guide/8_11/update-request-processors.html#general-use-updateprocessorfactories
+
+   https://solr.apache.org/docs/8_1_1/solr-core/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.html
+
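+   As a sketch only (not a prescriptive configuration: the version field name is a placeholder, and the exact parameters and placement should be checked against the linked documentation), the chain above might be extended like this:
+
+   ```xml
+   <updateRequestProcessorChain  name="mirrorUpdateChain" default="true">
+
+     <processor class="solr.DocBasedVersionConstraintsProcessorFactory">
+       <str name="versionField">my_version_l</str>
+     </processor>
+
+     <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+       <str name="bootstrapServers">${bootstrapServers:}</str>
+       <str name="topicName">${topicName:}</str>
+     </processor>
+
+     <processor class="solr.LogUpdateProcessorFactory" />
+     <processor class="solr.RunUpdateProcessorFactory" />
+   </updateRequestProcessorChain>
+   ```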
+
+4. Start or restart the Solr cluster(s).
+
+
+
+#### Installing and Configuring the CrossDC Consumer Application
+
+1. Uncompress the distribution tar or zip file for the CrossDC Consumer into an appropriate install location on a node in the receiving data center.
+2. You can start the Consumer process via the included shell start script at bin/crossdc-consumer.
+3. You can configure the CrossDC Consumer via Java system properties passed in the CROSSDC_CONSUMER_OPTS environment variable, e.g., CROSSDC_CONSUMER_OPTS="-DbootstrapServers=127.0.0.1:2181 -DzkConnectString=127.0.0.1:2181 -DtopicName=crossdc" bin/crossdc-consumer
+
+The required configuration properties are:
+
+
+   *bootstrapServers* - the list of servers used to connect to the Kafka cluster; see https://kafka.apache.org/28/documentation.html#producerconfigs_bootstrap.servers
+
+   *topicName* - the Kafka topicName used to indicate which Kafka queue the Solr updates will be pushed to.
+
+   *zkConnectString* - the Zookeeper connection string used to connect to the Solr cluster's Zookeeper in the backup data center
+
+Additional configuration properties:
+
+   *groupId* - the group id to give Kafka for the consumer; defaults to the empty string if not specified.
+
+The following additional configuration properties should either be specified for both the producer and the consumer or in the shared Zookeeper
+central config properties file. This is because the Consumer will use a Producer for retries.
+
+   *batchSizeBytes* - the maximum batch size in bytes for the queue
+
+   *bufferMemoryBytes* - the amount of memory in bytes allocated by the Producer in total for buffering
+
+   *lingerMs* - the amount of time that the Producer will wait to add to a batch
+
+   *requestTimeout* - request timeout for the Producer; when used for the Consumer's retry Producer, this should be less than the timeout that will cause the Consumer to be removed from the group for taking too long.
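+
+As an illustration only, these shared settings might look like the following in a properties file, using the property names listed above (the values shown are the defaults defined by KafkaCrossDcConf in this commit):
+
+   ```
+   batchSizeBytes=2097152
+   bufferMemoryBytes=536870912
+   lingerMs=30
+   requestTimeout=60000
+   ```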
+
+#### Central Configuration Option
+
+You can optionally manage the configuration centrally in Solr's Zookeeper cluster by placing a properties file called *crossdc.properties* in the root Solr Zookeeper znode, e.g., */solr/crossdc.properties*. This allows you to update the configuration in a central location rather than in each solrconfig.xml on each Solr node, and it also lets new Solr nodes or Consumers come up without requiring additional configuration.
+
+
+
+Both *bootstrapServers* and *topicName* properties can be put in this file, in which case you would not have to specify any Kafka configuration in the solrconfig.xml for the CrossDC Producer Solr plugin. Likewise, for the CrossDC Consumer application, you would only have to set *zkConnectString* for the local Solr cluster. Note that the two components will be looking in the Zookeeper clusters in their respective data center locations.
+
+You can override the properties file location and znode name in Zookeeper using the system property *zkCrossDcPropsPath=/path/to/props_file_name.properties*
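+
+For example, a minimal *crossdc.properties* might contain only:
+
+   ```
+   bootstrapServers=127.0.0.1:9092
+   topicName=crossdc
+   ```
+
+One way to place it in Zookeeper (a sketch assuming Solr's bin/solr zk cp tooling and a Zookeeper at 127.0.0.1:9983; adjust for your environment) is:
+
+   ```
+   bin/solr zk cp file:crossdc.properties zk:/crossdc.properties -z 127.0.0.1:9983
+   ```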
+
+#### Making the Cross DC UpdateProcessor Optional in a Common solrconfig.xml
+
+The simplest and least invasive way to control whether the Cross DC UpdateProcessor is on or off for a node is to configure the update chain it's used in to be the default chain or not via Solr's system property configuration syntax.  This syntax takes the form of ${*system_property_name*} and will be substituted with the value of that system property when the configuration is parsed. You can specify a default value using the following syntax: ${*system_property_name*:*default_value*}. Y [...]
+
+*Having a separate updateRequestProcessorChain avoids many additional constraints you would otherwise have to deal with or consider, now or in the future, compared to forcing all Cross DC and non-Cross DC use down a single, required, common updateRequestProcessorChain.*
+
+Further, any application consuming the configuration with no concern for enabling Cross DC will not be artificially limited in its ability to define, manage, and use updateRequestProcessorChains.
+
+The following allows a system property to safely and non-invasively enable or disable Cross DC for a node:
+
+
+```xml
+<updateRequestProcessorChain  name="crossdcUpdateChain" default="${crossdcEnabled:false}">
+  <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+    <bool name="enabled">${enabled:false}</bool>
+  </processor>
+  <processor class="solr.LogUpdateProcessorFactory" />
+  <processor class="solr.RunUpdateProcessorFactory" />
+</updateRequestProcessorChain>
+```
+
+
+
+The above configuration would default to Cross DC being disabled with minimal impact to any non-Cross DC use, and Cross DC could be enabled by starting Solr with the system property crossdcEnabled=true.
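+
+For example, one way to enable it (assuming system properties are passed via SOLR_OPTS in bin/solr.in.sh, as the cluster.sh helper script in this repository does) is:
+
+```
+SOLR_OPTS="$SOLR_OPTS -DcrossdcEnabled=true"
+```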
+
+The last chain to declare itself the default wins, so you can put this at the bottom of almost any existing solrconfig.xml to create an optional Cross DC path without having to audit, understand, adapt, or test existing non-Cross DC paths, as other options would require.
+
+The above is the simplest and least obtrusive way to manage an on/off switch for Cross DC.
+
+**Note:** If your configuration already makes use of update handlers and/or updates independently specifying different updateRequestProcessorChains, your solution may end up a bit more sophisticated.
+
+
+
+For situations where you do want to control and enforce a single updateRequestProcessorChain path for every consumer of the solrconfig.xml, it's enough to use the *enabled* property, which turns the processor into a NOOP in the chain when disabled.
+
+
+
+```xml
+<updateRequestProcessorChain  name="crossdcUpdateChain">
+  <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+    <bool name="enabled">${enabled:false}</bool>
+  </processor>
+  <processor class="solr.LogUpdateProcessorFactory" />
+  <processor class="solr.RunUpdateProcessorFactory" />
+</updateRequestProcessorChain>
+```
+
+
+
+## Limitations
+
+- Delete-By-Query is not officially supported.
+
+    - Work-In-Progress: an inefficient option that issues multiple delete-by-id requests using the results of a given standard query (see the sketch after this list).
+
+    - Simply forwarding a real Delete-By-Query could also be reasonable if it does not strictly depend on ordering relative to other requests.
+
+
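+For illustration only (this is not the project's implementation; the collection name, query, field name, and page size are placeholder assumptions, and a real version would need to page through all results), the work-in-progress option amounts to something like this SolrJ sketch:
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+
+public class DeleteByQueryToIds {
+
+  /** Expand a delete-by-query into delete-by-id requests so the deletes can be
+   *  mirrored without depending on ordering relative to other requests. */
+  public static void deleteByQueryAsIds(SolrClient client, String collection, String dbq) throws Exception {
+    SolrQuery query = new SolrQuery(dbq);
+    query.setFields("id");  // only the ids are needed
+    query.setRows(500);     // placeholder page size; a real version would page or use a cursor
+
+    QueryResponse rsp = client.query(collection, query);
+    List<String> ids = new ArrayList<>();
+    for (SolrDocument doc : rsp.getResults()) {
+      ids.add(String.valueOf(doc.getFieldValue("id")));
+    }
+
+    if (!ids.isEmpty()) {
+      client.deleteById(collection, ids); // each delete-by-id can be mirrored individually
+    }
+  }
+}
+```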
+
+## Additional Notes
+
+In these early days, it may help to reference the *cluster.sh* script located in the root of the CrossDC repository. This script is a developer tool for manual testing that downloads Solr and Kafka and then configures both for Cross DC.
\ No newline at end of file
diff --git a/build.gradle b/build.gradle
index b9644b2..79f252e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -21,3 +21,11 @@
  * This is a general purpose Gradle build.
  * Learn more about Gradle by exploring our samples at https://docs.gradle.org/6.7.1/samples
  */
+
+
+description 'Root for Solr plugins sandbox'
+
+
+subprojects {
+    group "org.apache.solr.crossdc"
+}
diff --git a/cluster-stop.sh b/cluster-stop.sh
new file mode 100644
index 0000000..4df6097
--- /dev/null
+++ b/cluster-stop.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+kafkaBase="https://archive.apache.org/dist/kafka/2.8.1"
+solrBase="https://dlcdn.apache.org/lucene/solr/8.11.2"
+
+kafka="kafka_2.12-2.8.1"
+solr="solr-8.11.2"
+
+
+cd cluster
+
+(
+  cd "${kafka}" || exit
+
+  bin/zookeeper-server-stop.sh config/zookeeper.properties
+  bin/kafka-server-stop.sh config/server.properties
+)
+
+(
+  cd "${solr}" || exit
+
+  bin/solr stop -all
+)
\ No newline at end of file
diff --git a/cluster.sh b/cluster.sh
new file mode 100644
index 0000000..9125d0c
--- /dev/null
+++ b/cluster.sh
@@ -0,0 +1,111 @@
+#!/bin/bash
+
+pid="$$"
+
+echo "pid=${pid}"
+
+kafkaBase="https://archive.apache.org/dist/kafka/2.8.1"
+solrBase="https://dlcdn.apache.org/lucene/solr/8.11.2"
+
+kafka="kafka_2.12-2.8.1"
+solr="solr-8.11.2"
+
+base="${PWD}/cluster"
+
+#trap 'echo exittrap;cd ${base}/${kafka};bin/zookeeper-server-stop.sh config/zookeeper.properties;bin/kafka-server-stop.sh config/server.properties;cd ${base}/${solr};bin/solr stop -all;pkill -TERM -P ${pid}' EXIT
+
+
+
+if [ ! -d cluster ]
+then
+  mkdir cluster
+fi
+
+cd cluster || exit
+
+if [ ! -f ${kafka}.tgz ]
+then
+  wget "${kafkaBase}/${kafka}.tgz"
+fi
+
+if [ ! -d ${kafka} ]
+then
+  tar -xvzf ${kafka}.tgz
+fi
+
+if [ ! -f ${solr}.tgz ]
+then
+  wget "${solrBase}/${solr}.tgz"
+fi
+
+if [ ! -d ${solr} ]
+then
+  tar -xvzf ${solr}.tgz
+fi
+
+(
+  cd "${kafka}" || exit
+
+
+
+sed -i "s|/tmp/kafka-logs|${PWD}/kafka_data/|" config/server.properties
+sed -i "s|/tmp/zookeeper|${PWD}/zk_data|" config/zookeeper.properties
+
+
+bin/zookeeper-server-start.sh config/zookeeper.properties > ../kafka_zk.log &
+
+bin/kafka-server-start.sh config/server.properties > ../kafka_server.log &
+
+# The following commented out  section is just for helpful reference
+
+# for kafka 2.x zk port of 2181, for 3.x broker of 9093
+
+bin/kafka-topics.sh --create --topic crossdc --bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1
+
+# bin/kafka-topics.sh --list --bootstrap-server 127.0.0.1:9093
+
+# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095 --topic my-kafka-topic
+
+# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9093 --topic my-kafka-topic --from-beginning
+
+# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9093 --topic my-kafka-topic --from-beginning --group group2
+)
+
+# need to go to lib folder - I can't believe there is no shared lib folder by default - crazy
+mkdir "${solr}/server/solr/lib"
+
+cp ../crossdc-producer/build/libs/crossdc-producer-*.jar "${solr}"/server/solr/lib
+
+(
+  cd "${solr}" || exit
+
+  echo -e "SOLR_OPTS=\"$SOLR_OPTS -Dsolr.sharedLib=lib -DbootstrapServers=127.0.0.1:9092 -DtopicName=crossdc\"" >>  bin/solr.in.sh
+
+  chmod +x  bin/solr
+
+  bin/solr start -cloud > ../solr.log
+
+  # for kafka 2.x ZK is on 2181, for Solr ZK is on 9983
+  # for the moment we upload the config set used in crossdc-producer tests
+
+  if [ ! -d "../../crossdc-producer/src/test/resources/configs/cloud-minimal/conf" ]
+  then
+    echo "Could not find configset folder to upload"
+    exit 1
+  fi
+  bin/solr zk upconfig -z 127.0.0.1:9983 -n crossdc -d ../../crossdc-producer/src/test/resources/configs/cloud-minimal/conf
+
+  bin/solr create -c collection1 -n crossdc
+
+  bin/solr status
+)
+
+cp ../crossdc-consumer/build/distributions/crossdc-consumer-*.tar .
+
+tar -xvf crossdc-consumer-*.tar
+rm crossdc-consumer-*.tar
+
+(
+  cd crossdc-consumer* || exit
+  CROSSDC_CONSUMER_OPTS="-Dlog4j2.configurationFile=../log4j2.xml -DbootstrapServers=127.0.0.1:9092 -DzkConnectString=127.0.0.1:9983 -DtopicName=crossdc" bin/crossdc-consumer > ../crossdc_consumer.log &
+)
diff --git a/crossdc-commons/README.md b/crossdc-commons/README.md
new file mode 100644
index 0000000..0a80190
--- /dev/null
+++ b/crossdc-commons/README.md
@@ -0,0 +1,5 @@
+Cross Data Center Replication Commons
+=====================================
+
+Commons module for Cross DC producer and consumer.
+This contains classes that are shared between the producer and consumer, such as configuration, constants, and the message processor.
\ No newline at end of file
diff --git a/crossdc-consumer/build.gradle b/crossdc-commons/build.gradle
similarity index 58%
copy from crossdc-consumer/build.gradle
copy to crossdc-commons/build.gradle
index 8c44542..a687058 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-commons/build.gradle
@@ -16,15 +16,42 @@
  */
 plugins {
     id 'java'
+    id 'com.github.johnrengelman.shadow' version '7.1.2'
 }
 
-description = 'Cross-DC Consumer package'
-
-version '1.0-SNAPSHOT'
+description = 'Cross-DC Commons package'
 
 repositories {
     mavenCentral()
 }
 
+configurations {
+    provided
+}
+
+sourceSets {
+    main { compileClasspath += configurations.provided }
+}
+
 dependencies {
+    provided 'org.apache.solr:solr-solrj:8.11.2'
+    implementation 'org.apache.kafka:kafka-clients:2.8.1'
+    implementation 'com.google.guava:guava:14.0'
+}
+
+jar.enabled = false
+
+shadowJar {
+    archiveBaseName.set('crossdc-commons')
+    configurations = [project.configurations.compileClasspath]
+}
+
+jar.dependsOn(shadowJar)
+
+artifacts {
+    shadowJar;
+}
+
+test {
+    jvmArgs '-Djava.security.egd=file:/dev/./urandom'
 }
diff --git a/crossdc-commons/gradle.properties b/crossdc-commons/gradle.properties
new file mode 100644
index 0000000..0df7afe
--- /dev/null
+++ b/crossdc-commons/gradle.properties
@@ -0,0 +1,2 @@
+group=org.apache.solr
+version=0.1-SNAPSHOT
\ No newline at end of file
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
new file mode 100644
index 0000000..8c1af02
--- /dev/null
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
@@ -0,0 +1,74 @@
+package org.apache.solr.crossdc.common;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+
+public class ConfigProperty {
+
+  private final String key;
+  private final String defaultValue;
+
+  private boolean required = false;
+
+  public ConfigProperty(String key, String defaultValue, boolean required) {
+    this.key = key;
+    this.defaultValue = defaultValue;
+    this.required = required;
+  }
+
+  public ConfigProperty(String key, String defaultValue) {
+    this.key = key;
+    this.defaultValue = defaultValue;
+  }
+
+  public ConfigProperty(String key) {
+    this.key = key;
+    this.defaultValue = null;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public boolean isRequired() {
+    return required;
+  }
+
+  public String getDefaultValue() {
+    return defaultValue;
+  }
+
+  public String getValue(Map properties) {
+    String val = (String) properties.get(key);
+    if (val == null) {
+     return defaultValue;
+    }
+    return val;
+  }
+
+  public Integer getValueAsInt(Map properties) {
+    Object value = (Object) properties.get(key);
+    if (value != null) {
+      if (value instanceof Integer) {
+        return (Integer) value;
+      }
+      return Integer.parseInt(value.toString());
+    }
+    if (defaultValue == null) {
+      return null;
+    }
+    return Integer.parseInt(defaultValue);
+  }
+
+  public Boolean getValueAsBoolean(Map properties) {
+    Object value = (Object) properties.get(key);
+    if (value != null) {
+      if (value instanceof Boolean) {
+        return (Boolean) value;
+      }
+      return Boolean.parseBoolean(value.toString());
+    }
+    return Boolean.parseBoolean(defaultValue);
+  }
+}
diff --git a/crossdc-consumer/settings.gradle b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
similarity index 77%
copy from crossdc-consumer/settings.gradle
copy to crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
index 8c7c712..b34ae5b 100644
--- a/crossdc-consumer/settings.gradle
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
@@ -14,11 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.solr.crossdc.common;
 
-rootProject.name = 'crossdc-consumer'
-
-description = 'Module for Apache Solr Cross DC Consumer'
-
-subprojects {
-    group "org.apache.solr.crossdc"
+public abstract class CrossDcConf {
+    public static final String CROSSDC_PROPERTIES = "/crossdc.properties";
+    public static final String ZK_CROSSDC_PROPS_PATH = "zkCrossDcPropsPath";
 }
diff --git a/crossdc-consumer/settings.gradle b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java
similarity index 80%
copy from crossdc-consumer/settings.gradle
copy to crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java
index 8c7c712..ba0a73d 100644
--- a/crossdc-consumer/settings.gradle
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java
@@ -14,11 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.solr.crossdc.common;
 
-rootProject.name = 'crossdc-consumer'
-
-description = 'Module for Apache Solr Cross DC Consumer'
-
-subprojects {
-    group "org.apache.solr.crossdc"
+public class CrossDcConstants {
+    // Requests containing this parameter will not be mirrored.
+    public static final String SHOULD_MIRROR = "shouldMirror";
 }
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java
new file mode 100644
index 0000000..a242932
--- /dev/null
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.common;
+
+public interface IQueueHandler<T> {
+    enum ResultStatus {
+        /** Item was successfully processed */
+        HANDLED,
+
+        /** Item was not processed, and the consumer should shutdown */
+        NOT_HANDLED_SHUTDOWN,
+
+        /** Item processing failed, and the item should be retried immediately */
+        FAILED_RETRY,
+
+        /** Item processing failed, and the item should not be retried (unsuccessfully processed) */
+        FAILED_NO_RETRY,
+
+        /** Item processing failed, and the item should be re-queued */
+        FAILED_RESUBMIT
+    }
+
+    class Result<T> {
+        private final ResultStatus _status;
+        private final Throwable _throwable;
+        private final T _newItem;
+
+        public Result(final ResultStatus status) {
+            _status = status;
+            _throwable = null;
+            _newItem = null;
+        }
+
+        public Result(final ResultStatus status, final Throwable throwable) {
+            _status = status;
+            _throwable = throwable;
+            _newItem = null;
+        }
+
+        public Result(final ResultStatus status, final Throwable throwable, final T newItem) {
+            _status = status;
+            _throwable = throwable;
+            _newItem = newItem;
+        }
+
+        public ResultStatus status() {
+            return _status;
+        }
+
+        public Throwable throwable() {
+            return _throwable;
+        }
+
+        public T newItem() {
+            return _newItem;
+        }
+    }
+
+    Result<T> handleItem(T item);
+}
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
new file mode 100644
index 0000000..0b45bbb
--- /dev/null
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.common;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+
+import java.util.*;
+
+import static org.apache.solr.crossdc.common.SensitivePropRedactionUtils.propertyRequiresRedaction;
+import static org.apache.solr.crossdc.common.SensitivePropRedactionUtils.redactPropertyIfNecessary;
+
+public class KafkaCrossDcConf extends CrossDcConf {
+
+  public static final String DEFAULT_BATCH_SIZE_BYTES = "2097152";
+  public static final String DEFAULT_BUFFER_MEMORY_BYTES = "536870912";
+  public static final String DEFAULT_LINGER_MS = "30";
+  public static final String DEFAULT_REQUEST_TIMEOUT = "60000";
+  public static final String DEFAULT_MAX_REQUEST_SIZE = "5242880";
+  public static final String DEFAULT_ENABLE_DATA_COMPRESSION = "none";
+  public static final String DEFAULT_SLOW_SEND_THRESHOLD= "1000";
+  public static final String DEFAULT_NUM_RETRIES = null; // by default, we control retries with DELIVERY_TIMEOUT_MS_DOC
+  private static final String DEFAULT_RETRY_BACKOFF_MS = "500";
+
+  private static final String DEFAULT_DELIVERY_TIMEOUT_MS = "120000";
+
+  public static final String DEFAULT_MAX_POLL_RECORDS = "500"; // same default as Kafka
+
+  private static final String DEFAULT_FETCH_MIN_BYTES = "512000";
+  private static final String DEFAULT_FETCH_MAX_WAIT_MS = "1000"; // Kafka default is 500
+
+  public static final String DEFAULT_FETCH_MAX_BYTES = "100663296";
+
+  public static final String DEFAULT_MAX_PARTITION_FETCH_BYTES = "33554432";
+
+  public static final String DEFAULT_PORT = "8090";
+
+  private static final String DEFAULT_GROUP_ID = "SolrCrossDCConsumer";
+
+
+  public static final String TOPIC_NAME = "topicName";
+
+  public static final String BOOTSTRAP_SERVERS = "bootstrapServers";
+
+  public static final String BATCH_SIZE_BYTES = "batchSizeBytes";
+
+  public static final String BUFFER_MEMORY_BYTES = "bufferMemoryBytes";
+
+  public static final String LINGER_MS = "lingerMs";
+
+  public static final String REQUEST_TIMEOUT_MS = "requestTimeoutMS";
+
+  public static final String MAX_REQUEST_SIZE_BYTES = "maxRequestSizeBytes";
+
+  public static final String ENABLE_DATA_COMPRESSION = "enableDataCompression";
+
+  public static final String SLOW_SUBMIT_THRESHOLD_MS = "slowSubmitThresholdMs";
+
+  public static final String NUM_RETRIES = "numRetries";
+
+  public static final String RETRY_BACKOFF_MS = "retryBackoffMs";
+
+  public static final String DELIVERY_TIMEOUT_MS = "deliveryTimeoutMs";
+
+  public static final String FETCH_MIN_BYTES = "fetchMinBytes";
+
+  public static final String FETCH_MAX_WAIT_MS = "fetchMaxWaitMS";
+
+  public static final String MAX_POLL_RECORDS = "maxPollRecords";
+
+  public static final String FETCH_MAX_BYTES = "fetchMaxBytes";
+
+  public static final String MAX_PARTITION_FETCH_BYTES = "maxPartitionFetchBytes";
+
+  public static final String ZK_CONNECT_STRING = "zkConnectString";
+
+
+  public static final List<ConfigProperty> CONFIG_PROPERTIES;
+  private static final Map<String, ConfigProperty> CONFIG_PROPERTIES_MAP;
+
+  public static final List<ConfigProperty> SECURITY_CONFIG_PROPERTIES;
+
+  public static final String PORT = "port";
+
+  public static final String GROUP_ID = "groupId";
+
+
+
+  static {
+    List<ConfigProperty> configProperties = new ArrayList<>(
+        List.of(new ConfigProperty(TOPIC_NAME), new ConfigProperty(BOOTSTRAP_SERVERS),
+            new ConfigProperty(BATCH_SIZE_BYTES, DEFAULT_BATCH_SIZE_BYTES),
+            new ConfigProperty(BUFFER_MEMORY_BYTES, DEFAULT_BUFFER_MEMORY_BYTES),
+            new ConfigProperty(LINGER_MS, DEFAULT_LINGER_MS),
+            new ConfigProperty(REQUEST_TIMEOUT_MS, DEFAULT_REQUEST_TIMEOUT),
+            new ConfigProperty(MAX_REQUEST_SIZE_BYTES, DEFAULT_MAX_REQUEST_SIZE),
+            new ConfigProperty(ENABLE_DATA_COMPRESSION, DEFAULT_ENABLE_DATA_COMPRESSION),
+            new ConfigProperty(SLOW_SUBMIT_THRESHOLD_MS, DEFAULT_SLOW_SEND_THRESHOLD),
+            new ConfigProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES),
+            new ConfigProperty(RETRY_BACKOFF_MS, DEFAULT_RETRY_BACKOFF_MS),
+            new ConfigProperty(DELIVERY_TIMEOUT_MS, DEFAULT_DELIVERY_TIMEOUT_MS),
+
+            // Consumer only zkConnectString
+            new ConfigProperty(ZK_CONNECT_STRING, null),
+            new ConfigProperty(FETCH_MIN_BYTES, DEFAULT_FETCH_MIN_BYTES),
+            new ConfigProperty(FETCH_MAX_BYTES, DEFAULT_FETCH_MAX_BYTES),
+            new ConfigProperty(FETCH_MAX_WAIT_MS, DEFAULT_FETCH_MAX_WAIT_MS),
+
+            new ConfigProperty(MAX_PARTITION_FETCH_BYTES, DEFAULT_MAX_PARTITION_FETCH_BYTES),
+            new ConfigProperty(MAX_POLL_RECORDS, DEFAULT_MAX_POLL_RECORDS),
+            new ConfigProperty(PORT, DEFAULT_PORT),
+            new ConfigProperty(GROUP_ID, DEFAULT_GROUP_ID)));
+
+
+    SECURITY_CONFIG_PROPERTIES =
+        List.of(
+            new ConfigProperty(SslConfigs.SSL_PROTOCOL_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_PROVIDER_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_CIPHER_SUITES_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_KEY_PASSWORD_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_KEYSTORE_KEY_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG),
+            new ConfigProperty(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG),
+
+            new ConfigProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG),
+
+
+            // From Common and Admin Client Security
+            new ConfigProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG),
+            new ConfigProperty(AdminClientConfig.SECURITY_PROVIDERS_CONFIG)
+        );
+
+    configProperties.addAll(SECURITY_CONFIG_PROPERTIES);
+    CONFIG_PROPERTIES = Collections.unmodifiableList(configProperties);
+
+    Map<String, ConfigProperty> configPropertiesMap =
+        new HashMap<String, ConfigProperty>(CONFIG_PROPERTIES.size());
+    for (ConfigProperty prop : CONFIG_PROPERTIES) {
+      configPropertiesMap.put(prop.getKey(), prop);
+    }
+    CONFIG_PROPERTIES_MAP = configPropertiesMap;
+  }
+
+  private final Map<String, Object> properties;
+
+  public KafkaCrossDcConf(Map<String, Object> properties) {
+    List<String> nullValueKeys = new ArrayList<String>();
+    properties.forEach((k, v) -> {
+      if (v == null) {
+        nullValueKeys.add(k);
+      }
+    });
+    nullValueKeys.forEach(properties::remove);
+    this.properties = properties;
+  }
+
+  public static void addSecurityProps(KafkaCrossDcConf conf, Properties kafkaConsumerProps) {
+    for (ConfigProperty property : SECURITY_CONFIG_PROPERTIES) {
+      String val = conf.get(property.getKey());
+      if (val != null) {
+        kafkaConsumerProps.put(property.getKey(), val);
+      }
+    }
+  }
+
+  public String get(String property) {
+    return CONFIG_PROPERTIES_MAP.get(property).getValue(properties);
+  }
+
+  public Integer getInt(String property) {
+    ConfigProperty prop = CONFIG_PROPERTIES_MAP.get(property);
+    if (prop == null) {
+      throw new IllegalArgumentException("Property not found key=" + property);
+    }
+    return prop.getValueAsInt(properties);
+  }
+
+  public Boolean getBool(String property) {
+    ConfigProperty prop = CONFIG_PROPERTIES_MAP.get(property);
+    if (prop == null) {
+      throw new IllegalArgumentException("Property not found key=" + property);
+    }
+    return prop.getValueAsBoolean(properties);
+  }
+  
+  public Map<String,Object> getAdditionalProperties() {
+    Map<String, Object> additional = new HashMap<>(properties);
+    for (ConfigProperty configProperty : CONFIG_PROPERTIES) {
+      additional.remove(configProperty.getKey());
+    }
+    Map<String, Object> integerProperties = new HashMap<>();
+    additional.forEach((key, v) -> {
+      try {
+        int intVal = Integer.parseInt((String) v);
+        integerProperties.put(key.toString(), intVal);
+      } catch (NumberFormatException ignored) {
+
+      }
+    });
+    additional.putAll(integerProperties);
+    return additional;
+  }
+
+  public static void readZkProps(Map<String,Object> properties, Properties zkProps) {
+    Map<Object, Object> zkPropsUnprocessed = new HashMap<>(zkProps);
+    for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
+      if (properties.get(configKey.getKey()) == null || ((String)properties.get(configKey.getKey())).isBlank()) {
+        properties.put(configKey.getKey(), (String) zkProps.getProperty(
+            configKey.getKey()));
+        zkPropsUnprocessed.remove(configKey.getKey());
+      }
+    }
+    zkPropsUnprocessed.forEach((key, val) -> {
+      if (properties.get(key) == null) {
+        properties.put((String) key, (String) val);
+      }
+    });
+  }
+
+  @Override public String toString() {
+    StringBuilder sb = new StringBuilder(128);
+    for (ConfigProperty configProperty : CONFIG_PROPERTIES) {
+      if (properties.get(configProperty.getKey()) != null) {
+        final String printablePropertyValue = redactPropertyIfNecessary(configProperty.getKey(),
+                String.valueOf(properties.get(configProperty.getKey())));
+        sb.append(configProperty.getKey()).append("=").append(printablePropertyValue).append(",");
+      }
+    }
+    if (sb.length() > 0) {
+      sb.setLength(sb.length() - 1);
+    }
+
+    return "KafkaCrossDcConf{" + sb + "}";
+  }
+}
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
new file mode 100644
index 0000000..df48c60
--- /dev/null
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.common;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.SLOW_SUBMIT_THRESHOLD_MS;
+
+public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
+
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    private final KafkaCrossDcConf conf;
+    private final Producer<String, MirroredSolrRequest> producer;
+
+    public KafkaMirroringSink(final KafkaCrossDcConf conf) {
+        // Create Kafka Mirroring Sink
+        this.conf = conf;
+        this.producer = initProducer();
+    }
+
+    @Override
+    public void submit(MirroredSolrRequest request) throws MirroringException {
+        if (log.isDebugEnabled()) {
+            log.debug("About to submit a MirroredSolrRequest");
+        }
+
+        final long enqueueStartNanos = System.nanoTime();
+
+        // Create Producer record
+        try {
+
+            producer.send(new ProducerRecord<>(conf.get(KafkaCrossDcConf.TOPIC_NAME), request), (metadata, exception) -> {
+                if (exception != null) {
+                    log.error("Failed adding update to CrossDC queue! request=" + request.getSolrRequest(), exception);
+                }
+            });
+
+            long lastSuccessfulEnqueueNanos = System.nanoTime();
+            // Record time since last successful enqueue as 0
+            long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos);
+            // Update elapsed time
+
+            if (elapsedTimeMillis > conf.getInt(SLOW_SUBMIT_THRESHOLD_MS)) {
+                slowSubmitAction(request, elapsedTimeMillis);
+            }
+        } catch (Exception e) {
+            // We are intentionally catching all exceptions; the expected exception from this function is {@link MirroringException}
+            String message = "Unable to enqueue request " + request + ", configured retries is " + conf.getInt(KafkaCrossDcConf.NUM_RETRIES) +
+                " and configured max delivery timeout in ms is " + conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS);
+            log.error(message, e);
+            throw new MirroringException(message, e);
+        }
+    }
+
+    /**
+     * Create and init the producer using {@link this#conf}
+     * All producer configs are listed here
+     * https://kafka.apache.org/documentation/#producerconfigs
+     *
+     * @return an initialized Kafka producer for mirrored Solr requests
+     */
+    private Producer<String, MirroredSolrRequest> initProducer() {
+        // Initialize and return Kafka producer
+        Properties kafkaProducerProps = new Properties();
+
+        log.info("Starting CrossDC Producer {}", conf);
+
+        kafkaProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+
+        kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        String retries = conf.get(KafkaCrossDcConf.NUM_RETRIES);
+        if (retries != null) {
+            kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retries));
+        }
+        kafkaProducerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, conf.getInt(KafkaCrossDcConf.RETRY_BACKOFF_MS));
+        kafkaProducerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES));
+        kafkaProducerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getInt(KafkaCrossDcConf.BATCH_SIZE_BYTES));
+        kafkaProducerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, conf.getInt(KafkaCrossDcConf.BUFFER_MEMORY_BYTES));
+        kafkaProducerProps.put(ProducerConfig.LINGER_MS_CONFIG, conf.getInt(KafkaCrossDcConf.LINGER_MS));
+        kafkaProducerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS)); // should be less than time that causes consumer to be kicked out
+        kafkaProducerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, conf.get(KafkaCrossDcConf.ENABLE_DATA_COMPRESSION));
+
+        kafkaProducerProps.put("key.serializer", StringSerializer.class.getName());
+        kafkaProducerProps.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
+
+        KafkaCrossDcConf.addSecurityProps(conf, kafkaProducerProps);
+
+        kafkaProducerProps.putAll(conf.getAdditionalProperties());
+
+        if (log.isDebugEnabled()) {
+            log.debug("Kafka Producer props={}", kafkaProducerProps);
+        }
+
+        ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(null);
+        Producer<String, MirroredSolrRequest> producer;
+        try {
+            producer = new KafkaProducer<>(kafkaProducerProps);
+        } finally {
+            Thread.currentThread().setContextClassLoader(originalContextClassLoader);
+        }
+        return producer;
+    }
+
+    private void slowSubmitAction(Object request, long elapsedTimeMillis) {
+        log.warn("Enqueuing the request to Kafka took more than {} millis. enqueueElapsedTime={}",
+                conf.get(KafkaCrossDcConf.SLOW_SUBMIT_THRESHOLD_MS),
+                elapsedTimeMillis);
+    }
+
+    @Override public void close() throws IOException {
+        if (producer != null) {
+            producer.flush();
+            producer.close();
+        }
+    }
+}
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
new file mode 100644
index 0000000..74dc785
--- /dev/null
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.common;
+
+import org.apache.solr.client.solrj.SolrRequest;
+
+import java.util.*;
+
+/**
+ * Class to encapsulate a mirrored Solr request.
+ * This adds a timestamp and #attempts to the request for tracking purposes.
+ */
+public class MirroredSolrRequest {
+    private final SolrRequest solrRequest;
+
+    // Attempts counter for processing the request
+    private int attempt = 1;
+
+    // Timestamp to track when this request was first written. This should be used to track the replication lag.
+    private long submitTimeNanos = 0;
+
+    public MirroredSolrRequest(final SolrRequest solrRequest) {
+        this(1, solrRequest, 0);
+    }
+
+    public MirroredSolrRequest(final int attempt, final SolrRequest solrRequest, final long submitTimeNanos) {
+        if (solrRequest == null) {
+            throw new NullPointerException("solrRequest cannot be null");
+        }
+        this.attempt = attempt;
+        this.solrRequest = solrRequest;
+        this.submitTimeNanos = submitTimeNanos;
+    }
+
+    public MirroredSolrRequest(final int attempt,
+                               final long submitTimeNanos) {
+        this.attempt = attempt;
+        this.submitTimeNanos = submitTimeNanos;
+        solrRequest = null;
+    }
+
+    public int getAttempt() {
+        return attempt;
+    }
+
+    public void setAttempt(final int attempt) {
+        this.attempt = attempt;
+    }
+
+    public SolrRequest getSolrRequest() {
+        return solrRequest;
+    }
+
+    public long getSubmitTimeNanos() {
+        return submitTimeNanos;
+    }
+
+    public void setSubmitTimeNanos(final long submitTimeNanos) {
+        this.submitTimeNanos = submitTimeNanos;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (!(o instanceof MirroredSolrRequest)) return false;
+
+        final MirroredSolrRequest that = (MirroredSolrRequest)o;
+
+        return Objects.equals(solrRequest, that.solrRequest);
+    }
+
+    @Override
+    public int hashCode() {
+        return solrRequest.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "MirroredSolrRequest{" +
+               "solrRequest=" + solrRequest +
+               ", attempt=" + attempt +
+               ", submitTimeNanos=" + submitTimeNanos +
+               '}';
+    }
+}
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
new file mode 100644
index 0000000..3f0684d
--- /dev/null
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.common;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.params.MapSolrParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.JavaBinCodec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrRequest>, Deserializer<MirroredSolrRequest> {
+
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    private boolean isKey;
+    /**
+     * Configure this class.
+     *
+     * @param configs configs in key/value pairs
+     * @param isKey   whether this instance is used for keys or values
+     */
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        this.isKey = isKey;
+    }
+
+    @Override
+    public MirroredSolrRequest deserialize(String topic, byte[] data) {
+        Map solrRequest;
+
+        JavaBinCodec codec = new JavaBinCodec();
+        ByteArrayInputStream bais = new ByteArrayInputStream(data);
+
+        try {
+            solrRequest = (Map) codec.unmarshal(bais);
+
+            if (log.isTraceEnabled()) {
+                log.trace("Deserialized class={} solrRequest={}", solrRequest.getClass().getName(),
+                    solrRequest);
+            }
+
+
+        } catch (Exception e) {
+            log.error("Exception unmarshalling JavaBin", e);
+            throw new RuntimeException(e);
+        }
+
+        UpdateRequest updateRequest = new UpdateRequest();
+        List docs = (List) solrRequest.get("docs");
+        if (docs != null) {
+            updateRequest.add(docs);
+        } else {
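+            // No docs in the payload: add a placeholder doc and immediately clear it,
+            // leaving the request with an empty (non-null) documents map.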
+            updateRequest.add("id", "1");
+            updateRequest.getDocumentsMap().clear();
+        }
+
+        List deletes = (List) solrRequest.get("deletes");
+        if (deletes != null) {
+            updateRequest.deleteById(deletes);
+        }
+
+        List deletesQuery = (List) solrRequest.get("deleteQuery");
+        if (deletesQuery != null) {
+            for (Object delQuery : deletesQuery) {
+                updateRequest.deleteByQuery((String) delQuery);
+            }
+        }
+
+
+        Map params = (Map) solrRequest.get("params");
+        if (params != null) {
+            updateRequest.setParams(ModifiableSolrParams.of(new MapSolrParams(params)));
+        }
+
+        return new MirroredSolrRequest(updateRequest);
+    }
+
+    /**
+     * Convert {@code data} into a byte array.
+     *
+     * @param topic topic associated with data
+     * @param request  MirroredSolrRequest that needs to be serialized
+     * @return serialized bytes
+     */
+    @Override
+    public byte[] serialize(String topic, MirroredSolrRequest request) {
+        // TODO: add checks
+        UpdateRequest solrRequest = (UpdateRequest) request.getSolrRequest();
+
+        if (log.isTraceEnabled()) {
+            log.trace("serialize request={} docs={} deletebyid={}", solrRequest,
+                solrRequest.getDocuments(), solrRequest.getDeleteById());
+        }
+
+        try (JavaBinCodec codec = new JavaBinCodec(null)) {
+
+            ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
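+            // Encode the update as a plain map (params/docs/deletes/deleteQuery); the
+            // consumer-side deserialize() rebuilds an UpdateRequest from these same keys.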
+            Map map = new HashMap(4);
+            map.put("params", solrRequest.getParams());
+            map.put("docs", solrRequest.getDocuments());
+
+            // TODO
+            //map.put("deletes", solrRequest.getDeleteByIdMap());
+            map.put("deletes", solrRequest.getDeleteById());
+            map.put("deleteQuery", solrRequest.getDeleteQuery());
+
+            codec.marshal(map, baos);
+
+            return baos.byteArray();
+
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    /**
+     * Close this serializer.
+     * <p>
+     * This method must be idempotent as it may be called multiple times.
+     */
+    @Override
+    public final void close() {
+        Serializer.super.close();
+    }
+
+    private static final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
+        ExposedByteArrayOutputStream() {
+            super();
+        }
+
+        byte[] byteArray() {
+            return buf;
+        }
+    }
+}
diff --git a/crossdc-consumer/build.gradle b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroringException.java
similarity index 63%
copy from crossdc-consumer/build.gradle
copy to crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroringException.java
index 8c44542..ac633ec 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroringException.java
@@ -14,17 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-plugins {
-    id 'java'
-}
+package org.apache.solr.crossdc.common;
 
-description = 'Cross-DC Consumer package'
+/**
+ * Exception thrown during cross-dc mirroring
+ */
+public class MirroringException extends Exception {
+    public MirroringException() {
+        super();
+    }
 
-version '1.0-SNAPSHOT'
+    public MirroringException(String message) {
+        super(message);
+    }
 
-repositories {
-    mavenCentral()
-}
+    public MirroringException(String message, Throwable cause) {
+        super(message, cause);
+    }
 
-dependencies {
+    public MirroringException(Throwable cause) {
+        super(cause);
+    }
 }
diff --git a/crossdc-consumer/settings.gradle b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java
similarity index 57%
copy from crossdc-consumer/settings.gradle
copy to crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java
index 8c7c712..e8b2c69 100644
--- a/crossdc-consumer/settings.gradle
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java
@@ -14,11 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.solr.crossdc.common;
 
-rootProject.name = 'crossdc-consumer'
+public interface RequestMirroringSink {
 
-description = 'Module for Apache Solr Cross DC Consumer'
-
-subprojects {
-    group "org.apache.solr.crossdc"
+    /**
+     * Submits a mirrored Solr request to the appropriate endpoint such that it is eventually received by Solr.
+     * A direct sink may simply use CloudSolrClient to process requests directly.
+     * A queueing sink will serialize the request and submit it to a queue for later consumption.
+     * @param request the request that is to be mirrored
+     * @throws MirroringException if the request could not be submitted to the sink
+     */
+    void submit(final MirroredSolrRequest request) throws MirroringException;
 }
diff --git a/crossdc-consumer/settings.gradle b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ResubmitBackoffPolicy.java
similarity index 79%
copy from crossdc-consumer/settings.gradle
copy to crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ResubmitBackoffPolicy.java
index 8c7c712..82babbf 100644
--- a/crossdc-consumer/settings.gradle
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ResubmitBackoffPolicy.java
@@ -14,11 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.solr.crossdc.common;
 
-rootProject.name = 'crossdc-consumer'
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
 
-description = 'Module for Apache Solr Cross DC Consumer'
-
-subprojects {
-    group "org.apache.solr.crossdc"
+public interface ResubmitBackoffPolicy {
+  long getBackoffTimeMs(MirroredSolrRequest resubmitRequest);
 }
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/SensitivePropRedactionUtils.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/SensitivePropRedactionUtils.java
new file mode 100644
index 0000000..58675a0
--- /dev/null
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/SensitivePropRedactionUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.common;
+
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Helper functionality related to redacting arbitrary properties which may contain sensitive password information.
+ *
+ * Used primarily as a safeguard before logging out configuration properties.  Redaction logic heavily depends on string
+ * matching against elements of the {@link #PATTERNS_REQUIRING_REDACTION_LOWERCASE} block-list.
+ */
+public class SensitivePropRedactionUtils {
+    private static final String[] PATTERNS_REQUIRING_REDACTION_LOWERCASE = new String[] {"password", "credentials"};
+    private static final String REDACTED_STRING = "<REDACTED>";
+
+    public static boolean propertyRequiresRedaction(String propName) {
+        final String propNameLowercase = propName.toLowerCase(Locale.ROOT);
+        for (String pattern : PATTERNS_REQUIRING_REDACTION_LOWERCASE) {
+            if (propNameLowercase.contains(pattern)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Redacts a property value if necessary, and returns the result.
+     *
+     * Redaction only occurs when the property name matches a pattern on the
+     * {@link #PATTERNS_REQUIRING_REDACTION_LOWERCASE} block-list.
+     *
+     * @param propName the name or key of the property being considered for redaction
+     * @param propValue the value of the property under consideration; returned verbatim if redaction is not necessary.
+     */
+    public static String redactPropertyIfNecessary(String propName, String propValue) {
+        return propertyRequiresRedaction(propName) ? REDACTED_STRING : propValue;
+    }
+
+    public static String flattenAndRedactForLogging(Map<String, Object> properties) {
+        final StringBuilder sb = new StringBuilder();
+        sb.append("{");
+        for (Map.Entry<String, Object> entry : properties.entrySet()) {
+            final Object printablePropValue = redactPropertyIfNecessary(entry.getKey(), String.valueOf(entry.getValue()));
+            sb.append(entry.getKey()).append("=").append(printablePropValue).append(", ");
+        }
+        sb.append("}");
+        return sb.toString();
+    }
+}
diff --git a/crossdc-consumer/settings.gradle b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/SolrExceptionUtil.java
similarity index 62%
copy from crossdc-consumer/settings.gradle
copy to crossdc-commons/src/main/java/org/apache/solr/crossdc/common/SolrExceptionUtil.java
index 8c7c712..47dced2 100644
--- a/crossdc-consumer/settings.gradle
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/SolrExceptionUtil.java
@@ -14,11 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.solr.crossdc.common;
 
-rootProject.name = 'crossdc-consumer'
+import org.apache.solr.common.SolrException;
 
-description = 'Module for Apache Solr Cross DC Consumer'
-
-subprojects {
-    group "org.apache.solr.crossdc"
+public class SolrExceptionUtil {
+    public static SolrException asSolrException(final Exception e) {
+        SolrException solrException = null;
+        if (e.getCause() instanceof SolrException) {
+            solrException = (SolrException) e.getCause();
+        } else if (e instanceof SolrException) {
+            solrException = (SolrException) e;
+        }
+        return solrException;
+    }
 }
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index 8c44542..466bbfa 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -15,16 +15,54 @@
  * limitations under the License.
  */
 plugins {
-    id 'java'
+    id 'application'
 }
 
 description = 'Cross-DC Consumer package'
 
-version '1.0-SNAPSHOT'
-
 repositories {
     mavenCentral()
 }
 
+application {
+    mainClass = 'org.apache.solr.crossdc.consumer.Consumer'
+}
+
 dependencies {
+    implementation group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.2'
+    implementation project(path: ':crossdc-commons', configuration: 'shadow')
+
+    implementation 'io.dropwizard.metrics:metrics-core:4.2.9'
+    implementation 'org.slf4j:slf4j-api:1.7.36'
+    implementation 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
+    implementation 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
+    implementation 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
+    implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.2'
+    runtimeOnly ('com.google.protobuf:protobuf-java-util:3.19.2')
+    runtimeOnly ('commons-codec:commons-codec:1.13')
+    testImplementation 'org.hamcrest:hamcrest:2.2'
+    testImplementation 'junit:junit:4.13.2'
+    testImplementation('org.mockito:mockito-core:4.3.1', {
+        exclude group: "net.bytebuddy", module: "byte-buddy-agent"
+    })
+
+    testImplementation  project(':crossdc-producer')
+
+    testImplementation group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
+    testImplementation group: 'org.apache.solr', name: 'solr-test-framework', version: '8.11.2'
+    testImplementation 'org.apache.kafka:kafka_2.13:2.8.1'
+    testImplementation 'org.apache.kafka:kafka-streams:2.8.1'
+    testImplementation 'org.apache.kafka:kafka_2.13:2.8.1:test'
+    testImplementation 'org.apache.kafka:kafka-streams:2.8.1:test'
+}
+
+test {
+    jvmArgs '-Djava.security.egd=file:/dev/./urandom'
+}
+
+tasks.withType(Tar){
+    duplicatesStrategy = DuplicatesStrategy.EXCLUDE
 }
+tasks.withType(Zip){
+    duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+}
\ No newline at end of file
diff --git a/crossdc-consumer/gradle.properties b/crossdc-consumer/gradle.properties
new file mode 100644
index 0000000..0df7afe
--- /dev/null
+++ b/crossdc-consumer/gradle.properties
@@ -0,0 +1,2 @@
+group=org.apache.solr
+version=0.1-SNAPSHOT
\ No newline at end of file
diff --git a/crossdc-consumer/gradlew b/crossdc-consumer/gradlew
old mode 100755
new mode 100644
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
new file mode 100644
index 0000000..08749b0
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.consumer;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.crossdc.common.ConfigProperty;
+import org.apache.solr.crossdc.common.CrossDcConf;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.SensitivePropRedactionUtils;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.eclipse.jetty.server.Server;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.*;
+
+// Cross-DC Consumer main class
+public class Consumer {
+
+    private static final boolean enabled = true;
+
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    private Server server;
+    private CrossDcConsumer crossDcConsumer;
+
+
+    public void start() {
+        start(new HashMap<>());
+    }
+
+    public void start(Map<String,Object> properties ) {
+
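+        // Known config keys set as system properties override the values passed in the map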
+        for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
+            String val = System.getProperty(configKey.getKey());
+            if (val != null) {
+                properties.put(configKey.getKey(), val);
+            }
+        }
+
+        log.info("Consumer startup config properties before adding additional properties from Zookeeper={}",
+                SensitivePropRedactionUtils.flattenAndRedactForLogging(properties));
+
+        String zkConnectString = (String) properties.get("zkConnectString");
+        if (zkConnectString == null) {
+            throw new IllegalArgumentException("zkConnectString not specified for Consumer");
+        }
+
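+        // Pull additional config from the crossdc properties file in ZooKeeper, if present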
+        try (SolrZkClient client = new SolrZkClient(zkConnectString, 15000)) {
+
+            try {
+                if (client.exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
+                    CrossDcConf.CROSSDC_PROPERTIES), true)) {
+                    byte[] data = client.getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
+                        CrossDcConf.CROSSDC_PROPERTIES), null, null, true);
+                    Properties zkProps = new Properties();
+                    zkProps.load(new ByteArrayInputStream(data));
+
+                    KafkaCrossDcConf.readZkProps(properties, zkProps);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
+            } catch (Exception e) {
+                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+            }
+        }
+
+        String bootstrapServers = (String) properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS);
+        if (bootstrapServers == null) {
+            throw new IllegalArgumentException("bootstrapServers not specified for Consumer");
+        }
+
+        String topicName = (String) properties.get(TOPIC_NAME);
+        if (topicName == null) {
+            throw new IllegalArgumentException("topicName not specified for Consumer");
+        }
+
+        //server = new Server();
+        //ServerConnector connector = new ServerConnector(server);
+        //connector.setPort(port);
+        //server.setConnectors(new Connector[] {connector})
+        KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
+        crossDcConsumer = getCrossDcConsumer(conf);
+
+        // Start consumer thread
+
+        log.info("Starting CrossDC Consumer {}", conf);
+
+        // ExecutorService to manage the cross-dc consumer thread.
+        ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor();
+        consumerThreadExecutor.submit(crossDcConsumer);
+
+        // Register shutdown hook
+        Thread shutdownHook = new Thread(() -> System.out.println("Shutting down consumers!"));
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+    }
+
+    private CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf) {
+        return new KafkaCrossDcConsumer(conf);
+    }
+
+    public static void main(String[] args) {
+
+        Consumer consumer = new Consumer();
+        consumer.start();
+    }
+
+    public final void shutdown() {
+        if (crossDcConsumer != null) {
+            crossDcConsumer.shutdown();
+        }
+    }
+
+    /**
+     * Abstract class for defining cross-dc consumer
+     */
+    public abstract static class CrossDcConsumer implements Runnable {
+        SolrMessageProcessor messageProcessor;
+        abstract void shutdown();
+
+    }
+
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
new file mode 100644
index 0000000..451253b
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -0,0 +1,269 @@
+package org.apache.solr.crossdc.consumer;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SharedMetricRegistries;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.crossdc.common.*;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Class to run the consumer thread for Kafka. This also contains the implementation for retries and
+ * resubmitting to the queue in case of temporary failures.
+ */
+public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final MetricRegistry metrics = SharedMetricRegistries.getOrCreate("metrics");
+
+  private final KafkaConsumer<String, MirroredSolrRequest> consumer;
+  private final KafkaMirroringSink kafkaMirroringSink;
+
+  private final static int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
+  private final String topicName;
+  private final SolrMessageProcessor messageProcessor;
+
+  private final CloudSolrClient solrClient;
+
+  /**
+   * @param conf The Kafka consumer configuration
+   */
+  public KafkaCrossDcConsumer(KafkaCrossDcConf conf) {
+    this.topicName = conf.get(KafkaCrossDcConf.TOPIC_NAME);
+
+    final Properties kafkaConsumerProps = new Properties();
+
+    kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+
+    kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, conf.get(KafkaCrossDcConf.GROUP_ID));
+
+    kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
+
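+    // With no previously committed offset for this group, start from the earliest available records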
+    kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
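+    // Offsets are committed manually (commitSync) after each partition batch is processed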
+    kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+    kafkaConsumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
+    kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
+
+    kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
+    kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
+
+    KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps);
+
+    kafkaConsumerProps.putAll(conf.getAdditionalProperties());
+
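+    // Client for the local (consuming) cluster, where mirrored requests are applied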
+    solrClient =
+        new CloudSolrClient.Builder(Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)),
+            Optional.empty()).build();
+
+    messageProcessor = new SolrMessageProcessor(solrClient, resubmitRequest -> 0L);
+
+    log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProps);
+    consumer = createConsumer(kafkaConsumerProps);
+
+    // Create producer for resubmitting failed requests
+    log.info("Creating Kafka resubmit producer");
+    this.kafkaMirroringSink = new KafkaMirroringSink(conf);
+    log.info("Created Kafka resubmit producer");
+
+  }
+
+  public static KafkaConsumer<String, MirroredSolrRequest> createConsumer(Properties properties) {
+    return new KafkaConsumer<>(properties, new StringDeserializer(),
+        new MirroredSolrRequestSerializer());
+  }
+
+  /**
+   * This is where the magic happens.
+   * 1. Polls and gets the packets from the queue
+   * 2. Extracts the MirroredSolrRequest objects
+   * 3. Sends the request to the MirroredSolrRequestHandler that has the processing, retry, and error handling logic.
+   */
+  @Override public void run() {
+    log.info("About to start Kafka consumer thread, topic={}", topicName);
+
+    try {
+
+      consumer.subscribe(Collections.singleton(topicName));
+
+      while (pollAndProcessRequests()) {
+        // no-op within this loop: everything is done in the pollAndProcessRequests method defined below.
+      }
+
+      log.info("Closed kafka consumer. Exiting now.");
+      try {
+        consumer.close();
+      } catch (Exception e) {
+        log.warn("Failed to close kafka consumer", e);
+      }
+
+      try {
+        kafkaMirroringSink.close();
+      } catch (Exception e) {
+        log.warn("Failed to close kafka mirroring sink", e);
+      }
+    } finally {
+      IOUtils.closeQuietly(solrClient);
+    }
+
+  }
+
+  /**
+   * Polls and processes the requests from Kafka. This method returns false when the consumer needs to be
+   * shutdown i.e. when there's a wakeup exception.
+   */
+  boolean pollAndProcessRequests() {
+    log.trace("Entered pollAndProcessRequests loop");
+    try {
+      ConsumerRecords<String, MirroredSolrRequest> records =
+          consumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
+      for (TopicPartition partition : records.partitions()) {
+        List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords =
+            records.records(partition);
+        try {
+          for (ConsumerRecord<String, MirroredSolrRequest> record : partitionRecords) {
+            if (log.isTraceEnabled()) {
+              log.trace("Fetched record from topic={} partition={} key={} value={}", record.topic(),
+                  record.partition(), record.key(), record.value());
+            }
+            IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(record.value());
+            switch (result.status()) {
+              case FAILED_RESUBMIT:
+                // currently, we use a strategy taken from an earlier working implementation
+                // of just resubmitting back to the queue - note that in rare cases, this could
+                // allow for incorrect update reorders
+                if (log.isTraceEnabled()) {
+                  log.trace("result=failed-resubmit");
+                }
+                metrics.counter("failed-resubmit").inc();
+                kafkaMirroringSink.submit(record.value());
+                break;
+              case HANDLED:
+                // no-op
+                if (log.isTraceEnabled()) {
+                  log.trace("result=handled");
+                }
+                metrics.counter("handled").inc();
+                break;
+              case NOT_HANDLED_SHUTDOWN:
+                if (log.isTraceEnabled()) {
+                  log.trace("result=nothandled_shutdown");
+                }
+                metrics.counter("nothandled_shutdown").inc();
+              case FAILED_RETRY:
+                log.error("Unexpected response while processing request. We never expect {}.",
+                    result.status().toString());
+                metrics.counter("failed-retry").inc();
+                break;
+              default:
+                if (log.isTraceEnabled()) {
+                  log.trace("result=no matching case");
+                }
+                // no-op
+            }
+          }
+          updateOffset(partition, partitionRecords);
+
+          // handleItem may set the thread interrupt flag; exit here if an interrupt has been set
+          if (Thread.currentThread().isInterrupted()) {
+            log.info("Kafka Consumer thread interrupted, shutting down Kafka consumer.");
+            return false;
+          }
+        } catch (MirroringException e) {
+          // We don't really know what to do here, so it's wiser to just break out.
+          log.error(
+              "Mirroring exception occurred while resubmitting to Kafka. We are going to stop the consumer thread now.",
+              e);
+          return false;
+        } catch (WakeupException e) {
+          log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer.");
+          return false;
+        } catch (Exception e) {
+          // If there is any exception returned by handleItem, then reset the offset.
+
+          if (e instanceof ClassCastException || e instanceof SerializationException) { // TODO: optional
+            log.error("Non retryable error", e);
+            break;
+          }
+          log.warn("Exception occurred in Kafka consumer thread, but we will continue.", e);
+          resetOffsetForPartition(partition, partitionRecords);
+          break;
+        }
+      }
+    } catch (WakeupException e) {
+      log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer");
+      return false;
+    } catch (Exception e) {
+
+      if (e instanceof ClassCastException || e instanceof SerializationException) { // TODO: optional
+        log.error("Non retryable error", e);
+        return false;
+      }
+
+      log.error("Exception occurred in Kafka consumer thread, but we will continue.", e);
+    }
+    return true;
+  }
+
+  /**
+   * Reset the local offset so that the consumer reads the records from Kafka again.
+   *
+   * @param partition        The TopicPartition to reset the offset for
+   * @param partitionRecords PartitionRecords for the specified partition
+   */
+  private void resetOffsetForPartition(TopicPartition partition,
+      List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+    if (log.isTraceEnabled()) {
+      log.trace("Resetting offset to: {}", partitionRecords.get(0).offset());
+    }
+    long resetOffset = partitionRecords.get(0).offset();
+    consumer.seek(partition, resetOffset);
+  }
+
+  /**
+   * Logs and updates the commit point for the partition that has been processed.
+   *
+   * @param partition        The TopicPartition to update the offset for
+   * @param partitionRecords PartitionRecords for the specified partition
+   */
+  private void updateOffset(TopicPartition partition,
+      List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+    long nextOffset = partitionRecords.get(partitionRecords.size() - 1).offset() + 1;
+
+    if (log.isTraceEnabled()) {
+      log.trace("Updated offset for topic={} partition={} to offset={}", partition.topic(),
+          partition.partition(), nextOffset);
+    }
+
+    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(nextOffset)));
+  }
+
+  /**
+   * Shutdown the Kafka consumer by calling wakeup.
+   */
+  public final void shutdown() {
+    log.info("Shutdown called on KafkaCrossDcConsumer");
+    try {
+      solrClient.close();
+    } catch (Exception e) {
+      log.warn("Exception closing Solr client on shutdown");
+    }
+    consumer.wakeup();
+  }
+
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java
new file mode 100644
index 0000000..818369c
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.helpers;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
+
+import java.util.Properties;
+
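+/** Manual helper that publishes a single dummy update to a local Kafka topic for smoke testing. */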
+public class SendDummyUpdates {
+    public static void main(String[] args) {
+        String TOPIC = "Trial";
+        Properties properties = new Properties();
+        properties.put("bootstrap.servers", "localhost:9092");
+        properties.put("acks", "all");
+        properties.put("retries", 3);
+        properties.put("batch.size", 16384);
+        properties.put("buffer.memory", 33554432);
+        properties.put("linger.ms", 1);
+        properties.put("key.serializer", StringSerializer.class.getName());
+        properties.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
+        Producer<String, MirroredSolrRequest> producer = new KafkaProducer(properties);
+        UpdateRequest updateRequest = new UpdateRequest();
+        updateRequest.add("id", String.valueOf(System.currentTimeMillis()));
+        MirroredSolrRequest mirroredSolrRequest = new MirroredSolrRequest(updateRequest);
+        System.out.println("About to send producer record");
+        producer.send(new ProducerRecord(TOPIC, mirroredSolrRequest));
+        System.out.println("Sent producer record");
+        producer.close();
+        System.out.println("Closed producer");
+    }
+}
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
similarity index 64%
copy from crossdc-consumer/build.gradle
copy to crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
index 8c44542..0620e69 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
@@ -14,17 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-plugins {
-    id 'java'
-}
+package org.apache.solr.crossdc.messageprocessor;
 
-description = 'Cross-DC Consumer package'
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
 
-version '1.0-SNAPSHOT'
+public abstract class MessageProcessor {
 
-repositories {
-    mavenCentral()
-}
+  private final ResubmitBackoffPolicy resubmitBackoffPolicy;
+
+  public MessageProcessor(ResubmitBackoffPolicy resubmitBackoffPolicy) {
+    this.resubmitBackoffPolicy = resubmitBackoffPolicy;
+  }
 
-dependencies {
+  public ResubmitBackoffPolicy getResubmitBackoffPolicy() {
+    return resubmitBackoffPolicy;
+  }
 }
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
new file mode 100644
index 0000000..b1a428e
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.messageprocessor;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SharedMetricRegistries;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.SolrResponseBase;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
+import org.apache.solr.crossdc.common.CrossDcConstants;
+import org.apache.solr.crossdc.common.IQueueHandler;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.SolrExceptionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Message processor implements all the logic to process a MirroredSolrRequest.
+ * It handles:
+ *  1. Sending the update request to Solr
+ *  2. Discarding or retrying failed requests
+ *  3. Flagging requests for resubmission by the underlying consumer implementation.
+ */
+public class SolrMessageProcessor extends MessageProcessor implements IQueueHandler<MirroredSolrRequest>  {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    private final MetricRegistry metrics = SharedMetricRegistries.getOrCreate("metrics");
+
+    final CloudSolrClient client;
+
+    private static final String VERSION_FIELD = "_version_";
+
+    public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy resubmitBackoffPolicy) {
+        super(resubmitBackoffPolicy);
+        this.client = client;
+    }
+
+    @Override
+    public Result<MirroredSolrRequest> handleItem(MirroredSolrRequest mirroredSolrRequest) {
+        connectToSolrIfNeeded();
+
+        // preventCircularMirroring(mirroredSolrRequest); TODO: isn't this handled by the mirroring handler?
+
+        return processMirroredRequest(mirroredSolrRequest);
+    }
+
+    private Result<MirroredSolrRequest> processMirroredRequest(MirroredSolrRequest request) {
+        final Result<MirroredSolrRequest> result = handleSolrRequest(request);
+        // Back-off before returning
+        backoffIfNeeded(result);
+        return result;
+    }
+
+    private Result<MirroredSolrRequest> handleSolrRequest(MirroredSolrRequest mirroredSolrRequest) {
+
+        SolrRequest request = mirroredSolrRequest.getSolrRequest();
+        final SolrParams requestParams = request.getParams();
+
+        if (log.isTraceEnabled()) {
+            log.trace("handleSolrRequest params={}", requestParams);
+        }
+
+        // TODO: isn't this handled by the mirroring handler?
+//        final String shouldMirror = requestParams.get("shouldMirror");
+//
+//        if ("false".equalsIgnoreCase(shouldMirror)) {
+//            log.warn("Skipping mirrored request because shouldMirror is set to false. request={}", requestParams);
+//            return new Result<>(ResultStatus.FAILED_NO_RETRY);
+//        }
+        logFirstAttemptLatency(mirroredSolrRequest);
+
+        Result<MirroredSolrRequest> result;
+        try {
+            prepareIfUpdateRequest(request);
+            logRequest(request);
+            result = processMirroredSolrRequest(request);
+        } catch (Exception e) {
+            result = handleException(mirroredSolrRequest, e);
+        }
+
+        return result;
+    }
+
+    private Result<MirroredSolrRequest> handleException(MirroredSolrRequest mirroredSolrRequest, Exception e) {
+        final SolrException solrException = SolrExceptionUtil.asSolrException(e);
+        logIf4xxException(solrException);
+        if (!isRetryable(e)) {
+            logFailure(mirroredSolrRequest, e, solrException, false);
+            return new Result<>(ResultStatus.FAILED_NO_RETRY, e);
+        } else {
+            logFailure(mirroredSolrRequest, e, solrException, true);
+            mirroredSolrRequest.setAttempt(mirroredSolrRequest.getAttempt() + 1);
+            maybeBackoff(solrException);
+            return new Result<>(ResultStatus.FAILED_RESUBMIT, e, mirroredSolrRequest);
+        }
+    }
+
+    private void maybeBackoff(SolrException solrException) {
+        if (solrException == null) {
+            return;
+        }
+        long sleepTimeMs = 1000;
+        String backoffTimeSuggested = solrException.getMetadata("backoffTime-ms");
+        if (backoffTimeSuggested != null && !"0".equals(backoffTimeSuggested)) {
+            // If backoff policy is not configured (returns "0" by default), then sleep 1 second. If configured, do as it says.
+            sleepTimeMs = Math.max(1, Long.parseLong(backoffTimeSuggested));
+        }
+        log.info("Consumer backoff. sleepTimeMs={}", sleepTimeMs);
+        uncheckedSleep(sleepTimeMs);
+    }
+
+    private boolean isRetryable(Exception e) {
+        SolrException se = SolrExceptionUtil.asSolrException(e);
+
+        if (se != null) {
+            int code = se.code();
+            if (code == SolrException.ErrorCode.CONFLICT.code) {
+                return false;
+            }
+        }
+        // Everything other than version conflict exceptions should be retried.
+        log.warn("Unexpected exception, will resubmit the request to the queue", e);
+        return true;
+    }
+
+    private void logIf4xxException(SolrException solrException) {
+        // This shouldn't really happen, but if it does, it most likely requires a fix in the return code from Solr.
+        if (solrException != null && 400 <= solrException.code() && solrException.code() < 500) {
+            log.error("Exception occurred with 4xx response. {}", solrException.code(), solrException);
+        }
+    }
+
+    private void logFailure(MirroredSolrRequest mirroredSolrRequest, Exception e, SolrException solrException, boolean retryable) {
+        // This shouldn't really happen.
+        if (solrException != null && 400 <= solrException.code() && solrException.code() < 500) {
+            log.error("Exception occurred with 4xx response. {}", solrException.code(), solrException);
+            return;
+        }
+
+        log.warn("Resubmitting mirrored solr request after failure errorCode={} retryCount={}", solrException != null ? solrException.code() : -1, mirroredSolrRequest.getAttempt(), e);
+    }
+
+    /**
+     * Process the SolrRequest. Throws an exception if the request does not succeed.
+     */
+    private Result<MirroredSolrRequest> processMirroredSolrRequest(SolrRequest request) throws Exception {
+        if (log.isTraceEnabled()) {
+            log.trace("Sending request to Solr at ZK address={} with params {}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams());
+        }
+        Result<MirroredSolrRequest> result;
+
+        SolrResponseBase response = (SolrResponseBase) request.process(client);
+
+        int status = response.getStatus();
+
+        if (log.isTraceEnabled()) {
+            log.trace("result status={}", status);
+        }
+
+        if (status != 0) {
+            metrics.counter("processedErrors").inc();
+            throw new SolrException(SolrException.ErrorCode.getErrorCode(status), "response=" + response);
+        }
+
+        metrics.counter("processed").inc();
+
+        result = new Result<>(ResultStatus.HANDLED);
+        return result;
+    }
+
+    private void logRequest(SolrRequest request) {
+        if(request instanceof UpdateRequest) {
+            final StringBuilder rmsg = new StringBuilder(64);
+            rmsg.append("Submitting update request");
+            if(((UpdateRequest) request).getDeleteById() != null) {
+                final int numDeleteByIds = ((UpdateRequest) request).getDeleteById().size();
+                metrics.counter("numDeleteByIds").inc(numDeleteByIds);
+                rmsg.append(" numDeleteByIds=").append(numDeleteByIds);
+            }
+            if(((UpdateRequest) request).getDocuments() != null) {
+                final int numUpdates = ((UpdateRequest) request).getDocuments().size();
+                metrics.counter("numUpdates").inc(numUpdates);
+                rmsg.append(" numUpdates=").append(numUpdates);
+            }
+            if(((UpdateRequest) request).getDeleteQuery() != null) {
+                final int numDeleteByQuery = ((UpdateRequest) request).getDeleteQuery().size();
+                metrics.counter("numDeleteByQuery").inc(numDeleteByQuery);
+                rmsg.append(" numDeleteByQuery=").append(numDeleteByQuery);
+            }
+            log.info(rmsg.toString());
+        }
+    }
+
+    /**
+     * Clean up the Solr request to be submitted locally.
+     * @param request The SolrRequest to be cleaned up for submitting locally.
+     */
+    private void prepareIfUpdateRequest(SolrRequest request) {
+        if (request instanceof UpdateRequest) {
+            // Remove versions from add requests
+            UpdateRequest updateRequest = (UpdateRequest) request;
+
+            List<SolrInputDocument> documents = updateRequest.getDocuments();
+            if (log.isTraceEnabled()) {
+                log.trace("update request docs={} deletebyid={} deletebyquery={}", documents, updateRequest.getDeleteById(), updateRequest.getDeleteQuery());
+            }
+            if (documents != null) {
+                for (SolrInputDocument doc : documents) {
+                    sanitizeDocument(doc);
+                }
+            }
+            removeVersionFromDeleteByIds(updateRequest);
+        }
+    }
+
+    /**
+     * Strips fields that are problematic for replication.
+     */
+    private void sanitizeDocument(SolrInputDocument doc) {
+        SolrInputField field = doc.getField(VERSION_FIELD);
+        if (log.isTraceEnabled()) {
+            log.trace("Removing {} value={}", VERSION_FIELD,
+                field == null ? "null" : field.getValue());
+        }
+        doc.remove(VERSION_FIELD);
+    }
+
+    private void removeVersionFromDeleteByIds(UpdateRequest updateRequest) {
+        if (log.isTraceEnabled()) {
+            log.trace("remove versions from deletebyids");
+        }
+        Map<String, Map<String, Object>> deleteIds = updateRequest.getDeleteByIdMap();
+        if (deleteIds != null) {
+            for (Map<String, Object> idParams : deleteIds.values()) {
+                if (idParams != null) {
+                    idParams.put(UpdateRequest.VER, null);
+                }
+            }
+        }
+    }
+
+    private void logFirstAttemptLatency(MirroredSolrRequest mirroredSolrRequest) {
+        // Only record the latency of the first attempt, essentially measuring the latency from submitting on the
+        // primary side until the request is eligible to be consumed on the buddy side (or vice versa).
+        if (mirroredSolrRequest.getAttempt() == 1) {
+            final long latency = System.currentTimeMillis() - TimeUnit.NANOSECONDS.toMillis(mirroredSolrRequest.getSubmitTimeNanos());
+            log.debug("First attempt latency = {}", latency);
+            metrics.timer("latency").update(latency, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Adds {@link CrossDcConstants#SHOULD_MIRROR}=false to the params if it's not already specified.
+     * Logs a warning if it is specified and NOT set to false. (i.e. circular mirror may occur)
+     *
+     * @param mirroredSolrRequest MirroredSolrRequest object that is being processed.
+     */
+    private void preventCircularMirroring(MirroredSolrRequest mirroredSolrRequest) {
+        if (mirroredSolrRequest.getSolrRequest() instanceof UpdateRequest) {
+            UpdateRequest updateRequest = (UpdateRequest) mirroredSolrRequest.getSolrRequest();
+            ModifiableSolrParams params = updateRequest.getParams();
+            String shouldMirror = (params == null ? null : params.get(CrossDcConstants.SHOULD_MIRROR));
+            if (shouldMirror == null) {
+                log.warn(CrossDcConstants.SHOULD_MIRROR + " param is missing - setting to false. Request={}", mirroredSolrRequest);
+                updateRequest.setParam(CrossDcConstants.SHOULD_MIRROR, "false");
+            } else if (!"false".equalsIgnoreCase(shouldMirror)) {
+                log.warn(CrossDcConstants.SHOULD_MIRROR + " param equal to " + shouldMirror);
+            }
+        } else {
+            SolrParams params = mirroredSolrRequest.getSolrRequest().getParams();
+            String shouldMirror = (params == null ? null : params.get(CrossDcConstants.SHOULD_MIRROR));
+            if (shouldMirror == null) {
+                if (params instanceof ModifiableSolrParams) {
+                    log.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing - setting to false");
+                    ((ModifiableSolrParams) params).set(CrossDcConstants.SHOULD_MIRROR, "false");
+                } else {
+                    log.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing and params are not modifiable");
+                }
+            } else if (!"false".equalsIgnoreCase(shouldMirror)) {
+                log.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is present and set to " + shouldMirror);
+            }
+        }
+    }
+
+    private void connectToSolrIfNeeded() {
+        // Don't try to consume anything if we can't connect to the solr server
+        boolean connected = false;
+        while (!connected) {
+            try {
+                client.connect(); // volatile null-check if already connected
+                connected = true;
+            } catch (Exception e) {
+                log.error("Unable to connect to solr server. Not consuming.", e);
+                uncheckedSleep(5000);
+            }
+        }
+    }
+
+    public void uncheckedSleep(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void backoffIfNeeded(Result<MirroredSolrRequest> result) {
+        if (result.status().equals(ResultStatus.FAILED_RESUBMIT)) {
+            final long backoffMs = getResubmitBackoffPolicy().getBackoffTimeMs(result.newItem());
+            if (backoffMs > 0L) {
+                try {
+                    Thread.sleep(backoffMs);
+                } catch (final InterruptedException ex) {
+                    // we're about to exit the method anyway, so just log this and return the item. Let the caller
+                    // handle it.
+                    log.warn("Thread interrupted while backing off before retry");
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+
+}
diff --git a/crossdc-consumer/src/resources/configs/cloud-minimal/conf/schema.xml b/crossdc-consumer/src/resources/configs/cloud-minimal/conf/schema.xml
new file mode 100644
index 0000000..bc4676c
--- /dev/null
+++ b/crossdc-consumer/src/resources/configs/cloud-minimal/conf/schema.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+    <fieldType name="boolean" class="solr.BoolField" sortMissingLast="true"/>
+    <fieldType name="string" class="solr.StrField" docValues="true"/>
+    <fieldType name="int" class="org.apache.solr.schema.IntPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="long" class="org.apache.solr.schema.LongPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="float" class="org.apache.solr.schema.FloatPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="double" class="org.apache.solr.schema.DoublePointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="date" class="org.apache.solr.schema.DatePointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="text" class="solr.TextField">
+        <analyzer>
+            <tokenizer class="solr.StandardTokenizerFactory"/>
+            <filter class="solr.LowerCaseFilterFactory"/>
+        </analyzer>
+    </fieldType>
+
+    <!-- for versioning -->
+    <field name="_version_" type="long" indexed="true" stored="true"/>
+    <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+    <field name="id" type="string" indexed="true" stored="true"/>
+    <field name="text" type="text" indexed="true" stored="false"/>
+
+    <dynamicField name="*_b" type="boolean" indexed="true" stored="true"/>
+    <dynamicField name="*_s" type="string" indexed="true" stored="false"/>
+    <dynamicField name="*_t" type="text" indexed="true" stored="false"/>
+    <dynamicField name="*_i" type="int" indexed="false" stored="false"/>
+    <dynamicField name="*_l" type="long" indexed="false" stored="false"/>
+    <dynamicField name="*_f" type="float" indexed="false" stored="false"/>
+    <dynamicField name="*_d" type="double" indexed="false" stored="false"/>
+    <dynamicField name="*_dt" type="date" indexed="false" stored="false"/>
+
+    <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/crossdc-consumer/src/resources/configs/cloud-minimal/conf/solrconfig.xml b/crossdc-consumer/src/resources/configs/cloud-minimal/conf/solrconfig.xml
new file mode 100644
index 0000000..20caf3b
--- /dev/null
+++ b/crossdc-consumer/src/resources/configs/cloud-minimal/conf/solrconfig.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <indexConfig>
+    <mergePolicyFactory class="${mergePolicyFactory:org.apache.solr.index.TieredMergePolicyFactory}">
+      <int name="maxMergeAtOnce">${maxMergeAtOnce:10}</int>
+      <int name="segmentsPerTier">${segmentsPerTier:10}</int>
+      <double name="noCFSRatio">${noCFSRatio:.1}</double>
+    </mergePolicyFactory>
+
+    <useCompoundFile>${useCompoundFile:true}</useCompoundFile>
+
+    <ramBufferSizeMB>${ramBufferSizeMB:160}</ramBufferSizeMB>
+    <maxBufferedDocs>${maxBufferedDocs:250000}</maxBufferedDocs>     <!-- Force the common case to flush by doc count  -->
+    <!-- <ramPerThreadHardLimitMB>60</ramPerThreadHardLimitMB> -->
+
+    <!-- <mergeScheduler class="org.apache.lucene.index.ConcurrentMergeScheduler">
+      <int name="maxThreadCount">6</int>
+      <int name="maxMergeCount">8</int>
+      <bool name="ioThrottle">false</bool>
+    </mergeScheduler> -->
+
+    <writeLockTimeout>1000</writeLockTimeout>
+    <commitLockTimeout>10000</commitLockTimeout>
+
+    <!-- this sys property is not set by SolrTestCaseJ4 because almost all tests should
+         use the single process lockType for speed - but tests that explicitly need
+         to vary the lockType can set it as needed.
+    -->
+    <lockType>${lockType:single}</lockType>
+
+    <infoStream>${infostream:false}</infoStream>
+
+  </indexConfig>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <autoCommit>
+      <maxTime>${autoCommit.maxTime:60000}</maxTime>
+    </autoCommit>
+    <updateLog class="${ulog:solr.UpdateLog}" enable="${enable.update.log:true}"/>
+  </updateHandler>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+
+  </requestHandler>
+
+  <query>
+    <queryResultCache
+            enabled="${queryResultCache.enabled:false}"
+            class="${queryResultCache.class:solr.CaffeineCache}"
+            size="${queryResultCache.size:0}"
+            initialSize="${queryResultCache.initialSize:0}"
+            autowarmCount="${queryResultCache.autowarmCount:0}"/>
+      <documentCache
+              enabled="${documentCache.enabled:false}"
+              class="${documentCache.class:solr.CaffeineCache}"
+              size="${documentCache.size:0}"
+              initialSize="${documentCache.initialSize:0}"
+              autowarmCount="${documentCache.autowarmCount:0}"/>
+      <filterCache
+              enabled ="${filterCache.enabled:false}"
+              class="${filterCache.class:solr.CaffeineCache}"
+              size="${filterCache.size:1}"
+              initialSize="${filterCache.initialSize:1}"
+              autowarmCount="${filterCache.autowarmCount:0}"
+              async="${filterCache.async:false}"/>
+    <cache name="myPerSegmentCache"
+           enabled="${myPerSegmentCache.enabled:false}"
+           class="${myPerSegmentCache.class:solr.CaffeineCache}"
+           size="${myPerSegmentCache.size:0}"
+           initialSize="${myPerSegmentCache.initialSize:0}"
+           autowarmCount="${myPerSegmentCache.autowarmCount:0}"/>
+  </query>
+
+</config>
+
diff --git a/crossdc-consumer/src/resources/log4j2.xml b/crossdc-consumer/src/resources/log4j2.xml
new file mode 100644
index 0000000..96f69f1
--- /dev/null
+++ b/crossdc-consumer/src/resources/log4j2.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!-- We're configuring testing to be synchronous due to "logging pollution", see SOLR-13268 -->
+<Configuration>
+  <Appenders>
+    <Console name="STDERR" target="SYSTEM_ERR">
+      <PatternLayout>
+        <Pattern>
+          %maxLen{%-4r %-5p (%t) [%notEmpty{n:%X{node_name}}%notEmpty{ c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}%notEmpty{ t:%X{trace_id}}] %c{1.} %m%notEmpty{
+          =>%ex{short}}}{10240}%n
+        </Pattern>
+      </PatternLayout>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <!-- Use <AsyncLogger/<AsyncRoot and <Logger/<Root for asynchronous logging or synchronous logging respectively -->
+    <Logger name="org.apache.zookeeper" level="WARN"/>
+    <Logger name="org.apache.hadoop" level="WARN"/>
+    <Logger name="org.apache.directory" level="WARN"/>
+    <Logger name="org.apache.solr.hadoop" level="INFO"/>
+    <Logger name="org.eclipse.jetty" level="INFO"/>
+
+    <Root level="INFO">
+      <AppenderRef ref="STDERR"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
new file mode 100644
index 0000000..ebd102d
--- /dev/null
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
@@ -0,0 +1,91 @@
+package org.apache.solr.crossdc;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.mockito.Mockito.spy;
+
+public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
+  static final String VERSION_FIELD = "_version_";
+
+
+  protected static volatile MiniSolrCloudCluster cluster1;
+
+  private static SolrMessageProcessor processor;
+
+  @BeforeClass
+  public static void beforeSimpleSolrIntegrationTest() throws Exception {
+
+    cluster1 =
+        new SolrCloudTestCase.Builder(2, createTempDir())
+            .addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath())
+            .configure();
+
+    String collection = "collection1";
+    CloudSolrClient cloudClient1 = cluster1.getSolrClient();
+
+    processor = new SolrMessageProcessor(cloudClient1, null);
+
+    CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
+    cloudClient1.request(create);
+    cluster1.waitForActiveCollection(collection, 1, 1);
+
+    cloudClient1.setDefaultCollection(collection);
+  }
+
+  @AfterClass
+  public static void afterSimpleSolrIntegrationTest() throws Exception {
+    if (cluster1 != null) {
+      cluster1.shutdown();
+    }
+  }
+
+  @Test
+  public void testDocumentSanitization() {
+    UpdateRequest request = spy(new UpdateRequest());
+
+    // Add docs with and without version
+    request.add(new SolrInputDocument() {
+      {
+        setField("id", 1);
+        setField(VERSION_FIELD, 1);
+      }
+    });
+    request.add(new SolrInputDocument() {
+      {
+        setField("id", 2);
+      }
+    });
+
+    // Delete by id with and without version
+    request.deleteById("1");
+    request.deleteById("2", 10L);
+
+    request.setParam("shouldMirror", "true");
+
+    processor.handleItem(new MirroredSolrRequest(request));
+
+    // After processing, check that all version fields are stripped
+    for (SolrInputDocument doc : request.getDocuments()) {
+      assertNull("Doc still has version", doc.getField(VERSION_FIELD));
+    }
+
+    // Check versions in delete by id
+    for (Map<String, Object> idParams : request.getDeleteByIdMap().values()) {
+      if (idParams != null) {
+        // the processor should have stripped the version from the delete-by-id params
+        assertNull("Delete still has version", idParams.get(UpdateRequest.VER));
+      }
+    }
+  }
+}
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
new file mode 100644
index 0000000..9dc7073
--- /dev/null
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.crossdc;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.crossdc.common.IQueueHandler;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.*;
+
+public class TestMessageProcessor {
+    static final String VERSION_FIELD = "_version_";
+
+    static class NoOpResubmitBackoffPolicy implements ResubmitBackoffPolicy {
+        @Override
+        public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
+            return 0;
+        }
+    }
+
+    @Mock
+    private CloudSolrClient solrClient;
+    private SolrMessageProcessor processor;
+
+    private ResubmitBackoffPolicy backoffPolicy = spy(new NoOpResubmitBackoffPolicy());
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        processor = Mockito.spy(new SolrMessageProcessor(solrClient,
+                backoffPolicy));
+        Mockito.doNothing().when(processor).uncheckedSleep(anyLong());
+    }
+
+    @Test
+    public void testDocumentSanitization() {
+        UpdateRequest request = spy(new UpdateRequest());
+
+        // Add docs with and without version
+        request.add(new SolrInputDocument() {
+            {
+                setField("id", 1);
+                setField(VERSION_FIELD, 1);
+            }
+        });
+        request.add(new SolrInputDocument() {
+            {
+                setField("id", 2);
+            }
+        });
+
+        // Delete by id with and without version
+        request.deleteById("1");
+        request.deleteById("2", 10L);
+
+        request.setParam("shouldMirror", "true");
+        // The response itself is irrelevant; the call fails because the mocked client returns null, but the request should still be sanitized
+        processor.handleItem(new MirroredSolrRequest(request));
+
+        // After processing, check that all version fields are stripped
+        for (SolrInputDocument doc : request.getDocuments()) {
+            assertNull("Doc still has version", doc.getField(VERSION_FIELD));
+        }
+
+        // Check versions in delete by id
+        for (Map<String, Object> idParams : request.getDeleteByIdMap().values()) {
+            if (idParams != null) {
+                // the processor should have stripped the version from the delete-by-id params
+                assertNull("Delete still has version", idParams.get(UpdateRequest.VER));
+            }
+        }
+    }
+
+    @Test
+    @Ignore // needs to be modified to fully support request.process
+    public void testSuccessNoBackoff() throws Exception {
+        final UpdateRequest request = spy(new UpdateRequest());
+
+        when(solrClient.request(eq(request), anyString())).thenReturn(new NamedList<>());
+
+        when(request.process(eq(solrClient))).thenReturn(new UpdateResponse());
+
+        processor.handleItem(new MirroredSolrRequest(request));
+
+        verify(backoffPolicy, times(0)).getBackoffTimeMs(any());
+    }
+
+    @Test
+    public void testClientErrorNoRetries() throws Exception {
+        final UpdateRequest request = new UpdateRequest();
+        request.setParam("shouldMirror", "true");
+        when(solrClient.request(eq(request), anyString())).thenThrow(
+                new SolrException(
+                        SolrException.ErrorCode.BAD_REQUEST, "err msg"));
+
+        IQueueHandler.Result<MirroredSolrRequest> result = processor.handleItem(new MirroredSolrRequest(request));
+        assertEquals(IQueueHandler.ResultStatus.FAILED_RESUBMIT, result.status());
+    }
+}
diff --git a/crossdc-consumer/src/test/resources/configs/cloud-minimal/conf/schema.xml b/crossdc-consumer/src/test/resources/configs/cloud-minimal/conf/schema.xml
new file mode 100644
index 0000000..bc4676c
--- /dev/null
+++ b/crossdc-consumer/src/test/resources/configs/cloud-minimal/conf/schema.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+    <fieldType name="boolean" class="solr.BoolField" sortMissingLast="true"/>
+    <fieldType name="string" class="solr.StrField" docValues="true"/>
+    <fieldType name="int" class="org.apache.solr.schema.IntPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="long" class="org.apache.solr.schema.LongPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="float" class="org.apache.solr.schema.FloatPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="double" class="org.apache.solr.schema.DoublePointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="date" class="org.apache.solr.schema.DatePointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="text" class="solr.TextField">
+        <analyzer>
+            <tokenizer class="solr.StandardTokenizerFactory"/>
+            <filter class="solr.LowerCaseFilterFactory"/>
+        </analyzer>
+    </fieldType>
+
+    <!-- for versioning -->
+    <field name="_version_" type="long" indexed="true" stored="true"/>
+    <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+    <field name="id" type="string" indexed="true" stored="true"/>
+    <field name="text" type="text" indexed="true" stored="false"/>
+
+    <dynamicField name="*_b" type="boolean" indexed="true" stored="true"/>
+    <dynamicField name="*_s" type="string" indexed="true" stored="false"/>
+    <dynamicField name="*_t" type="text" indexed="true" stored="false"/>
+    <dynamicField name="*_i" type="int" indexed="false" stored="false"/>
+    <dynamicField name="*_l" type="long" indexed="false" stored="false"/>
+    <dynamicField name="*_f" type="float" indexed="false" stored="false"/>
+    <dynamicField name="*_d" type="double" indexed="false" stored="false"/>
+    <dynamicField name="*_dt" type="date" indexed="false" stored="false"/>
+
+    <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/crossdc-consumer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml b/crossdc-consumer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
new file mode 100644
index 0000000..20caf3b
--- /dev/null
+++ b/crossdc-consumer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
@@ -0,0 +1,112 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <indexConfig>
+    <mergePolicyFactory class="${mergePolicyFactory:org.apache.solr.index.TieredMergePolicyFactory}">
+      <int name="maxMergeAtOnce">${maxMergeAtOnce:10}</int>
+      <int name="segmentsPerTier">${segmentsPerTier:10}</int>
+      <double name="noCFSRatio">${noCFSRatio:.1}</double>
+    </mergePolicyFactory>
+
+    <useCompoundFile>${useCompoundFile:true}</useCompoundFile>
+
+    <ramBufferSizeMB>${ramBufferSizeMB:160}</ramBufferSizeMB>
+    <maxBufferedDocs>${maxBufferedDocs:250000}</maxBufferedDocs>     <!-- Force the common case to flush by doc count  -->
+    <!-- <ramPerThreadHardLimitMB>60</ramPerThreadHardLimitMB> -->
+
+    <!-- <mergeScheduler class="org.apache.lucene.index.ConcurrentMergeScheduler">
+      <int name="maxThreadCount">6</int>
+      <int name="maxMergeCount">8</int>
+      <bool name="ioThrottle">false</bool>
+    </mergeScheduler> -->
+
+    <writeLockTimeout>1000</writeLockTimeout>
+    <commitLockTimeout>10000</commitLockTimeout>
+
+    <!-- this sys property is not set by SolrTestCaseJ4 because almost all tests should
+         use the single process lockType for speed - but tests that explicitly need
+         to vary the lockType can set it as needed.
+    -->
+    <lockType>${lockType:single}</lockType>
+
+    <infoStream>${infostream:false}</infoStream>
+
+  </indexConfig>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <autoCommit>
+      <maxTime>${autoCommit.maxTime:60000}</maxTime>
+    </autoCommit>
+    <updateLog class="${ulog:solr.UpdateLog}" enable="${enable.update.log:true}"/>
+  </updateHandler>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+
+  </requestHandler>
+
+  <query>
+    <queryResultCache
+            enabled="${queryResultCache.enabled:false}"
+            class="${queryResultCache.class:solr.CaffeineCache}"
+            size="${queryResultCache.size:0}"
+            initialSize="${queryResultCache.initialSize:0}"
+            autowarmCount="${queryResultCache.autowarmCount:0}"/>
+      <documentCache
+              enabled="${documentCache.enabled:false}"
+              class="${documentCache.class:solr.CaffeineCache}"
+              size="${documentCache.size:0}"
+              initialSize="${documentCache.initialSize:0}"
+              autowarmCount="${documentCache.autowarmCount:0}"/>
+      <filterCache
+              enabled ="${filterCache.enabled:false}"
+              class="${filterCache.class:solr.CaffeineCache}"
+              size="${filterCache.size:1}"
+              initialSize="${filterCache.initialSize:1}"
+              autowarmCount="${filterCache.autowarmCount:0}"
+              async="${filterCache.async:false}"/>
+    <cache name="myPerSegmentCache"
+           enabled="${myPerSegmentCache.enabled:false}"
+           class="${myPerSegmentCache.class:solr.CaffeineCache}"
+           size="${myPerSegmentCache.size:0}"
+           initialSize="${myPerSegmentCache.initialSize:0}"
+           autowarmCount="${myPerSegmentCache.autowarmCount:0}"/>
+  </query>
+
+</config>
+
diff --git a/crossdc-consumer/src/test/resources/log4j2.xml b/crossdc-consumer/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..c63e736
--- /dev/null
+++ b/crossdc-consumer/src/test/resources/log4j2.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!-- We're configuring testing to be synchronous due to "logging pollution", see SOLR-13268 -->
+<Configuration>
+  <Appenders>
+    <Console name="STDERR" target="SYSTEM_ERR">
+      <PatternLayout>
+        <Pattern>
+          %maxLen{%-4r %-5p (%t) [%notEmpty{n:%X{node_name}}%notEmpty{ c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}%notEmpty{ t:%X{trace_id}}] %c{1.} %m%notEmpty{
+          =>%ex{short}}}{10240}%n
+        </Pattern>
+      </PatternLayout>
+    </Console>
+
+    <RollingRandomAccessFile
+            name="MainLogFile"
+            fileName="${sys:log.dir:-logs}/${sys:log.name:-solr}.log"
+            filePattern="${sys:log.dir:-logs}/${sys:log.name:-solr}.log.%i">
+      <PatternLayout>
+        <Pattern>
+          %maxLen{%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p (%t) [%notEmpty{c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}] %c{1.}
+          %m%notEmpty{ =>%ex{short}}}{10240}%n
+        </Pattern>
+      </PatternLayout>
+      <Policies>
+        <OnStartupTriggeringPolicy/>
+        <SizeBasedTriggeringPolicy size="128 MB"/>
+      </Policies>
+      <DefaultRolloverStrategy max="10"/>
+    </RollingRandomAccessFile>
+  </Appenders>
+  <Loggers>
+
+
+    <Logger name="kafka.server.KafkaConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.producer.ProducerConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="WARN"/>
+
+
+    <Logger name="org.apache.zookeeper" level="WARN"/>
+    <Logger name="org.apache.hadoop" level="WARN"/>
+    <Logger name="org.apache.directory" level="WARN"/>
+    <Logger name="org.apache.solr.hadoop" level="INFO"/>
+    <Logger name="org.eclipse.jetty" level="INFO"/>
+
+    <Root level="INFO">
+      <AppenderRef ref="MainLogFile"/>
+      <AppenderRef ref="STDERR"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/crossdc-producer/build.gradle b/crossdc-producer/build.gradle
new file mode 100644
index 0000000..4a989e4
--- /dev/null
+++ b/crossdc-producer/build.gradle
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+plugins {
+    id 'java'
+    id 'com.github.johnrengelman.shadow' version '7.1.2'
+}
+
+description = 'Cross-DC Producer package'
+
+repositories {
+    mavenCentral()
+}
+
+configurations {
+    provided
+}
+
+sourceSets {
+    main { compileClasspath += configurations.provided }
+}
+
+dependencies {
+    implementation project(':crossdc-consumer')
+    implementation project(path: ':crossdc-commons', configuration: 'shadow')
+
+    provided  group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
+
+    testImplementation 'org.slf4j:slf4j-api'
+    testImplementation 'org.hamcrest:hamcrest:2.2'
+    testImplementation 'junit:junit:4.13.2'
+    testImplementation('org.mockito:mockito-core:4.3.1', {
+        exclude group: "net.bytebuddy", module: "byte-buddy-agent"
+    })
+    testImplementation group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
+    testImplementation group: 'org.apache.solr', name: 'solr-test-framework', version: '8.11.2'
+
+    testImplementation 'org.apache.kafka:kafka_2.13:2.8.1'
+    testImplementation 'org.apache.kafka:kafka-streams:2.8.1'
+    testImplementation 'org.apache.kafka:kafka_2.13:2.8.1:test'
+    testImplementation 'org.apache.kafka:kafka-streams:2.8.1:test'
+
+    testImplementation 'org.apache.kafka:kafka-clients:2.8.1:test'
+}
+
+jar.enabled = false
+
+shadowJar {
+    archiveBaseName.set('crossdc-producer')
+    configurations = [project.configurations.compileClasspath]
+}
+
+jar.dependsOn(shadowJar)
+
+artifacts {
+    shadowJar;
+}
+
+test {
+    jvmArgs '-Djava.security.egd=file:/dev/./urandom'
+}
diff --git a/crossdc-producer/gradle.properties b/crossdc-producer/gradle.properties
new file mode 100644
index 0000000..0df7afe
--- /dev/null
+++ b/crossdc-producer/gradle.properties
@@ -0,0 +1,2 @@
+group=org.apache.solr
+version=0.1-SNAPSHOT
\ No newline at end of file
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
new file mode 100644
index 0000000..df26c43
--- /dev/null
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.crossdc.common.KafkaMirroringSink;
+import org.apache.solr.crossdc.common.MirroringException;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+
+public class KafkaRequestMirroringHandler implements RequestMirroringHandler {
+
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    final KafkaMirroringSink sink;
+
+    public KafkaRequestMirroringHandler(KafkaMirroringSink sink) {
+        log.debug("create KafkaRequestMirroringHandler");
+        this.sink = sink;
+    }
+
+    /**
+     * Submits the given update request to the mirroring queue (Kafka).
+     *
+     * @param request the update request to mirror
+     */
+    @Override
+    public void mirror(UpdateRequest request) throws MirroringException {
+        if (log.isTraceEnabled()) {
+            log.trace("submit update to sink docs={}, deletes={}, params={}", request.getDocuments(), request.getDeleteById(), request.getParams());
+        }
+        // TODO: Enforce external version constraint for consistent update replication (cross-cluster)
+        sink.submit(new MirroredSolrRequest(1, request, TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis())));
+    }
+}
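
A rough usage sketch, not part of this commit (the broker address, topic, and document values are
placeholders, and the calling code would need to handle MirroringException):

    Map<String, Object> props = new HashMap<>();
    props.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
    props.put(KafkaCrossDcConf.TOPIC_NAME, "solr-updates");

    KafkaMirroringSink sink = new KafkaMirroringSink(new KafkaCrossDcConf(props));
    KafkaRequestMirroringHandler handler = new KafkaRequestMirroringHandler(sink);

    UpdateRequest update = new UpdateRequest();
    SolrInputDocument doc = new SolrInputDocument();
    doc.setField("id", "1");
    update.add(doc);
    handler.mirror(update); // wrapped in a MirroredSolrRequest and submitted to the Kafka topic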
diff --git a/crossdc-consumer/build.gradle b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringException.java
similarity index 63%
copy from crossdc-consumer/build.gradle
copy to crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringException.java
index 8c44542..6f00df9 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringException.java
@@ -14,17 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-plugins {
-    id 'java'
-}
+package org.apache.solr.update.processor;
 
-description = 'Cross-DC Consumer package'
+/**
+ * Wrapper class for Mirroring exceptions.
+ */
+public class MirroringException extends Exception {
+    public MirroringException() {
+        super();
+    }
 
-version '1.0-SNAPSHOT'
+    public MirroringException(String message) {
+        super(message);
+    }
 
-repositories {
-    mavenCentral()
-}
+    public MirroringException(String message, Throwable cause) {
+        super(message, cause);
+    }
 
-dependencies {
+    public MirroringException(Throwable cause) {
+        super(cause);
+    }
 }
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
new file mode 100644
index 0000000..74ee1eb
--- /dev/null
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -0,0 +1,372 @@
+package org.apache.solr.update.processor;
+
+import org.apache.http.client.HttpClient;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.apache.solr.common.cloud.*;
+import org.apache.solr.common.params.*;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.RollbackUpdateCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
+
+public class MirroringUpdateProcessor extends UpdateRequestProcessor {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Flag indicating whether this instance creates and submits a mirrored request. This override is
+   * necessary to prevent circular mirroring between coupled clusters running this processor.
+   */
+  private final boolean doMirroring;
+  private final RequestMirroringHandler requestMirroringHandler;
+
+  /**
+   * The mirrored request starts as null, gets created and appended to at each process() call,
+   * then submitted on finish().
+   */
+  private UpdateRequest mirrorRequest;
+  private long mirrorRequestBytes;
+  private final SolrParams mirrorParams;
+
+
+  /**
+   * Controls whether docs that exceed the max mirroring size (and thus cannot be mirrored) are still indexed locally.
+   */
+  private final boolean indexUnmirrorableDocs;
+  private final long maxMirroringBatchSizeBytes;
+
+
+  /**
+   * The distributed processor downstream from us so we can establish if we're running on a leader shard
+   */
+  //private DistributedUpdateProcessor distProc;
+
+  /**
+   * Distribution phase of the incoming requests
+   */
+  private DistributedUpdateProcessor.DistribPhase distribPhase;
+
+  public MirroringUpdateProcessor(final UpdateRequestProcessor next, boolean doMirroring,
+      final boolean indexUnmirrorableDocs,
+      final long maxMirroringBatchSizeBytes,
+      final SolrParams mirroredReqParams,
+      final DistributedUpdateProcessor.DistribPhase distribPhase,
+      final RequestMirroringHandler requestMirroringHandler) {
+    super(next);
+    this.doMirroring = doMirroring;
+    this.indexUnmirrorableDocs = indexUnmirrorableDocs;
+    this.maxMirroringBatchSizeBytes = maxMirroringBatchSizeBytes;
+    this.mirrorParams = mirroredReqParams;
+    this.distribPhase = distribPhase;
+    this.requestMirroringHandler = requestMirroringHandler;
+
+    // Find the downstream distributed update processor
+
+  }
+
+  private UpdateRequest createAndOrGetMirrorRequest() {
+    if (mirrorRequest == null) {
+      mirrorRequest = new UpdateRequest();
+      mirrorRequest.setParams(new ModifiableSolrParams(mirrorParams));
+      mirrorRequestBytes = 0L;
+    }
+    if (log.isDebugEnabled())
+      log.debug("createOrGetMirrorRequest={}",
+          mirrorRequest);
+    return mirrorRequest;
+  }
+
+  @Override public void processAdd(final AddUpdateCommand cmd) throws IOException {
+
+    final SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
+    doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
+    final long estimatedDocSizeInBytes = ObjectSizeEstimator.estimate(doc);
+    final boolean tooLargeForKafka = estimatedDocSizeInBytes > maxMirroringBatchSizeBytes;
+    if (tooLargeForKafka && !indexUnmirrorableDocs) {
+      log.warn("Skipping indexing of doc {} as it exceeds the doc-size limit ({} bytes) and is unmirrorable.", cmd.getPrintableId(), maxMirroringBatchSizeBytes);
+    } else {
+      super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
+    }
+
+    // submit only from the leader shards so we mirror each doc once
+    boolean isLeader = isLeader(cmd.getReq(),  cmd.getIndexedIdStr(), null, cmd.getSolrInputDocument());
+    if (doMirroring && isLeader) {
+      if (tooLargeForKafka) {
+        log.error("Skipping mirroring of doc {} because estimated size exceeds batch size limit {} bytes", cmd.getPrintableId(), maxMirroringBatchSizeBytes);
+      } else {
+        createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, cmd.overwrite);
+        mirrorRequestBytes += estimatedDocSizeInBytes;
+      }
+    }
+
+    if (log.isDebugEnabled())
+      log.debug("processAdd isLeader={} cmd={}", isLeader, cmd);
+  }
+
+  @Override public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
+    if (doMirroring && !cmd.isDeleteById() && !"*:*".equals(cmd.query)) {
+
+      CloudDescriptor cloudDesc =
+          cmd.getReq().getCore().getCoreDescriptor().getCloudDescriptor();
+      String collection = cloudDesc.getCollectionName();
+
+      HttpClient httpClient = cmd.getReq().getCore().getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
+
+      try (HttpSolrClient client =
+          new HttpSolrClient.Builder(cmd.getReq().getCore().getCoreContainer().getZkController().getBaseUrl()).withHttpClient(httpClient).build()) {
+
+        String uniqueField = cmd.getReq().getSchema().getUniqueKeyField().getName();
+
+        int rows = Integer.getInteger("solr.crossdc.dbq_rows", 1000);
+        SolrQuery q = new SolrQuery(cmd.query).setRows(rows).setSort(SolrQuery.SortClause.asc(uniqueField)).setFields(uniqueField);
+        String cursorMark = CursorMarkParams.CURSOR_MARK_START;
+
+        int cnt = 1;
+        boolean done = false;
+        while (!done) {
+          q.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
+          QueryResponse rsp =
+              client.query(collection, q);
+          String nextCursorMark = rsp.getNextCursorMark();
+
+          if (log.isDebugEnabled()) {
+            log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark, nextCursorMark, cnt,
+                rsp.getResults());
+            cnt++;
+          }
+
+          processDBQResults(client, collection, uniqueField, rsp);
+          if (cursorMark.equals(nextCursorMark)) {
+            done = true;
+          }
+          cursorMark = nextCursorMark;
+        }
+      } catch (SolrServerException e) {
+        throw new SolrException(SERVER_ERROR, e);
+      }
+
+      return;
+    }
+    super.processDelete(cmd); // let this throw to prevent mirroring invalid requests
+
+    if (doMirroring) {
+      boolean isLeader = false;
+      if (cmd.isDeleteById()) {
+        // deleteById requests run once per leader, so we just submit the request from the leader shard
+        isLeader = isLeader(cmd.getReq(),  ((DeleteUpdateCommand)cmd).getId(), null != cmd.getRoute() ? cmd.getRoute() : cmd.getReq().getParams().get(
+            ShardParams._ROUTE_), null);
+        if (isLeader) {
+          createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip versions from deletes
+        }
+        if (log.isDebugEnabled())
+          log.debug("processDelete doMirroring={} isLeader={} cmd={}", true, isLeader, cmd);
+      } else {
+        // DBQs are sent to each shard leader, so we mirror from the original node to only mirror once
+        // In general there's no way to guarantee that these run identically on the mirror since there are no
+        // external doc versions.
+        // TODO: Can we actually support this considering DBQs aren't versioned.
+
+        if (distribPhase == DistributedUpdateProcessor.DistribPhase.NONE) {
+          createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
+        }
+        if (log.isDebugEnabled())
+          log.debug("processDelete doMirroring={} cmd={}", true, cmd);
+      }
+
+    }
+  }
+
+  private static void processDBQResults(SolrClient client, String collection, String uniqueField,
+      QueryResponse rsp)
+      throws SolrServerException, IOException {
+    SolrDocumentList results = rsp.getResults();
+    List<String> ids = new ArrayList<>(results.size());
+    results.forEach(entries -> {
+      String id = entries.getFirstValue(uniqueField).toString();
+      ids.add(id);
+    });
+    if (ids.size() > 0) {
+      client.deleteById(collection, ids);
+    }
+  }
+
+  private boolean isLeader(SolrQueryRequest req, String id, String route, SolrInputDocument doc) {
+    CloudDescriptor cloudDesc =
+        req.getCore().getCoreDescriptor().getCloudDescriptor();
+    String collection = cloudDesc.getCollectionName();
+    ClusterState clusterState =
+        req.getCore().getCoreContainer().getZkController().getClusterState();
+    DocCollection coll = clusterState.getCollection(collection);
+    Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
+
+    if (slice == null) {
+      // No slice found.  Most strict routers will have already thrown an exception, so a null return is
+      // a signal to use the slice of this core.
+      // TODO: what if this core is not in the targeted collection?
+      String shardId = cloudDesc.getShardId();
+      slice = coll.getSlice(shardId);
+      if (slice == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard " + shardId + " in " + coll);
+      }
+    }
+    String shardId = slice.getName();
+    Replica leaderReplica = null;
+    try {
+      leaderReplica = req.getCore().getCoreContainer().getZkController().getZkStateReader().getLeaderRetry(collection, shardId);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+    }
+    return leaderReplica.getName().equals(cloudDesc.getCoreNodeName());
+  }
+
+  @Override public void processRollback(final RollbackUpdateCommand cmd) throws IOException {
+    super.processRollback(cmd);
+    // TODO: We can't/shouldn't support this ?
+  }
+
+  public void processCommit(CommitUpdateCommand cmd) throws IOException {
+    log.debug("process commit cmd={}", cmd);
+    if (next != null) next.processCommit(cmd);
+  }
+
+  @Override public final void finish() throws IOException {
+    super.finish();
+
+    if (doMirroring && mirrorRequest != null) {
+      // We are configured to mirror, but short-circuit on batches we already know will fail (because they cumulatively
+      // exceed the mirroring max-size)
+      if (mirrorRequestBytes > maxMirroringBatchSizeBytes) {
+        final String batchedIds = mirrorRequest.getDocuments().stream()
+                .map(doc -> doc.getField("id").getValue().toString())
+                .collect(Collectors.joining(", "));
+        log.warn("Mirroring skipped for request because batch size {} bytes exceeds limit {} bytes.  IDs: {}",
+                mirrorRequestBytes, maxMirroringBatchSizeBytes, batchedIds);
+        mirrorRequest = null;
+        mirrorRequestBytes = 0L;
+        return;
+      }
+
+      try {
+        requestMirroringHandler.mirror(mirrorRequest);
+        mirrorRequest = null; // so we don't accidentally submit it again
+      } catch (Exception e) {
+        log.error("mirror submit failed", e);
+        throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
+      }
+    }
+  }
+
+  // package private for testing
+  static class ObjectSizeEstimator {
+    /**
+     * Sizes of primitive classes.
+     */
+    private static final Map<Class<?>,Integer> primitiveSizes = new IdentityHashMap<>();
+    static {
+      primitiveSizes.put(boolean.class, 1);
+      primitiveSizes.put(Boolean.class, 1);
+      primitiveSizes.put(byte.class, 1);
+      primitiveSizes.put(Byte.class, 1);
+      primitiveSizes.put(char.class, Character.BYTES);
+      primitiveSizes.put(Character.class, Character.BYTES);
+      primitiveSizes.put(short.class, Short.BYTES);
+      primitiveSizes.put(Short.class, Short.BYTES);
+      primitiveSizes.put(int.class, Integer.BYTES);
+      primitiveSizes.put(Integer.class, Integer.BYTES);
+      primitiveSizes.put(float.class, Float.BYTES);
+      primitiveSizes.put(Float.class, Float.BYTES);
+      primitiveSizes.put(double.class, Double.BYTES);
+      primitiveSizes.put(Double.class, Double.BYTES);
+      primitiveSizes.put(long.class, Long.BYTES);
+      primitiveSizes.put(Long.class, Long.BYTES);
+    }
+
+    public static long estimate(SolrInputDocument doc) {
+      if (doc == null) return 0L;
+      long size = 0;
+      for (SolrInputField inputField : doc.values()) {
+        size += primitiveEstimate(inputField.getName(), 0L);
+        size += estimate(inputField.getValue());
+      }
+
+      if (doc.hasChildDocuments()) {
+        for (SolrInputDocument childDoc : doc.getChildDocuments()) {
+          size += estimate(childDoc);
+        }
+      }
+      return size;
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    static long estimate(Object obj) {
+      if (obj instanceof SolrInputDocument) {
+        return estimate((SolrInputDocument) obj);
+      }
+
+      if (obj instanceof Map) {
+        return estimate((Map) obj);
+      }
+
+      if (obj instanceof Collection) {
+        return estimate((Collection) obj);
+      }
+
+      return primitiveEstimate(obj, 0L);
+    }
+
+    private static long primitiveEstimate(Object obj, long def) {
+      Class<?> clazz = obj.getClass();
+      // getClass() never returns a primitive type for a boxed value, so look the class up in the size table
+      if (primitiveSizes.containsKey(clazz)) {
+        return primitiveSizes.get(clazz);
+      }
+      if (obj instanceof String) {
+        return ((String) obj).length() * Character.BYTES;
+      }
+      return def;
+    }
+
+    private static long estimate(Map<Object, Object> map) {
+      if (map.isEmpty()) return 0;
+      long size = 0;
+      for (Map.Entry<Object, Object> entry : map.entrySet()) {
+        size += primitiveEstimate(entry.getKey(), 0L);
+        size += estimate(entry.getValue());
+      }
+      return size;
+    }
+
+    private static long estimate(@SuppressWarnings({"rawtypes"})Collection collection) {
+      if (collection.isEmpty()) return 0;
+      long size = 0;
+      for (Object obj : collection) {
+        size += estimate(obj);
+      }
+      return size;
+    }
+  }
+}
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
new file mode 100644
index 0000000..06f7dba
--- /dev/null
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.update.processor;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.crossdc.common.ConfigProperty;
+import org.apache.solr.crossdc.common.CrossDcConf;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.KafkaMirroringSink;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.util.plugin.SolrCoreAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.*;
+
+import static org.apache.solr.update.processor.DistributedUpdateProcessor.*;
+import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.*;
+
+/**
+ * An update processor that works with the {@link UpdateRequestProcessorFactory} to mirror update requests by
+ * submitting them to a sink that implements a queue producer.
+ *
+ * ADDs and DeleteByIDs are mirrored from leader shards and have internal _version_ fields stripped;
+ * deleteByQuery requests are mirrored once, from the node that received the original request.
+ *
+ * Configuration is read from this factory's init args (for example <b>enabled</b>, <b>indexUnmirrorableDocs</b>,
+ * <b>bootstrapServers</b> and <b>topicName</b>) and, when present, from the CrossDC properties stored in ZooKeeper.
+ * Mirrored requests are submitted to Kafka through a {@link KafkaRequestMirroringHandler}, which implements
+ * {@link RequestMirroringHandler}.
+ *
+ * It is recommended to use the {@link DocBasedVersionConstraintsProcessorFactory} upstream of this factory to ensure
+ * doc consistency between this cluster and the mirror(s).
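+ *
+ * <p>A minimal, hypothetical solrconfig.xml chain using this factory (names and values below are
+ * placeholders, not taken from this commit) might look like:
+ * <pre>{@code
+ * <updateRequestProcessorChain name="mirrorUpdateChain" default="true">
+ *   <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+ *     <bool name="enabled">true</bool>
+ *     <str name="bootstrapServers">localhost:9092</str>
+ *     <str name="topicName">collection1Updates</str>
+ *   </processor>
+ *   <processor class="solr.LogUpdateProcessorFactory"/>
+ *   <processor class="solr.RunUpdateProcessorFactory"/>
+ * </updateRequestProcessorChain>
+ * }</pre>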
+ */
+public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcessorFactory
+        implements SolrCoreAware, UpdateRequestProcessorFactory.RunAlways {
+
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    public static final NoOpUpdateRequestProcessor NO_OP_UPDATE_REQUEST_PROCESSOR =
+        new NoOpUpdateRequestProcessor();
+
+    // Flag for mirroring requests
+    public static final String SERVER_SHOULD_MIRROR = "shouldMirror";
+
+    /** This is instantiated in inform(SolrCore) and then shared by all processor instances - visible for testing */
+    private volatile KafkaRequestMirroringHandler mirroringHandler;
+
+
+    private boolean enabled = true;
+    private boolean indexUnmirrorableDocs = false;
+    private KafkaCrossDcConf conf;
+
+    private final Map<String,Object> properties = new HashMap<>();
+
+    @Override
+    public void init(final NamedList args) {
+        super.init(args);
+
+        Boolean enabled = args.getBooleanArg("enabled");
+        if (enabled != null && !enabled) {
+            this.enabled = false;
+        }
+
+        final Boolean indexUnmirrorableDocsArg = args.getBooleanArg("indexUnmirrorableDocs");
+        if (indexUnmirrorableDocsArg != null && indexUnmirrorableDocsArg) {
+            this.indexUnmirrorableDocs = true;
+        }
+
+        for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
+            String val = args._getStr(configKey.getKey(), null);
+            if (val != null && !val.isBlank()) {
+                properties.put(configKey.getKey(), val);
+            }
+        }
+    }
+
+    private static class MyCloseHook extends CloseHook {
+        private final Closer closer;
+
+        public MyCloseHook(Closer closer) {
+            this.closer = closer;
+        }
+
+        @Override public void preClose(SolrCore core) {
+
+        }
+
+        @Override public void postClose(SolrCore core) {
+            closer.close();
+        }
+    }
+
+    private static class Closer {
+        private final KafkaMirroringSink sink;
+
+        public Closer(KafkaMirroringSink sink) {
+            this.sink = sink;
+        }
+
+        public final void close() {
+            try {
+                this.sink.close();
+            } catch (IOException e) {
+                log.error("Exception closing sink", e);
+            }
+        }
+
+    }
+
+    @Override
+    public void inform(SolrCore core) {
+        log.info("KafkaRequestMirroringHandler inform enabled={}", this.enabled);
+
+        if (!enabled) {
+            return;
+        }
+
+        log.info("Producer startup config properties before adding additional properties from Zookeeper={}", properties);
+
+        Properties zkProps = null;
+        try {
+            if (core.getCoreContainer().getZkController()
+                .getZkClient().exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
+                    CrossDcConf.CROSSDC_PROPERTIES), true)) {
+                byte[] data = core.getCoreContainer().getZkController().getZkClient().getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
+                    CrossDcConf.CROSSDC_PROPERTIES), null, null, true);
+
+                if (data == null) {
+                    log.error(CrossDcConf.CROSSDC_PROPERTIES + " file in Zookeeper has no data");
+                    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, CrossDcConf.CROSSDC_PROPERTIES
+                        + " file in Zookeeper has no data");
+                }
+
+                zkProps = new Properties();
+                zkProps.load(new ByteArrayInputStream(data));
+
+                KafkaCrossDcConf.readZkProps(properties, zkProps);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("Interrupted looking for CrossDC configuration in Zookeeper", e);
+            throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
+        } catch (Exception e) {
+            log.error("Exception looking for CrossDC configuration in Zookeeper", e);
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception looking for CrossDC configuration in Zookeeper", e);
+        }
+
+        if (properties.get(BOOTSTRAP_SERVERS) == null) {
+            log.error(
+                "bootstrapServers not specified for producer in CrossDC configuration props={} zkProps={}",
+                properties, zkProps);
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "bootstrapServers not specified for producer");
+        }
+        
+        if (properties.get(TOPIC_NAME) == null) {
+            log.error(
+                "topicName not specified for producer in CrossDC configuration props={} zkProps={}",
+                properties, zkProps);
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "topicName not specified for producer");
+        }
+
+        // load the request mirroring sink class and instantiate.
+       // mirroringHandler = core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(), KafkaRequestMirroringHandler.class);
+
+        conf = new KafkaCrossDcConf(properties);
+
+
+        KafkaMirroringSink sink = new KafkaMirroringSink(conf);
+
+        Closer closer = new Closer(sink);
+        core.addCloseHook(new MyCloseHook(closer));
+
+        mirroringHandler = new KafkaRequestMirroringHandler(sink);
+    }
+
+    private static Integer getIntegerPropValue(String name, Properties props) {
+        String value = props.getProperty(name);
+        if (value == null) {
+            return null;
+        }
+        return Integer.parseInt(value);
+    }
+
+    @Override
+    public UpdateRequestProcessor getInstance(final SolrQueryRequest req, final SolrQueryResponse rsp,
+                                                final UpdateRequestProcessor next) {
+
+        if (!enabled) {
+            return NO_OP_UPDATE_REQUEST_PROCESSOR;
+        }
+
+        // mirroringHandler is set in inform(SolrCore); it will still be null here if that initialization failed
+        if (mirroringHandler == null) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "mirroringHandler is null");
+        }
+
+        // Check if mirroring is disabled in request params, defaults to true
+        boolean doMirroring = req.getParams().getBool(SERVER_SHOULD_MIRROR, true);
+        final long maxMirroringBatchSizeBytes = conf.getInt(MAX_REQUEST_SIZE_BYTES);
+
+        ModifiableSolrParams mirroredParams = null;
+        if (doMirroring) {
+            // Get the collection name for the core so we can be explicit in the mirrored request
+            CoreDescriptor coreDesc = req.getCore().getCoreDescriptor();
+            String collection = coreDesc.getCollectionName();
+            if (collection == null) {
+                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not determine collection name for "
+                        + MirroringUpdateProcessor.class.getSimpleName() + ". Solr may not be running in cloud mode.");
+            }
+
+            mirroredParams = new ModifiableSolrParams(req.getParams());
+            mirroredParams.set("collection", collection);
+            // remove internal version parameter
+            mirroredParams.remove(CommonParams.VERSION_FIELD);
+            // remove fields added by distributed update proc
+            mirroredParams.remove(DISTRIB_UPDATE_PARAM);
+            mirroredParams.remove(DISTRIB_FROM_COLLECTION);
+            mirroredParams.remove(DISTRIB_INPLACE_PREVVERSION);
+            mirroredParams.remove(COMMIT_END_POINT);
+            mirroredParams.remove(DISTRIB_FROM_SHARD);
+            mirroredParams.remove(DISTRIB_FROM_PARENT);
+            mirroredParams.remove(DISTRIB_FROM);
+            // prevent circular mirroring
+            mirroredParams.set(SERVER_SHOULD_MIRROR, Boolean.FALSE.toString());
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("Create MirroringUpdateProcessor with mirroredParams={}", mirroredParams);
+        }
+
+        return new MirroringUpdateProcessor(next, doMirroring, indexUnmirrorableDocs, maxMirroringBatchSizeBytes, mirroredParams,
+                DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null);
+    }
+
+    private static class NoOpUpdateRequestProcessor extends UpdateRequestProcessor {
+        NoOpUpdateRequestProcessor() {
+            super(null);
+        }
+    }
+
+
+}
\ No newline at end of file
diff --git a/crossdc-consumer/settings.gradle b/crossdc-producer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
similarity index 67%
rename from crossdc-consumer/settings.gradle
rename to crossdc-producer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
index 8c7c712..16ca862 100644
--- a/crossdc-consumer/settings.gradle
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/RequestMirroringHandler.java
@@ -14,11 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.solr.update.processor;
 
-rootProject.name = 'crossdc-consumer'
+import org.apache.solr.client.solrj.request.UpdateRequest;
 
-description = 'Module for Apache Solr Cross DC Consumer'
-
-subprojects {
-    group "org.apache.solr.crossdc"
+/** Plugin classes must implement this interface to be usable as handlers for request mirroring. */
+public interface RequestMirroringHandler {
+    /** Implementations should submit the given request to the replica cluster(s). */
+    void mirror(UpdateRequest request) throws Exception;
 }
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
new file mode 100644
index 0000000..cf25ebe
--- /dev/null
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
@@ -0,0 +1,197 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.consumer.Consumer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+@ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
+    QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
+@Ignore
+@ThreadLeakLingering(linger = 5000) public class DeleteByQueryToIdTest extends
+    SolrTestCaseJ4 {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  static final String VERSION_FIELD = "_version_";
+
+  private static final int NUM_BROKERS = 1;
+  public static EmbeddedKafkaCluster kafkaCluster;
+
+  protected static volatile MiniSolrCloudCluster solrCluster1;
+  protected static volatile MiniSolrCloudCluster solrCluster2;
+
+  protected static volatile Consumer consumer = new Consumer();
+
+  private static String TOPIC = "topic1";
+
+  private static String COLLECTION = "collection1";
+
+  @BeforeClass
+  public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
+
+    System.setProperty("solr.crossdc.dbq_rows", "1");
+
+    Properties config = new Properties();
+    config.put("unclean.leader.election.enable", "true");
+    config.put("enable.partition.eof", "false");
+
+    kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
+      public String bootstrapServers() {
+        return super.bootstrapServers().replaceAll("localhost", "127.0.0.1");
+      }
+    };
+    kafkaCluster.start();
+
+    kafkaCluster.createTopic(TOPIC, 1, 1);
+
+    // System.setProperty("topicName", null);
+    // System.setProperty("bootstrapServers", null);
+
+    Properties props = new Properties();
+
+    solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+    props.setProperty("topicName", TOPIC);
+    props.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    props.store(baos, "");
+    byte[] data = baos.toByteArray();
+    solrCluster1.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
+
+    CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster1.getSolrClient().request(create);
+    solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+
+    solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+
+    solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+    solrCluster2.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
+
+    CollectionAdminRequest.Create create2 =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster2.getSolrClient().request(create2);
+    solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
+
+    solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+
+    String bootstrapServers = kafkaCluster.bootstrapServers();
+    log.info("bootstrapServers={}", bootstrapServers);
+
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+    properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
+    properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+    properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+    consumer.start(properties);
+
+  }
+
+  @AfterClass
+  public static void afterSolrAndKafkaIntegrationTest() throws Exception {
+    ObjectReleaseTracker.clear();
+
+    consumer.shutdown();
+
+    try {
+      kafkaCluster.stop();
+    } catch (Exception e) {
+      log.error("Exception stopping Kafka cluster", e);
+    }
+
+    if (solrCluster1 != null) {
+      solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster1.shutdown();
+    }
+    if (solrCluster2 != null) {
+      solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster2.shutdown();
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+    solrCluster1.getSolrClient().deleteByQuery("*:*");
+    solrCluster2.getSolrClient().deleteByQuery("*:*");
+    solrCluster1.getSolrClient().commit();
+    solrCluster2.getSolrClient().commit();
+  }
+
+  public void testDBQ() throws Exception {
+
+    CloudSolrClient client = solrCluster1.getSolrClient();
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", String.valueOf(System.nanoTime()));
+    doc.addField("text", "some test");
+    client.add(doc);
+
+    SolrInputDocument doc2 = new SolrInputDocument();
+    doc2.addField("id", String.valueOf(System.nanoTime()));
+    doc2.addField("text", "some test two");
+    client.add(doc2);
+
+    SolrInputDocument doc3 = new SolrInputDocument();
+    doc3.addField("id", String.valueOf(System.nanoTime()));
+    doc3.addField("text", "two of a kind");
+    client.add(doc3);
+
+    SolrInputDocument doc4 = new SolrInputDocument();
+    doc4.addField("id", String.valueOf(System.nanoTime()));
+    doc4.addField("text", "one two three");
+    client.add(doc4);
+
+    client.commit(COLLECTION);
+
+    client.deleteByQuery("two");
+
+    client.commit(COLLECTION);
+
+    QueryResponse results = null;
+    boolean foundUpdates = false;
+    for (int i = 0; i < 50; i++) {
+      solrCluster2.getSolrClient().commit(COLLECTION);
+      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      if (results.getResults().getNumFound() == 1) {
+        foundUpdates = true;
+        break;
+      } else {
+        Thread.sleep(500);
+      }
+    }
+
+    assertTrue("results=" + results, foundUpdates);
+    System.out.println("Rest: " + results);
+
+  }
+
+}
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
new file mode 100644
index 0000000..8cbdae2
--- /dev/null
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
@@ -0,0 +1,228 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.embedded.JettyConfig;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cloud.ZkTestServer;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.consumer.Consumer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+@ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
+    QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
+@ThreadLeakLingering(linger = 5000) public class RetryQueueIntegrationTest extends
+    SolrTestCaseJ4 {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  static final String VERSION_FIELD = "_version_";
+
+  private static final int NUM_BROKERS = 1;
+  public static EmbeddedKafkaCluster kafkaCluster;
+
+  protected static volatile MiniSolrCloudCluster solrCluster1;
+  protected static volatile MiniSolrCloudCluster solrCluster2;
+
+  protected static volatile Consumer consumer = new Consumer();
+
+  private static String TOPIC = "topic1";
+
+  private static String COLLECTION = "collection1";
+  private static Path baseDir1;
+  private static Path baseDir2;
+  private static ZkTestServer zkTestServer1;
+  private static ZkTestServer zkTestServer2;
+
+  @BeforeClass
+  public static void beforeRetryQueueIntegrationTestTest() throws Exception {
+
+    Properties config = new Properties();
+    config.put("unclean.leader.election.enable", "true");
+    config.put("enable.partition.eof", "false");
+
+    kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
+      public String bootstrapServers() {
+        return super.bootstrapServers().replaceAll("localhost", "127.0.0.1");
+      }
+    };
+    kafkaCluster.start();
+
+    kafkaCluster.createTopic(TOPIC, 1, 1);
+
+    System.setProperty("topicName", TOPIC);
+    System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+
+    baseDir1 = createTempDir();
+    Path zkDir1 = baseDir1.resolve("zookeeper/server1/data");
+    zkTestServer1 = new ZkTestServer(zkDir1);
+    try {
+      zkTestServer1.run();
+    } catch (Exception e) {
+      log.error("Error starting Zk Test Server, trying again ...");
+      zkTestServer1.shutdown();
+      zkTestServer1 = new ZkTestServer(zkDir1);
+      zkTestServer1.run();
+    }
+
+    baseDir2 = createTempDir();
+    Path zkDir2 = baseDir2.resolve("zookeeper/server1/data");
+    zkTestServer2 = new ZkTestServer(zkDir2);
+    try {
+      zkTestServer2.run();
+    } catch (Exception e) {
+      log.error("Error starting Zk Test Server, trying again ...");
+      zkTestServer2.shutdown();
+      zkTestServer2 = new ZkTestServer(zkDir2);
+      zkTestServer2.run();
+    }
+
+    solrCluster1 = startCluster(solrCluster1, zkTestServer1, baseDir1);
+    solrCluster2 = startCluster(solrCluster2, zkTestServer2, baseDir2);
+
+    CloudSolrClient client = solrCluster1.getSolrClient();
+
+    String bootstrapServers = kafkaCluster.bootstrapServers();
+    log.info("bootstrapServers={}", bootstrapServers);
+
+    Map<String,Object> properties = new HashMap<>();
+    properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+    properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
+    properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+    properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+    consumer.start(properties);
+  }
+
+  private static MiniSolrCloudCluster startCluster(MiniSolrCloudCluster solrCluster, ZkTestServer zkTestServer, Path baseDir) throws Exception {
+    MiniSolrCloudCluster cluster =
+        new MiniSolrCloudCluster(1, baseDir, MiniSolrCloudCluster.DEFAULT_CLOUD_SOLR_XML,
+            JettyConfig.builder().setContext("/solr")
+                .withSSLConfig(sslConfig.buildServerSSLConfig()).build(), zkTestServer);
+
+        //new SolrCloudTestCase.Builder(1, baseDir).addConfig("conf",
+        //getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+    CloudSolrClient client = cluster.getSolrClient();
+    ((ZkClientClusterStateProvider)client.getClusterStateProvider()).uploadConfig(getFile("src/test/resources/configs/cloud-minimal/conf").toPath(), "conf");
+
+    CollectionAdminRequest.Create create2 =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    cluster.getSolrClient().request(create2);
+    cluster.waitForActiveCollection(COLLECTION, 1, 1);
+    cluster.getSolrClient().setDefaultCollection(COLLECTION);
+    return cluster;
+  }
+
+  @AfterClass
+  public static void afterRetryQueueIntegrationTest() throws Exception {
+    ObjectReleaseTracker.clear();
+
+    consumer.shutdown();
+
+    try {
+      kafkaCluster.stop();
+    } catch (Exception e) {
+      log.error("Exception stopping Kafka cluster", e);
+    }
+
+    if (solrCluster1 != null) {
+      solrCluster1.shutdown();
+    }
+    if (solrCluster2 != null) {
+      solrCluster2.shutdown();
+    }
+
+    if (zkTestServer1 != null) {
+      zkTestServer1.shutdown();
+    }
+    if (zkTestServer2 != null) {
+      zkTestServer2.shutdown();
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+    solrCluster1.getSolrClient().deleteByQuery("*:*");
+    solrCluster2.getSolrClient().deleteByQuery("*:*");
+    solrCluster1.getSolrClient().commit();
+    solrCluster2.getSolrClient().commit();
+  }
+
+  @Test
+  public void testRetryQueue() throws Exception {
+    Path zkDir = zkTestServer2.getZkDir();
+    int zkPort = zkTestServer2.getPort();
+    zkTestServer2.shutdown();
+
+    CloudSolrClient client = solrCluster1.getSolrClient();
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", String.valueOf(System.nanoTime()));
+    doc.addField("text", "some test");
+
+    client.add(doc);
+
+    SolrInputDocument doc2 = new SolrInputDocument();
+    doc2.addField("id", String.valueOf(System.nanoTime()));
+    doc2.addField("text", "some test");
+
+    client.add(doc2);
+
+    SolrInputDocument doc3 = new SolrInputDocument();
+    doc3.addField("id", String.valueOf(System.nanoTime()));
+    doc3.addField("text", "some test");
+
+    client.add(doc3);
+
+    client.commit(COLLECTION);
+
+    System.out.println("Sent producer record");
+
+    Thread.sleep(5000);
+
+    zkTestServer2 = new ZkTestServer(zkDir, zkPort);
+    zkTestServer2.run(false);
+
+    QueryResponse results = null;
+    boolean foundUpdates = false;
+    for (int i = 0; i < 200; i++) {
+      solrCluster2.getSolrClient().commit(COLLECTION);
+      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      if (results.getResults().getNumFound() == 3) {
+        foundUpdates = true;
+        break;
+      } else {
+        Thread.sleep(100);
+      }
+    }
+
+    System.out.println("Closed producer");
+
+    assertTrue("results=" + results, foundUpdates);
+    System.out.println("Rest: " + results);
+
+  }
+}
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
new file mode 100644
index 0000000..7cba50a
--- /dev/null
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -0,0 +1,298 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
+import org.apache.solr.crossdc.consumer.Consumer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
+import static org.mockito.Mockito.spy;
+
+@ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
+    QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
+@ThreadLeakLingering(linger = 5000) public class SolrAndKafkaIntegrationTest extends
+    SolrTestCaseJ4 {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final int MAX_MIRROR_BATCH_SIZE_BYTES = Integer.valueOf(DEFAULT_MAX_REQUEST_SIZE);
+  private static final int MAX_DOC_SIZE_BYTES = MAX_MIRROR_BATCH_SIZE_BYTES;
+
+  static final String VERSION_FIELD = "_version_";
+
+  private static final int NUM_BROKERS = 1;
+  public static EmbeddedKafkaCluster kafkaCluster;
+
+  protected static volatile MiniSolrCloudCluster solrCluster1;
+  protected static volatile MiniSolrCloudCluster solrCluster2;
+
+  protected static volatile Consumer consumer = new Consumer();
+
+  private static String TOPIC = "topic1";
+
+  private static String COLLECTION = "collection1";
+  private static String ALT_COLLECTION = "collection2";
+
+  @BeforeClass
+  public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
+
+    Properties config = new Properties();
+    config.put("unclean.leader.election.enable", "true");
+    config.put("enable.partition.eof", "false");
+
+    kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
+      public String bootstrapServers() {
+        return super.bootstrapServers().replaceAll("localhost", "127.0.0.1");
+      }
+    };
+    kafkaCluster.start();
+
+    kafkaCluster.createTopic(TOPIC, 1, 1);
+
+    System.setProperty("topicName", TOPIC);
+    System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+
+    solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+    CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster1.getSolrClient().request(create);
+    solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+
+    solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+
+    solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+    CollectionAdminRequest.Create create2 =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster2.getSolrClient().request(create2);
+    solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
+
+    solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+
+    String bootstrapServers = kafkaCluster.bootstrapServers();
+    log.info("bootstrapServers={}", bootstrapServers);
+
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+    properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
+    properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+    properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+    properties.put(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES, MAX_DOC_SIZE_BYTES);
+    consumer.start(properties);
+
+  }
+
+  @AfterClass
+  public static void afterSolrAndKafkaIntegrationTest() throws Exception {
+    ObjectReleaseTracker.clear();
+
+    consumer.shutdown();
+
+    try {
+      kafkaCluster.stop();
+    } catch (Exception e) {
+      log.error("Exception stopping Kafka cluster", e);
+    }
+
+    if (solrCluster1 != null) {
+      solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster1.shutdown();
+    }
+    if (solrCluster2 != null) {
+      solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster2.shutdown();
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+    solrCluster1.getSolrClient().deleteByQuery(COLLECTION, "*:*");
+    solrCluster2.getSolrClient().deleteByQuery(COLLECTION, "*:*");
+    solrCluster1.getSolrClient().commit(COLLECTION);
+    solrCluster2.getSolrClient().commit(COLLECTION);
+
+    // Delete alternate collection in case it was created by any tests.
+    if (CollectionAdminRequest.listCollections(solrCluster1.getSolrClient()).contains(ALT_COLLECTION)) {
+      solrCluster1.getSolrClient().request(CollectionAdminRequest.deleteCollection(ALT_COLLECTION));
+      solrCluster2.getSolrClient().request(CollectionAdminRequest.deleteCollection(ALT_COLLECTION));
+    }
+  }
+
+  public void testFullCloudToCloud() throws Exception {
+    CloudSolrClient client = solrCluster1.getSolrClient();
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", String.valueOf(System.currentTimeMillis()));
+    doc.addField("text", "some test");
+
+    client.add(doc);
+
+    client.commit(COLLECTION);
+
+    System.out.println("Sent producer record");
+
+    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
+  }
+
+  public void testProducerToCloud() throws Exception {
+    Properties properties = new Properties();
+    properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
+    properties.put("acks", "all");
+    properties.put("retries", 0);
+    properties.put("batch.size", 1);
+    properties.put("buffer.memory", 33554432);
+    properties.put("linger.ms", 1);
+    properties.put("key.serializer", StringSerializer.class.getName());
+    properties.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
+    Producer<String, MirroredSolrRequest> producer = new KafkaProducer<>(properties);
+    UpdateRequest updateRequest = new UpdateRequest();
+    updateRequest.setParam("shouldMirror", "true");
+    updateRequest.add("id", String.valueOf(System.currentTimeMillis()), "text", "test");
+    updateRequest.add("id", String.valueOf(System.currentTimeMillis() + 22), "text", "test2");
+    updateRequest.setParam("collection", COLLECTION);
+    MirroredSolrRequest mirroredSolrRequest = new MirroredSolrRequest(updateRequest);
+    System.out.println("About to send producer record");
+    producer.send(new ProducerRecord<>(TOPIC, mirroredSolrRequest), (metadata, exception) -> {
+      log.info("Producer finished sending metadata={}, exception={}", metadata, exception);
+    });
+    producer.flush();
+
+    System.out.println("Sent producer record");
+
+    solrCluster2.getSolrClient().commit(COLLECTION);
+
+    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 2);
+
+    producer.close();
+  }
+
+  @Test
+  public void testMirroringUpdateProcessor() throws Exception {
+    final SolrInputDocument tooLargeDoc = new SolrInputDocument();
+    tooLargeDoc.addField("id", "tooLarge-" + String.valueOf(System.currentTimeMillis()));
+    tooLargeDoc.addField("text", new String(new byte[2 * MAX_DOC_SIZE_BYTES]));
+    final SolrInputDocument normalDoc = new SolrInputDocument();
+    normalDoc.addField("id", "normalDoc-" + String.valueOf(System.currentTimeMillis()));
+    normalDoc.addField("text", "Hello world");
+    final List<SolrInputDocument> docsToIndex = new ArrayList<>();
+    docsToIndex.add(tooLargeDoc);
+    docsToIndex.add(normalDoc);
+
+    final CloudSolrClient cluster1Client = solrCluster1.getSolrClient();
+    cluster1Client.add(docsToIndex);
+    cluster1Client.commit(COLLECTION);
+
+    // Primary and secondary should each only index 'normalDoc'
+    final String normalDocQuery = "id:" + normalDoc.get("id").getFirstValue();
+    assertCluster2EventuallyHasDocs(COLLECTION, normalDocQuery, 1);
+    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
+    assertClusterEventuallyHasDocs(cluster1Client, COLLECTION, normalDocQuery, 1);
+    assertClusterEventuallyHasDocs(cluster1Client, COLLECTION, "*:*", 1);
+
+    // Create new primary+secondary collection where 'tooLarge' docs ARE indexed on the primary
+    CollectionAdminRequest.Create create =
+            CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1)
+                    .withProperty("indexUnmirrorableDocs", "true");
+    solrCluster1.getSolrClient().request(create);
+    solrCluster2.getSolrClient().request(create);
+    solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+    solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+    cluster1Client.add(ALT_COLLECTION, docsToIndex);
+    cluster1Client.commit(ALT_COLLECTION);
+
+    // Primary should have both 'normal' and 'large' docs; secondary should only have 'normal' doc.
+    assertClusterEventuallyHasDocs(cluster1Client, ALT_COLLECTION, "*:*", 2);
+    assertCluster2EventuallyHasDocs(ALT_COLLECTION, normalDocQuery, 1);
+    assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", 1);
+
+    // Index batch of docs that will exceed the max mirroring batch size cumulatively (but not individually)
+    // Batch consists of 100 docs each roughly 1/100th of the max-batch-size
+    docsToIndex.clear();
+    for (int i = 0; i < 100; i++) {
+      final SolrInputDocument doc = new SolrInputDocument();
+      doc.addField("id", "cumulativelyTooLarge-" + System.currentTimeMillis() + "-" + i);
+      doc.addField("cumulativelyTooLarge_b", "true");
+      doc.addField("text", new String(new byte[MAX_MIRROR_BATCH_SIZE_BYTES / 100]));
+      docsToIndex.add(doc);
+    }
+    cluster1Client.add(ALT_COLLECTION, docsToIndex);
+    cluster1Client.commit(ALT_COLLECTION);
+
+    final String cumulativelyTooLargeQuery = "cumulativelyTooLarge_b:true";
+    // Primary (but not secondary) should have 100 additional docs
+    assertClusterEventuallyHasDocs(cluster1Client, ALT_COLLECTION, cumulativelyTooLargeQuery, 100);
+    assertCluster2EventuallyHasDocs(ALT_COLLECTION, cumulativelyTooLargeQuery, 0);
+  }
+
+  private void assertCluster2EventuallyHasDocs(String collection, String query, int expectedNumDocs) throws Exception {
+    assertClusterEventuallyHasDocs(solrCluster2.getSolrClient(), collection, query, expectedNumDocs);
+  }
+
+  private void createCollection(CloudSolrClient client, CollectionAdminRequest.Create createCmd) throws Exception {
+    final String stashedDefault = client.getDefaultCollection();
+    try {
+      //client.setDefaultCollection(null);
+      client.request(createCmd);
+    } finally {
+      //client.setDefaultCollection(stashedDefault);
+    }
+  }
+
+  private void assertClusterEventuallyHasDocs(SolrClient client, String collection, String query, int expectedNumDocs) throws Exception {
+    QueryResponse results = null;
+    boolean foundUpdates = false;
+    for (int i = 0; i < 100; i++) {
+      client.commit(collection);
+      results = client.query(collection, new SolrQuery(query));
+      if (results.getResults().getNumFound() == expectedNumDocs) {
+        foundUpdates = true;
+        break;
+      } else {
+        Thread.sleep(100);
+      }
+    }
+
+    assertTrue("results=" + results, foundUpdates);
+  }
+}
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
new file mode 100644
index 0000000..e25ac83
--- /dev/null
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
@@ -0,0 +1,271 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.consumer.Consumer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.*;
+
+@ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
+    QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
+@ThreadLeakLingering(linger = 5000) public class SolrAndKafkaReindexTest extends
+    SolrTestCaseJ4 {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  static final String VERSION_FIELD = "_version_";
+
+  private static final int NUM_BROKERS = 1;
+  public static EmbeddedKafkaCluster kafkaCluster;
+
+  protected static volatile MiniSolrCloudCluster solrCluster1;
+  protected static volatile MiniSolrCloudCluster solrCluster2;
+
+  protected static volatile Consumer consumer = new Consumer();
+
+  private static String TOPIC = "topic1";
+
+  private static String COLLECTION = "collection1";
+
+  @BeforeClass
+  public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
+
+    Properties config = new Properties();
+    config.put("unclean.leader.election.enable", "true");
+    config.put("enable.partition.eof", "false");
+
+    kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
+      public String bootstrapServers() {
+        return super.bootstrapServers().replaceAll("localhost", "127.0.0.1");
+      }
+    };
+    kafkaCluster.start();
+
+    kafkaCluster.createTopic(TOPIC, 1, 1);
+
+    System.setProperty("topicName", TOPIC);
+    System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+
+    solrCluster1 = new SolrCloudTestCase.Builder(3, createTempDir()).addConfig("conf",
+        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+    CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 3, 2).setMaxShardsPerNode(10);;
+    solrCluster1.getSolrClient().request(create);
+    solrCluster1.waitForActiveCollection(COLLECTION, 3, 6);
+
+    solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+
+    solrCluster2 = new SolrCloudTestCase.Builder(3, createTempDir()).addConfig("conf",
+        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+    CollectionAdminRequest.Create create2 =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 2, 3).setMaxShardsPerNode(10);
+    solrCluster2.getSolrClient().request(create2);
+    solrCluster2.waitForActiveCollection(COLLECTION, 2, 6);
+
+    solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+
+    String bootstrapServers = kafkaCluster.bootstrapServers();
+    log.info("bootstrapServers={}", bootstrapServers);
+
+
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+    properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
+    properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+    properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+    properties.put(KafkaCrossDcConf.MAX_POLL_RECORDS, 3);
+    consumer.start(properties);
+
+  }
+
+  @AfterClass
+  public static void afterSolrAndKafkaIntegrationTest() throws Exception {
+    ObjectReleaseTracker.clear();
+
+    consumer.shutdown();
+
+    try {
+      if (kafkaCluster != null) {
+        kafkaCluster.stop();
+      }
+    } catch (Exception e) {
+      log.error("Exception stopping Kafka cluster", e);
+    }
+
+    if (solrCluster1 != null) {
+      solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster1.shutdown();
+    }
+    if (solrCluster2 != null) {
+      solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster2.shutdown();
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+    solrCluster1.getSolrClient().deleteByQuery("*:*");
+    solrCluster2.getSolrClient().deleteByQuery("*:*");
+    solrCluster1.getSolrClient().commit();
+    solrCluster2.getSolrClient().commit();
+  }
+
+  public void testFullCloudToCloud() throws Exception {
+    CloudSolrClient client = solrCluster1.getSolrClient();
+
+    addDocs(client, "first");
+
+    QueryResponse results = null;
+    boolean foundUpdates = false;
+    for (int i = 0; i < 1000; i++) {
+      solrCluster2.getSolrClient().commit(COLLECTION);
+      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      if (results.getResults().getNumFound() == 7) {
+        foundUpdates = true;
+        break;
+      } else {
+        Thread.sleep(100);
+      }
+    }
+
+    assertTrue("results=" + results, foundUpdates);
+
+    QueryResponse results1 = solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("first"));
+    QueryResponse results2 = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("first"));
+
+    assertEquals("results=" + results1, 7, results1.getResults().getNumFound());
+    assertEquals("results=" + results2, 7, results2.getResults().getNumFound());
+
+    addDocs(client, "second");
+
+    foundUpdates = false;
+    for (int i = 0; i < 1000; i++) {
+      solrCluster2.getSolrClient().commit(COLLECTION);
+      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      if (results.getResults().getNumFound() == 7) {
+        foundUpdates = true;
+        break;
+      } else {
+        Thread.sleep(100);
+      }
+    }
+
+    System.out.println("Closed producer");
+
+    assertTrue("results=" + results, foundUpdates);
+    System.out.println("Rest: " + results);
+
+    results1 = solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("second"));
+    results2 = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("second"));
+
+    assertEquals("results=" + results1, 7, results1.getResults().getNumFound());
+    assertEquals("results=" + results2, 7, results2.getResults().getNumFound());
+
+    addDocs(client, "third");
+
+    foundUpdates = false;
+    for (int i = 0; i < 1000; i++) {
+      solrCluster2.getSolrClient().commit(COLLECTION);
+      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      if (results.getResults().getNumFound() == 7) {
+        foundUpdates = true;
+        break;
+      } else {
+        Thread.sleep(100);
+      }
+    }
+
+    System.out.println("Closed producer");
+
+    assertTrue("results=" + results, foundUpdates);
+    System.out.println("Rest: " + results);
+
+    results1 = solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("third"));
+    results2 = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("third"));
+
+    assertEquals("results=" + results1, 7, results1.getResults().getNumFound());
+    assertEquals("results=" + results2, 7, results2.getResults().getNumFound());
+
+
+
+  }
+
+  private void addDocs(CloudSolrClient client, String tag) throws SolrServerException, IOException {
+    String id1 = "1";
+    String id2 = "2";
+    String id3 = "3";
+    String id4 = "4";
+    String id5 = "5";
+    String id6 = "6";
+    String id7 = "7";
+
+    SolrInputDocument doc1 = new SolrInputDocument();
+    doc1.addField("id", id1);
+    doc1.addField("text", "some test one " + tag);
+
+    SolrInputDocument doc2 = new SolrInputDocument();
+    doc2.addField("id", id2);
+    doc2.addField("text", "some test two " + tag);
+
+    List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(2);
+    docs.add(doc1);
+    docs.add(doc2);
+
+    client.add(docs);
+
+    client.commit(COLLECTION);
+
+    SolrInputDocument doc3 = new SolrInputDocument();
+    doc3.addField("id", id3);
+    doc3.addField("text", "some test three " + tag);
+
+    SolrInputDocument doc4 = new SolrInputDocument();
+    doc4.addField("id", id4);
+    doc4.addField("text", "some test four " + tag);
+
+    SolrInputDocument doc5 = new SolrInputDocument();
+    doc5.addField("id", id5);
+    doc5.addField("text", "some test five " + tag);
+
+    SolrInputDocument doc6 = new SolrInputDocument();
+    doc6.addField("id", id6);
+    doc6.addField("text", "some test six " + tag);
+
+    SolrInputDocument doc7 = new SolrInputDocument();
+    doc7.addField("id", id7);
+    doc7.addField("text", "some test seven " + tag);
+
+    client.add(doc3);
+    client.add(doc4);
+    client.add(doc5);
+    client.add(doc6);
+    client.add(doc7);
+
+    client.commit(COLLECTION);
+  }
+
+}
diff --git a/crossdc-consumer/build.gradle b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
similarity index 54%
copy from crossdc-consumer/build.gradle
copy to crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
index 8c44542..747b06f 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
@@ -14,17 +14,36 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-plugins {
-    id 'java'
-}
+package org.apache.solr.crossdc;
 
-description = 'Cross-DC Consumer package'
+import org.apache.lucene.search.TimeLimitingCollector.TimerThread;
 
-version '1.0-SNAPSHOT'
+import com.carrotsearch.randomizedtesting.ThreadFilter;
+
+
+/**
+ * This ignores those threads in Solr for which there is no way to
+ * clean up after a suite.
+ */
+public class SolrKafkaTestsIgnoredThreadsFilter implements ThreadFilter {
+  @Override
+  public boolean reject(Thread t) {
+
+    String threadName = t.getName();
+
+    if (threadName.startsWith("metrics-meter-tick-thread")) {
+      return true;
+    }
+
+    if (threadName.startsWith("pool-")) {
+      return true;
+    }
+
+    if (threadName.startsWith("kafka-")) { // TODO
+      return true;
+    }
 
-repositories {
-    mavenCentral()
-}
 
-dependencies {
+    return false;
+  }
 }
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
new file mode 100644
index 0000000..f6c8844
--- /dev/null
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -0,0 +1,228 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.consumer.Consumer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+@ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
+    QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
+@ThreadLeakLingering(linger = 5000) public class ZkConfigIntegrationTest extends
+    SolrTestCaseJ4 {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  static final String VERSION_FIELD = "_version_";
+
+  private static final int NUM_BROKERS = 1;
+  public static EmbeddedKafkaCluster kafkaCluster;
+
+  protected static volatile MiniSolrCloudCluster solrCluster1;
+  protected static volatile MiniSolrCloudCluster solrCluster2;
+
+  protected static volatile Consumer consumer1 = new Consumer();
+  protected static volatile Consumer consumer2 = new Consumer();
+
+  private static String TOPIC1 = "topicSrc";
+  private static String TOPIC2 = "topicDst";
+
+  private static String COLLECTION = "collection1";
+
+  @BeforeClass
+  public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
+
+    Properties config = new Properties();
+    config.put("unclean.leader.election.enable", "true");
+    config.put("enable.partition.eof", "false");
+
+    kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
+      public String bootstrapServers() {
+        return super.bootstrapServers().replaceAll("localhost", "127.0.0.1");
+      }
+    };
+    kafkaCluster.start();
+
+    kafkaCluster.createTopic(TOPIC1, 1, 1);
+    kafkaCluster.createTopic(TOPIC2, 1, 1);
+
+    // System.setProperty("topicName", null);
+    // System.setProperty("bootstrapServers", null);
+
+    Properties props = new Properties();
+
+    solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+    props.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC2);
+    props.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, kafkaCluster.bootstrapServers());
+
+    System.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC2);
+    System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, kafkaCluster.bootstrapServers());
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    props.store(baos, "");
+    byte[] data = baos.toByteArray();
+    solrCluster1.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
+
+    CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster1.getSolrClient().request(create);
+    solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+
+    solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+
+    solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+
+    CollectionAdminRequest.Create create2 =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster2.getSolrClient().request(create2);
+    solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
+
+    solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+
+    props = new Properties();
+    props.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, kafkaCluster.bootstrapServers());
+
+
+    baos = new ByteArrayOutputStream();
+    props.store(baos, "");
+    data = baos.toByteArray();
+    solrCluster2.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
+
+
+    String bootstrapServers = kafkaCluster.bootstrapServers();
+    log.info("bootstrapServers={}", bootstrapServers);
+
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
+
+    System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, kafkaCluster.bootstrapServers());
+
+    consumer1.start(properties);
+
+    System.setProperty(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster1.getZkServer().getZkAddress());
+    System.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC2);
+    System.setProperty("port", "8383");
+    consumer2.start();
+  }
+
+  @AfterClass
+  public static void afterSolrAndKafkaIntegrationTest() throws Exception {
+    ObjectReleaseTracker.clear();
+
+    consumer1.shutdown();
+    consumer2.shutdown();
+
+    try {
+      kafkaCluster.stop();
+    } catch (Exception e) {
+      log.error("Exception stopping Kafka cluster", e);
+    }
+
+    if (solrCluster1 != null) {
+      solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster1.shutdown();
+    }
+    if (solrCluster2 != null) {
+      solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster2.shutdown();
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+    solrCluster1.getSolrClient().deleteByQuery("*:*");
+    solrCluster2.getSolrClient().deleteByQuery("*:*");
+    solrCluster1.getSolrClient().commit();
+    solrCluster2.getSolrClient().commit();
+  }
+
+  public void testConfigFromZkPickedUp() throws Exception {
+    CloudSolrClient client = solrCluster1.getSolrClient();
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", String.valueOf(System.currentTimeMillis()));
+    doc.addField("text", "some test");
+
+    client.add(doc);
+
+    client.commit(COLLECTION);
+
+    System.out.println("Sent producer record");
+
+    QueryResponse results = null;
+    boolean foundUpdates = false;
+    for (int i = 0; i < 100; i++) {
+      solrCluster2.getSolrClient().commit(COLLECTION);
+      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      if (results.getResults().getNumFound() == 1) {
+        foundUpdates = true;
+        break;
+      } else {
+        Thread.sleep(100);
+      }
+    }
+
+    System.out.println("Closed producer");
+
+    assertTrue("results=" + results, foundUpdates);
+    System.out.println("Rest: " + results);
+
+
+
+    client = solrCluster2.getSolrClient();
+    doc = new SolrInputDocument();
+    doc.addField("id", String.valueOf(System.currentTimeMillis()));
+    doc.addField("text", "some test2");
+
+    client.add(doc);
+
+    client.commit(COLLECTION);
+
+    System.out.println("Sent producer record");
+
+    results = null;
+    foundUpdates = false;
+    for (int i = 0; i < 100; i++) {
+      solrCluster1.getSolrClient().commit(COLLECTION);
+      solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      results = solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      if (results.getResults().getNumFound() == 1) {
+        foundUpdates = true;
+        break;
+      } else {
+        Thread.sleep(100);
+      }
+    }
+
+    System.out.println("Closed producer");
+
+    assertTrue("results=" + results, foundUpdates);
+    System.out.println("Rest: " + results);
+  }
+
+}
diff --git a/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/schema.xml b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/schema.xml
new file mode 100644
index 0000000..bc4676c
--- /dev/null
+++ b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/schema.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+    <fieldType name="boolean" class="solr.BoolField" sortMissingLast="true"/>
+    <fieldType name="string" class="solr.StrField" docValues="true"/>
+    <fieldType name="int" class="org.apache.solr.schema.IntPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="long" class="org.apache.solr.schema.LongPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="float" class="org.apache.solr.schema.FloatPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="double" class="org.apache.solr.schema.DoublePointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="date" class="org.apache.solr.schema.DatePointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="text" class="solr.TextField">
+        <analyzer>
+            <tokenizer class="solr.StandardTokenizerFactory"/>
+            <filter class="solr.LowerCaseFilterFactory"/>
+        </analyzer>
+    </fieldType>
+
+    <!-- for versioning -->
+    <field name="_version_" type="long" indexed="true" stored="true"/>
+    <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+    <field name="id" type="string" indexed="true" stored="true"/>
+    <field name="text" type="text" indexed="true" stored="false"/>
+
+    <dynamicField name="*_b" type="boolean" indexed="true" stored="true"/>
+    <dynamicField name="*_s" type="string" indexed="true" stored="false"/>
+    <dynamicField name="*_t" type="text" indexed="true" stored="false"/>
+    <dynamicField name="*_i" type="int" indexed="false" stored="false"/>
+    <dynamicField name="*_l" type="long" indexed="false" stored="false"/>
+    <dynamicField name="*_f" type="float" indexed="false" stored="false"/>
+    <dynamicField name="*_d" type="double" indexed="false" stored="false"/>
+    <dynamicField name="*_dt" type="date" indexed="false" stored="false"/>
+
+    <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
new file mode 100644
index 0000000..e1791ea
--- /dev/null
+++ b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <indexConfig>
+    <mergePolicyFactory class="${mergePolicyFactory:org.apache.solr.index.TieredMergePolicyFactory}">
+      <int name="maxMergeAtOnce">${maxMergeAtOnce:10}</int>
+      <int name="segmentsPerTier">${segmentsPerTier:10}</int>
+      <double name="noCFSRatio">${noCFSRatio:.1}</double>
+    </mergePolicyFactory>
+
+    <useCompoundFile>${useCompoundFile:true}</useCompoundFile>
+
+    <ramBufferSizeMB>${ramBufferSizeMB:160}</ramBufferSizeMB>
+    <maxBufferedDocs>${maxBufferedDocs:250000}</maxBufferedDocs>     <!-- Force the common case to flush by doc count  -->
+    <!-- <ramPerThreadHardLimitMB>60</ramPerThreadHardLimitMB> -->
+
+    <!-- <mergeScheduler class="org.apache.lucene.index.ConcurrentMergeScheduler">
+      <int name="maxThreadCount">6</int>
+      <int name="maxMergeCount">8</int>
+      <bool name="ioThrottle">false</bool>
+    </mergeScheduler> -->
+
+    <writeLockTimeout>1000</writeLockTimeout>
+    <commitLockTimeout>10000</commitLockTimeout>
+
+    <!-- this sys property is not set by SolrTestCaseJ4 because almost all tests should
+         use the single process lockType for speed - but tests that explicitly need
+         to vary the lockType can set it as needed.
+    -->
+    <lockType>${lockType:single}</lockType>
+
+    <infoStream>${infostream:false}</infoStream>
+
+  </indexConfig>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <autoCommit>
+      <maxTime>${autoCommit.maxTime:60000}</maxTime>
+    </autoCommit>
+    <updateLog class="${ulog:solr.UpdateLog}" enable="${enable.update.log:true}"/>
+  </updateHandler>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+
+  </requestHandler>
+
+  <query>
+    <queryResultCache
+            enabled="${queryResultCache.enabled:false}"
+            class="${queryResultCache.class:solr.CaffeineCache}"
+            size="${queryResultCache.size:0}"
+            initialSize="${queryResultCache.initialSize:0}"
+            autowarmCount="${queryResultCache.autowarmCount:0}"/>
+      <documentCache
+              enabled="${documentCache.enabled:false}"
+              class="${documentCache.class:solr.CaffeineCache}"
+              size="${documentCache.size:0}"
+              initialSize="${documentCache.initialSize:0}"
+              autowarmCount="${documentCache.autowarmCount:0}"/>
+      <filterCache
+              enabled="${filterCache.enabled:false}"
+              class="${filterCache.class:solr.CaffeineCache}"
+              size="${filterCache.size:1}"
+              initialSize="${filterCache.initialSize:1}"
+              autowarmCount="${filterCache.autowarmCount:0}"
+              async="${filterCache.async:false}"/>
+    <cache name="myPerSegmentCache"
+           enabled="${myPerSegmentCache.enabled:false}"
+           class="${myPerSegmentCache.class:solr.CaffeineCache}"
+           size="${myPerSegmentCache.size:0}"
+           initialSize="${myPerSegmentCache.initialSize:0}"
+           autowarmCount="${myPerSegmentCache.autowarmCount:0}"/>
+  </query>
+
+  <updateRequestProcessorChain  name="mirrorUpdateChain" default="true">
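+    <!-- Mirrors incoming updates to the configured Kafka topic before the standard log/run processors index them locally. -->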
+    <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+      <bool name="enabled">${enabled:true}</bool>
+      <bool name="indexUnmirrorableDocs">${indexUnmirrorableDocs:false}</bool>
+      <str name="bootstrapServers">${bootstrapServers:}</str>
+      <str name="topicName">${topicName:}</str>
+    </processor>
+    <processor class="solr.LogUpdateProcessorFactory" />
+    <processor class="solr.RunUpdateProcessorFactory" />
+  </updateRequestProcessorChain>
+
+</config>
diff --git a/crossdc-producer/src/test/resources/log4j2.xml b/crossdc-producer/src/test/resources/log4j2.xml
new file mode 100644
index 0000000..98c24fc
--- /dev/null
+++ b/crossdc-producer/src/test/resources/log4j2.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!-- We're configuring testing to be synchronous due to "logging pollution", see SOLR-13268 -->
+<Configuration>
+  <Appenders>
+    <Console name="STDERR" target="SYSTEM_ERR">
+      <PatternLayout>
+        <Pattern>
+          %maxLen{%-4r %-5p (%t) [%notEmpty{n:%X{node_name}}%notEmpty{ c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}%notEmpty{ t:%X{trace_id}}] %c{1.} %m%notEmpty{
+          =>%ex{short}}}{10240}%n
+        </Pattern>
+      </PatternLayout>
+    </Console>
+
+    <RollingRandomAccessFile
+            name="MainLogFile"
+            fileName="${sys:log.dir:-build/logs}/${sys:log.name:-crossdc}.log"
+            filePattern="${sys:log.dir:-build/logs}/${sys:log.name:-crossdc}.log.%i">
+      <PatternLayout>
+        <Pattern>
+          %maxLen{%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p (%t) [%notEmpty{c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}] %c{1.}
+          %m%notEmpty{ =>%ex{short}}}{10240}%n
+        </Pattern>
+      </PatternLayout>
+      <Policies>
+        <OnStartupTriggeringPolicy/>
+        <SizeBasedTriggeringPolicy size="128 MB"/>
+      </Policies>
+      <DefaultRolloverStrategy max="30"/>
+    </RollingRandomAccessFile>
+  </Appenders>
+  <Loggers>
+
+
+    <Logger name="kafka.server.KafkaConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.producer.ProducerConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="WARN"/>
+
+
+    <Logger name="org.apache.zookeeper" level="WARN"/>
+    <Logger name="org.apache.hadoop" level="WARN"/>
+    <Logger name="org.apache.directory" level="WARN"/>
+    <Logger name="org.apache.solr.hadoop" level="INFO"/>
+    <Logger name="org.eclipse.jetty" level="INFO"/>
+
+    <Logger name="org.apache.solr.crossdc.consumer.KafkaCrossDcConsumer" level="INFO"/>
+    <Logger name="org.apache.solr.update.processor.MirroringUpdateProcessor" level="INFO"/>
+    <Logger name="org.apache.solr.update.processor.KafkaRequestMirroringHandler" level="INFO"/>
+    <Logger name="org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor" level="INFO"/>
+    <Logger name="org.apache.solr.crossdc.common.KafkaMirroringSink" level="INFO"/>
+
+
+    <Root level="INFO">
+      <AppenderRef ref="MainLogFile"/>
+      <AppenderRef ref="STDERR"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 4d9ca16..aa991fc 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.7.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-bin.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
diff --git a/gradlew b/gradlew
old mode 100755
new mode 100644
diff --git a/log4j2.xml b/log4j2.xml
new file mode 100644
index 0000000..c63e736
--- /dev/null
+++ b/log4j2.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<!-- We're configuring testing to be synchronous due to "logging pollution", see SOLR-13268 -->
+<Configuration>
+  <Appenders>
+    <Console name="STDERR" target="SYSTEM_ERR">
+      <PatternLayout>
+        <Pattern>
+          %maxLen{%-4r %-5p (%t) [%notEmpty{n:%X{node_name}}%notEmpty{ c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}%notEmpty{ t:%X{trace_id}}] %c{1.} %m%notEmpty{
+          =>%ex{short}}}{10240}%n
+        </Pattern>
+      </PatternLayout>
+    </Console>
+
+    <RollingRandomAccessFile
+            name="MainLogFile"
+            fileName="${sys:log.dir:-logs}/${sys:log.name:-solr}.log"
+            filePattern="${sys:log.dir:-logs}/${sys:log.name:-solr}.log.%i">
+      <PatternLayout>
+        <Pattern>
+          %maxLen{%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p (%t) [%notEmpty{c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}] %c{1.}
+          %m%notEmpty{ =>%ex{short}}}{10240}%n
+        </Pattern>
+      </PatternLayout>
+      <Policies>
+        <OnStartupTriggeringPolicy/>
+        <SizeBasedTriggeringPolicy size="128 MB"/>
+      </Policies>
+      <DefaultRolloverStrategy max="10"/>
+    </RollingRandomAccessFile>
+  </Appenders>
+  <Loggers>
+
+
+    <Logger name="kafka.server.KafkaConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.producer.ProducerConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="WARN"/>
+
+
+    <Logger name="org.apache.zookeeper" level="WARN"/>
+    <Logger name="org.apache.hadoop" level="WARN"/>
+    <Logger name="org.apache.directory" level="WARN"/>
+    <Logger name="org.apache.solr.hadoop" level="INFO"/>
+    <Logger name="org.eclipse.jetty" level="INFO"/>
+
+    <Root level="INFO">
+      <AppenderRef ref="MainLogFile"/>
+      <AppenderRef ref="STDERR"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/manual-test.sh b/manual-test.sh
new file mode 100644
index 0000000..29796c1
--- /dev/null
+++ b/manual-test.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+pid="$$"
+
+echo "pid=${pid}"
+
+base="${PWD}/cluster"
+
+kafkaBase="https://archive.apache.org/dist/kafka/2.8.1"
+solrBase="https://dlcdn.apache.org/lucene/solr/8.11.1"
+
+kafka="kafka_2.12-2.8.1"
+solr="solr-8.11.1"
+
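+# On exit: stop Kafka and its ZooKeeper, stop all Solr nodes, and kill any remaining child processes.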
+trap 'echo exittrap;cd ${base}/${kafka};bin/kafka-server-stop.sh config/server.properties;bin/zookeeper-server-stop.sh config/zookeeper.properties;cd ${base}/${solr};bin/solr stop -all;pkill -TERM -P ${pid}' EXIT
+
+bash cluster.sh
+
+echo "send update"
+curl -X POST -d '{"add":{"doc":{"id":"1","text":"datalicious"},"commitWithin":10}}' -H "Content-Type: application/json" http://127.0.0.1:8983/solr/collection1/update
+
+echo "stop cluster"
+bash cluster-stop.sh
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 825c91f..924bbaf 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -7,4 +7,8 @@
  * in the user manual at https://docs.gradle.org/6.7.1/userguide/multi_project_builds.html
  */
 
-rootProject.name = 'lucene-solr-sandbox'
+rootProject.name = 'solr-sandbox'
+
+include 'crossdc-consumer'
+include 'crossdc-producer'
+include 'crossdc-commons'
diff --git a/version.props b/version.props
new file mode 100644
index 0000000..e69de29