Posted to commits@solr.apache.org by ma...@apache.org on 2022/05/31 19:12:43 UTC

[solr-sandbox] branch crossdc-wip updated: Working out remaining issues via manual testing. (#15)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new 05f0ad8  Working out remaining issues via manual testing. (#15)
05f0ad8 is described below

commit 05f0ad8a0b590ef559b6f54a16b22c3671d6668f
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Tue May 31 14:12:38 2022 -0500

    Working out remaining issues via manual testing. (#15)
    
    * Tie up
    
    * Fix a variety of issues.
---
 cluster-stop.sh                                    |  23 +++
 cluster.sh                                         |  95 ++++++++++++
 crossdc-commons/build.gradle                       |   6 +-
 .../solr/crossdc/common}/KafkaMirroringSink.java   |  18 ++-
 .../common/MirroredSolrRequestSerializer.java      |  57 +++++--
 .../solr/crossdc/common}/MirroringException.java   |   2 +-
 .../solr/crossdc/common}/RequestMirroringSink.java |   4 +-
 .../crossdc/common}/ResubmitBackoffPolicy.java     |   2 +-
 crossdc-consumer/build.gradle                      |  32 ++--
 .../org/apache/solr/crossdc/consumer/Consumer.java |  86 +++++++----
 .../crossdc/messageprocessor/MessageProcessor.java |   3 +-
 .../messageprocessor/SolrMessageProcessor.java     |  46 +++---
 .../org/apache/solr/crossdc/IntegrationTest.java   |   6 +-
 .../solr/crossdc/SimpleSolrIntegrationTest.java    |   9 +-
 .../apache/solr/crossdc/TestMessageProcessor.java  |   1 +
 crossdc-consumer/src/test/resources/log4j2.xml     |  27 +++-
 crossdc-producer/build.gradle                      |  34 ++--
 .../processor/KafkaRequestMirroringHandler.java    |  21 +--
 .../MirroringUpdateRequestProcessorFactory.java    |  64 +++++++-
 .../solr/crossdc/SolrAndKafkaIntegrationTest.java  | 171 +++++++++++----------
 .../SolrKafkaTestsIgnoredThreadsFilter.java        |   0
 .../configs/cloud-minimal/conf/schema.xml          |  54 +++++++
 .../configs/cloud-minimal/conf/solrconfig.xml      | 120 +++++++++++++++
 .../src/test/resources/log4j2.xml                  |  27 +++-
 gradle/wrapper/gradle-wrapper.properties           |   2 +-
 .../src/test/resources/log4j2.xml => log4j2.xml    |  27 +++-
 26 files changed, 722 insertions(+), 215 deletions(-)

diff --git a/cluster-stop.sh b/cluster-stop.sh
new file mode 100644
index 0000000..91a6b21
--- /dev/null
+++ b/cluster-stop.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+kafkaBase="https://archive.apache.org/dist/kafka/2.8.1"
+solrBase="https://dlcdn.apache.org/lucene/solr/8.11.1"
+
+kafka="kafka_2.12-2.8.1"
+solr="solr-8.11.1"
+
+
+cd cluster || exit
+
+(
+  cd "${kafka}" || exit
+
+  bin/zookeeper-server-stop.sh config/zookeeper.properties
+  bin/kafka-server-stop.sh config/server.properties
+)
+
+(
+  cd "${solr}" || exit
+
+  bin/solr stop -all
+)
\ No newline at end of file
diff --git a/cluster.sh b/cluster.sh
new file mode 100644
index 0000000..e7439df
--- /dev/null
+++ b/cluster.sh
@@ -0,0 +1,95 @@
+#!/bin/bash
+
+kafkaBase="https://archive.apache.org/dist/kafka/2.8.1"
+solrBase="https://dlcdn.apache.org/lucene/solr/8.11.1"
+
+kafka="kafka_2.12-2.8.1"
+solr="solr-8.11.1"
+
+if [ ! -d cluster ]
+then
+  mkdir cluster
+fi
+
+cd cluster || exit
+
+if [ ! -f ${kafka}.tgz ]
+then
+  wget "${kafkaBase}/${kafka}.tgz"
+fi
+
+if [ ! -d ${kafka} ]
+then
+  tar -xvzf ${kafka}.tgz
+fi
+
+if [ ! -f ${solr}.tgz ]
+then
+  wget "${solrBase}/${solr}.tgz"
+fi
+
+if [ ! -d ${solr} ]
+then
+  tar -xvzf ${solr}.tgz
+fi
+
+(
+  cd "${kafka}" || exit
+
+
+bin/zookeeper-server-start.sh config/zookeeper.properties > ../kafka_zk.log &
+
+bin/kafka-server-start.sh config/server.properties > ../kafka_server.log &
+
+# for Kafka 2.x the ZooKeeper port is 2181; for 3.x use a broker port such as 9093
+
+# bin/kafka-topics.sh --create --topic my-kafka-topic --bootstrap-server localhost:9093 --partitions 3 --replication-factor 2
+
+# bin/kafka-topics.sh --list --bootstrap-server localhost:9093
+
+# bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic my-kafka-topic
+
+# bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning
+
+# bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning --group group2
+)
+
+# copy the plugin jars into a shared lib folder (Solr has no shared lib folder by default, so create one)
+mkdir "${solr}/server/solr/lib"
+cp ../crossdc-commons/build/libs/crossdc-commons-*.jar "${solr}"/server/solr/lib
+cp ../crossdc-producer/build/libs/crossdc-producer-*.jar "${solr}"/server/solr/lib
+cp ../crossdc-producer/build/libs/kafka-clients-*.jar "${solr}"/server/solr/lib
+
+(
+  cd "${solr}" || exit
+
+  echo -e "SOLR_OPTS=\"$SOLR_OPTS -Dsolr.sharedLib=lib -DbootstrapServers=127.0.0.1:9092 -DtopicName=crossdc\"" >> bin/solr.in.sh
+
+  bin/solr start -cloud > ../solr.log
+
+  # for kafka 2.x ZK is on 2181, for Solr ZK is on 9983
+  # for the moment we upload the config set used in crossdc-producer tests
+
+  if [ ! -d "../../crossdc-producer/src/test/resources/configs/cloud-minimal/conf" ]
+  then
+    echo "Could not find configset folder to upload"
+    exit 1
+  fi
+  bin/solr zk upconfig -z 127.0.0.1:9983 -n crossdc -d ../../crossdc-producer/src/test/resources/configs/cloud-minimal/conf
+
+  bin/solr create -c collection1 -n crossdc
+
+  bin/solr status
+)
+
+cp ../crossdc-consumer/build/distributions/crossdc-consumer-*.tar .
+
+tar -xvf crossdc-consumer-*.tar
+rm crossdc-consumer-*.tar
+
+(
+  cd crossdc-consumer* || exit
+  CROSSDC_CONSUMER_OPTS="-Dlog4j2.configurationFile=../log4j2.xml -DbootstrapServers=127.0.0.1:9092 -DzkConnectString=127.0.0.1:9983 -DtopicName=crossdc" bin/crossdc-consumer > ../crossdc_consumer.log &
+)
+
+curl -X POST -d '{"add":{"doc":{"id":"1","text":"datalicious"},"commitWithin":10}}' -H "Content-Type: application/json" http://127.0.0.1:8983/solr/collection1/update
\ No newline at end of file
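
A quick end-to-end check (a suggested verification step, not part of this commit) is to query the collection after the commitWithin window has passed and confirm the test document is searchable:

    # assumes Solr on the default port 8983 and the collection1 created above
    sleep 2
    curl "http://127.0.0.1:8983/solr/collection1/select?q=id:1"
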
diff --git a/crossdc-commons/build.gradle b/crossdc-commons/build.gradle
index b491b1b..7ac0fd1 100644
--- a/crossdc-commons/build.gradle
+++ b/crossdc-commons/build.gradle
@@ -28,9 +28,9 @@ repositories {
 }
 
 dependencies {
-    compile group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.1'
-    compile 'org.apache.kafka:kafka-clients:2.8.0'
-    compile group: 'com.google.guava', name: 'guava', version: '14.0'
+    implementation 'org.apache.solr:solr-solrj:8.11.1'
+    implementation 'org.apache.kafka:kafka-clients:2.8.1'
+    implementation 'com.google.guava:guava:14.0'
 }
 
 subprojects {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
similarity index 88%
rename from crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
rename to crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index d6dcd50..b062392 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -14,14 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.solr.crossdc;
+package org.apache.solr.crossdc.common;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.solr.crossdc.common.KafkaCrossDcConf;
-import org.apache.solr.crossdc.common.MirroredSolrRequest;
-import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,7 +89,14 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
         props.put("key.serializer", StringSerializer.class.getName());
         props.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
 
-        Producer<String, MirroredSolrRequest> producer = new KafkaProducer(props);
+        ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(null);
+        Producer<String, MirroredSolrRequest> producer;
+        try {
+            producer = new KafkaProducer(props);
+        } finally {
+            Thread.currentThread().setContextClassLoader(originalContextClassLoader);
+        }
         return producer;
     }
 
@@ -103,6 +107,8 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
     }
 
     @Override public void close() throws IOException {
-        producer.close();
+        if (producer != null) {
+            producer.close();
+        }
     }
 }
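
The context classloader juggling above works around the Kafka client resolving serializer classes through the thread context classloader when they are given as class-name properties; inside a Solr core that classloader is a core-scoped resource loader that may not see the plugin jars, so it is nulled out to make Kafka fall back to its own classloader. The pattern can be captured in a small helper; this is a sketch under those assumptions, not code from this commit:

    import java.util.concurrent.Callable;

    final class ContextClassLoaderUtil {
        // Run a factory with a null context classloader so that libraries
        // which resolve classes via the TCCL fall back to their own
        // defining classloader, then restore the original loader.
        static <T> T withNullContextClassLoader(Callable<T> factory) throws Exception {
            ClassLoader original = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(null);
            try {
                return factory.call();
            } finally {
                Thread.currentThread().setContextClassLoader(original);
            }
        }
    }
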
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
index bb3104e..221f12f 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
@@ -21,6 +21,10 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.params.MapSolrParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.MultiMapSolrParams;
+import org.apache.solr.common.util.JavaBinCodec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,6 +32,8 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrRequest>, Deserializer<MirroredSolrRequest> {
@@ -48,23 +54,39 @@ public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrReq
 
     @Override
     public MirroredSolrRequest deserialize(String topic, byte[] data) {
-        UpdateRequest solrRequest;
+        Map solrRequest;
 
-        JavaBinUpdateRequestCodec codec = new JavaBinUpdateRequestCodec();
+        JavaBinCodec codec = new JavaBinCodec();
         ByteArrayInputStream bais = new ByteArrayInputStream(data);
 
         try {
-            solrRequest = codec.unmarshal(bais,
-                (document, req, commitWithin, override) -> {
+            solrRequest = (Map) codec.unmarshal(bais);
 
-                });
-        } catch (IOException e) {
+            log.info("Deserialized class={} solrRequest={}", solrRequest.getClass().getName(), solrRequest);
+
+
+        } catch (Exception e) {
+            log.error("Exception unmarshalling JavaBin", e);
             throw new RuntimeException(e);
         }
 
-        log.info("solrRequest={}, {}", solrRequest.getParams(), solrRequest.getDocuments());
+        UpdateRequest updateRequest = new UpdateRequest();
+        List docs = (List) solrRequest.get("docs");
+        if (docs != null) {
+            updateRequest.add(docs);
+        }
 
-        return new MirroredSolrRequest(solrRequest);
+        List deletes = (List) solrRequest.get("deletes");
+        if (deletes != null) {
+            updateRequest.deleteById(deletes);
+        }
+
+        Map params = (Map) solrRequest.get("params");
+        if (params != null) {
+            updateRequest.setParams(ModifiableSolrParams.of(new MapSolrParams(params)));
+        }
+
+        return new MirroredSolrRequest(updateRequest);
     }
 
     /**
@@ -77,12 +99,23 @@ public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrReq
     @Override
     public byte[] serialize(String topic, MirroredSolrRequest request) {
         // TODO: add checks
-        SolrRequest solrRequest = request.getSolrRequest();
-        UpdateRequest updateRequest = (UpdateRequest)solrRequest;
-        JavaBinUpdateRequestCodec codec = new JavaBinUpdateRequestCodec();
+        UpdateRequest solrRequest = (UpdateRequest) request.getSolrRequest();
+
+        log.info("serialize request={} docs={}", solrRequest, solrRequest.getDocuments());
+
+        JavaBinCodec codec = new JavaBinCodec(null);
+
         ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
+        Map map = new HashMap();
+        map.put("params", solrRequest.getParams().getMap());
+        map.put("docs", solrRequest.getDocuments());
+
+        // TODO
+        //map.put("deletes", solrRequest.getDeleteByIdMap());
+        map.put("deletes", solrRequest.getDeleteById());
+
         try {
-            codec.marshal(updateRequest, baos);
+            codec.marshal(map, baos);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
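
The wire format is now a plain JavaBin-encoded map with "params", "docs" and "deletes" keys instead of the JavaBinUpdateRequestCodec stream format. A self-contained round-trip sketch of that format (the class name here is illustrative; it assumes SolrJ 8.11 on the classpath):

    import org.apache.solr.common.SolrInputDocument;
    import org.apache.solr.common.util.JavaBinCodec;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class WireFormatRoundTrip {
        public static void main(String[] args) throws Exception {
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("id", "1");
            doc.addField("text", "datalicious");

            List<SolrInputDocument> docs = new ArrayList<>();
            docs.add(doc);

            Map<String, String> params = new HashMap<>();
            params.put("commitWithin", "10");

            // the same three keys serialize() writes above
            Map<String, Object> payload = new HashMap<>();
            payload.put("params", params);
            payload.put("docs", docs);
            payload.put("deletes", new ArrayList<String>());

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            new JavaBinCodec().marshal(payload, baos);

            // deserialize() unmarshals back into a Map and rebuilds an UpdateRequest
            Map<?, ?> decoded = (Map<?, ?>) new JavaBinCodec()
                    .unmarshal(new ByteArrayInputStream(baos.toByteArray()));
            System.out.println(decoded.get("docs"));
        }
    }
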
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/MirroringException.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroringException.java
similarity index 96%
rename from crossdc-consumer/src/main/java/org/apache/solr/crossdc/MirroringException.java
rename to crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroringException.java
index 2073750..ac633ec 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/MirroringException.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroringException.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.solr.crossdc;
+package org.apache.solr.crossdc.common;
 
 /**
  * Exception thrown during cross-dc mirroring
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/RequestMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java
similarity index 93%
rename from crossdc-consumer/src/main/java/org/apache/solr/crossdc/RequestMirroringSink.java
rename to crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java
index 5d09c16..e8b2c69 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/RequestMirroringSink.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/RequestMirroringSink.java
@@ -14,9 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.solr.crossdc;
-
-import org.apache.solr.crossdc.common.MirroredSolrRequest;
+package org.apache.solr.crossdc.common;
 
 public interface RequestMirroringSink {
 
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ResubmitBackoffPolicy.java
similarity index 96%
rename from crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java
rename to crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ResubmitBackoffPolicy.java
index e531cca..82babbf 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ResubmitBackoffPolicy.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.solr.crossdc;
+package org.apache.solr.crossdc.common;
 
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index e173dd3..61de292 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -32,34 +32,32 @@ application {
 }
 
 dependencies {
-    compile group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.1'
-    compile project(':crossdc-commons')
-    implementation 'org.slf4j:slf4j-api'
-    compile 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
-    compile 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
-    compile 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
-    compile 'org.apache.kafka:kafka-clients:2.8.0'
-    compile group: 'com.google.guava', name: 'guava', version: '14.0'
+    implementation group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.1'
+    implementation project(':crossdc-commons')
+
+    implementation 'org.slf4j:slf4j-api:1.7.36'
+    implementation 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
+    implementation 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
+    implementation 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
+    implementation 'org.apache.kafka:kafka-clients:2.8.1'
+    implementation group: 'com.google.guava', name: 'guava', version: '14.0'
+    implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.2'
     runtimeOnly ('com.google.protobuf:protobuf-java-util:3.19.2')
     testImplementation 'org.hamcrest:hamcrest:2.2'
     testImplementation 'junit:junit:4.13.2'
     testImplementation('org.mockito:mockito-core:4.3.1', {
         exclude group: "net.bytebuddy", module: "byte-buddy-agent"
     })
+
+    testImplementation  project(':crossdc-producer')
+
     testImplementation group: 'org.apache.solr', name: 'solr-core', version: '8.11.1'
     testImplementation group: 'org.apache.solr', name: 'solr-test-framework', version: '8.11.1'
     testImplementation 'org.apache.kafka:kafka_2.13:2.8.1'
     testImplementation 'org.apache.kafka:kafka-streams:2.8.1'
-
     testImplementation 'org.apache.kafka:kafka_2.13:2.8.1:test'
-    testImplementation ('org.apache.kafka:kafka-clients:2.8.1:test') {
-        //exclude group: 'org.apache.kafka' // or finer grained, if we like
-    }
-    testImplementation ('org.apache.kafka:kafka-streams:2.8.1:test') {
-        //exclude group: 'org.apache.kafka' // or finer grained, if we like
-    }
-    // testImplementation 'org.apache.kafka:kafka-streams-test-utils:2.8.1'
-
+    testImplementation 'org.apache.kafka:kafka-clients:2.8.1:test'
+    testImplementation 'org.apache.kafka:kafka-streams:2.8.1:test'
 }
 
 subprojects {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index 62e0a84..5b3d01c 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -18,14 +18,14 @@ package org.apache.solr.crossdc.consumer;
 
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.crossdc.KafkaMirroringSink;
-import org.apache.solr.crossdc.MirroringException;
-import org.apache.solr.crossdc.ResubmitBackoffPolicy;
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.crossdc.common.KafkaMirroringSink;
+import org.apache.solr.crossdc.common.MirroringException;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
 import org.apache.solr.crossdc.common.IQueueHandler;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
@@ -62,6 +62,16 @@ public class Consumer {
     private String topicName;
 
     public void start(String bootstrapServers, String zkConnectString, String topicName, boolean enableDataEncryption, int port) {
+        if (bootstrapServers == null) {
+            throw new IllegalArgumentException("bootstrapServers config was not passed at startup");
+        }
+        if (zkConnectString == null) {
+            throw new IllegalArgumentException("zkConnectString config was not passed at startup");
+        }
+        if (topicName == null) {
+            throw new IllegalArgumentException("topicName config was not passed at startup");
+        }
+
         this.topicName = topicName;
 
         server = new Server();
@@ -98,7 +108,9 @@ public class Consumer {
     }
 
     public void shutdown() {
-        crossDcConsumer.shutdown();
+        if (crossDcConsumer != null) {
+            crossDcConsumer.shutdown();
+        }
     }
 
     /**
@@ -124,6 +136,8 @@ public class Consumer {
         private final String topicName;
         SolrMessageProcessor messageProcessor;
 
+        CloudSolrClient solrClient;
+
         /**
          * @param conf The Kafka consumer configuration
          */
@@ -134,12 +148,9 @@ public class Consumer {
 
             kafkaConsumerProp.put("bootstrap.servers", conf.getBootStrapServers());
 
-            kafkaConsumerProp.put("key.deserializer", StringDeserializer.class.getName());
-            kafkaConsumerProp.put("value.deserializer", MirroredSolrRequestSerializer.class.getName());
-
-            kafkaConsumerProp.put("group.id", "group_1");
+            kafkaConsumerProp.put("group.id", "group_1"); // TODO
 
-            CloudSolrClient solrClient = new CloudSolrClient.Builder(Collections.singletonList(conf.getSolrZkConnectString()), Optional.empty()).build();
+            solrClient = new CloudSolrClient.Builder(Collections.singletonList(conf.getSolrZkConnectString()), Optional.empty()).build();
 
             messageProcessor = new SolrMessageProcessor(solrClient, new ResubmitBackoffPolicy() {
                 @Override public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
@@ -158,7 +169,7 @@ public class Consumer {
         }
 
         private KafkaConsumer<String, MirroredSolrRequest> createConsumer(Properties properties) {
-            KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
+            KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, new StringDeserializer(), new MirroredSolrRequestSerializer());
             return kafkaConsumer;
         }
 
@@ -173,23 +184,29 @@ public class Consumer {
             log.info("About to start Kafka consumer thread...");
 
             log.info("Kafka consumer subscribing to topic topic={}", topicName);
-            consumer.subscribe(Collections.singleton(topicName));
 
-            while (pollAndProcessRequests()) {
-                //no-op within this loop: everything is done in pollAndProcessRequests method defined above.
-            }
-
-            log.info("Closed kafka consumer. Exiting now.");
             try {
-                consumer.close();
-            } catch (Exception e) {
-                log.warn("Failed to close kafka consumer", e);
-            }
 
-            try {
-                kafkaMirroringSink.close();
-            } catch (Exception e) {
-                log.warn("Failed to close kafka mirroring sink", e);
+                consumer.subscribe(Collections.singleton(topicName));
+
+                while (pollAndProcessRequests()) {
+                    //no-op within this loop: everything is done in pollAndProcessRequests method defined above.
+                }
+
+                log.info("Closed kafka consumer. Exiting now.");
+                try {
+                    consumer.close();
+                } catch (Exception e) {
+                    log.warn("Failed to close kafka consumer", e);
+                }
+
+                try {
+                    kafkaMirroringSink.close();
+                } catch (Exception e) {
+                    log.warn("Failed to close kafka mirroring sink", e);
+                }
+            } finally {
+                IOUtils.closeQuietly(solrClient);
             }
 
         }
@@ -240,6 +257,11 @@ public class Consumer {
                         return false;
                     } catch (Exception e) {
                         // If there is any exception returned by handleItem, then reset the offset.
+
+                        if (e instanceof ClassCastException || e instanceof ClassNotFoundException || e instanceof SerializationException) {
+                            log.error("Non retryable error", e);
+                            break;
+                        }
                         log.warn("Exception occurred in Kafka consumer thread, but we will continue.", e);
                         resetOffsetForPartition(partition, partitionRecords);
                         break;
@@ -249,6 +271,13 @@ public class Consumer {
                 log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer");
                 return false;
             } catch (Exception e) {
+
+                // the log calls below include the full stack trace
+                if (e instanceof ClassCastException || e instanceof ClassNotFoundException || e instanceof SerializationException) {
+                    log.error("Non retryable error", e);
+                    return false;
+                }
+
                 log.error("Exception occurred in Kafka consumer thread, but we will continue.", e);
             }
             return true;
@@ -287,6 +316,11 @@ public class Consumer {
          */
         public void shutdown() {
             log.info("Shutdown called on KafkaCrossDcConsumer");
+            try {
+                solrClient.close();
+            } catch (Exception e) {
+                log.warn("Exception closing Solr client on shutdown", e);
+            }
             consumer.wakeup();
         }
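
Note that createConsumer now hands deserializer instances straight to the KafkaConsumer constructor instead of setting key.deserializer/value.deserializer class names, which sidesteps the same reflective, context-classloader-based instantiation the producer side guards against. A minimal construction sketch (the broker address is an assumption):

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.solr.crossdc.common.MirroredSolrRequest;
    import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;

    import java.util.Properties;

    public class ConsumerConstruction {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "127.0.0.1:9092"); // assumption: local broker
            props.put("group.id", "group_1");

            // deserializer instances are used as-is; Kafka does not load them
            // reflectively through the thread context classloader
            KafkaConsumer<String, MirroredSolrRequest> consumer =
                    new KafkaConsumer<>(props, new StringDeserializer(),
                            new MirroredSolrRequestSerializer());
            consumer.close();
        }
    }
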
 
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
index efbfef6..0620e69 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
@@ -16,8 +16,7 @@
  */
 package org.apache.solr.crossdc.messageprocessor;
 
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.crossdc.ResubmitBackoffPolicy;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
 
 public abstract class MessageProcessor {
 
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
index 79ac32b..140d0b9 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -25,7 +25,7 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.SolrInputField;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.crossdc.ResubmitBackoffPolicy;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
 import org.apache.solr.crossdc.common.CrossDcConstants;
 import org.apache.solr.crossdc.common.IQueueHandler;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
@@ -33,6 +33,7 @@ import org.apache.solr.crossdc.common.SolrExceptionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -45,7 +46,7 @@ import java.util.concurrent.TimeUnit;
  *  3. Flagging requests for resubmission by the underlying consumer implementation.
  */
 public class SolrMessageProcessor extends MessageProcessor implements IQueueHandler<MirroredSolrRequest>  {
-    private static final Logger logger = LoggerFactory.getLogger(SolrMessageProcessor.class);
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
     final CloudSolrClient client;
 
     private static final String VERSION_FIELD = "_version_";
@@ -70,13 +71,14 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
     }
 
     private Result<MirroredSolrRequest> handleSolrRequest(MirroredSolrRequest mirroredSolrRequest) {
-        logger.debug("Handling Solr request");
+        log.debug("Handling Solr request");
         SolrRequest request = mirroredSolrRequest.getSolrRequest();
         final SolrParams requestParams = request.getParams();
 
         final String shouldMirror = requestParams.get("shouldMirror");
-        if (shouldMirror != null && !Boolean.parseBoolean(shouldMirror)) {
-            logger.warn("Skipping mirrored request because shouldMirror is set to false. request={}", request);
+        log.info("shouldMirror={}", shouldMirror);
+        if ("false".equalsIgnoreCase(shouldMirror)) {
+            log.warn("Skipping mirrored request because shouldMirror is set to false. request={}", requestParams);
             return new Result<>(ResultStatus.FAILED_NO_RETRY);
         }
         logFirstAttemptLatency(mirroredSolrRequest);
@@ -85,7 +87,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
         try {
             prepareIfUpdateRequest(request);
             logRequest(request);
-            logger.debug("About to submit Solr request {}", request);
+            log.debug("About to submit Solr request {}", request);
             result = processMirroredSolrRequest(request);
         } catch (Exception e) {
             result = handleException(mirroredSolrRequest, e);
@@ -118,7 +120,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
             // If backoff policy is not configured (returns "0" by default), then sleep 1 second. If configured, do as it says.
             sleepTimeMs = Math.max(1, Long.parseLong(backoffTimeSuggested));
         }
-        logger.info("Consumer backoff. sleepTimeMs={}", sleepTimeMs);
+        log.info("Consumer backoff. sleepTimeMs={}", sleepTimeMs);
         uncheckedSleep(sleepTimeMs);
     }
 
@@ -132,25 +134,25 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
             }
         }
         // Everything other than version conflict exceptions should be retried.
-        logger.warn("Unexpected exception, will resubmit the request to the queue", e);
+        log.warn("Unexpected exception, will resubmit the request to the queue", e);
         return true;
     }
 
     private void logIf4xxException(SolrException solrException) {
         // This shouldn't really happen but if it does, it most likely requires fixing in the return code from Solr.
         if (solrException != null && 400 <= solrException.code() && solrException.code() < 500) {
-            logger.error("Exception occurred with 4xx response. {}", solrException.code(), solrException);
+            log.error("Exception occurred with 4xx response. {}", solrException.code(), solrException);
         }
     }
 
     private void logFailure(MirroredSolrRequest mirroredSolrRequest, Exception e, SolrException solrException, boolean retryable) {
         // This shouldn't really happen.
         if (solrException != null && 400 <= solrException.code() && solrException.code() < 500) {
-            logger.error("Exception occurred with 4xx response. {}", solrException.code(), solrException);
+            log.error("Exception occurred with 4xx response. {}", solrException.code(), solrException);
             return;
         }
 
-        logger.warn("Resubmitting mirrored solr request after failure errorCode={} retryCount={}", solrException != null ? solrException.code() : -1, mirroredSolrRequest.getAttempt(), e);
+        log.warn("Resubmitting mirrored solr request after failure errorCode={} retryCount={}", solrException != null ? solrException.code() : -1, mirroredSolrRequest.getAttempt(), e);
     }
 
     /**
@@ -158,7 +160,9 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
      * Process the SolrRequest. If not, this method throws an exception.
      */
     private Result<MirroredSolrRequest> processMirroredSolrRequest(SolrRequest request) throws Exception {
+        log.info("Sending request to Solr at {} with params {}", client.getZkHost(), request.getParams());
         Result<MirroredSolrRequest> result;
+
         SolrResponseBase response = (SolrResponseBase) request.process(client);
 
         int status = response.getStatus();
@@ -183,7 +187,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
             if(((UpdateRequest) request).getDeleteQuery() != null) {
                 rmsg.append(" numDeleteByQuery=").append(((UpdateRequest) request).getDeleteQuery().size());
             }
-            logger.info(rmsg.toString());
+            log.info(rmsg.toString());
         }
     }
 
@@ -211,7 +215,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
      */
     private void sanitizeDocument(SolrInputDocument doc) {
         SolrInputField field = doc.getField(VERSION_FIELD);
-        logger.info("Removing {} value={}", VERSION_FIELD, field == null ? "null" : field.getValue());
+        log.info("Removing {} value={}", VERSION_FIELD, field == null ? "null" : field.getValue());
         doc.remove(VERSION_FIELD);
     }
 
@@ -230,7 +234,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
         // Only record the latency of the first attempt, essentially measuring the latency from submitting on the
         // primary side until the request is eligible to be consumed on the buddy side (or vice versa).
         if (mirroredSolrRequest.getAttempt() == 1) {
-            logger.debug("First attempt latency = {}",
+            log.debug("First attempt latency = {}",
                     System.currentTimeMillis() - TimeUnit.NANOSECONDS.toMillis(mirroredSolrRequest.getSubmitTimeNanos()));
         }
     }
@@ -247,23 +251,23 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
             ModifiableSolrParams params = updateRequest.getParams();
             String shouldMirror = (params == null ? null : params.get(CrossDcConstants.SHOULD_MIRROR));
             if (shouldMirror == null) {
-                logger.warn(CrossDcConstants.SHOULD_MIRROR + " param is missing - setting to false. Request={}", mirroredSolrRequest);
+                log.warn(CrossDcConstants.SHOULD_MIRROR + " param is missing - setting to false. Request={}", mirroredSolrRequest);
                 updateRequest.setParam(CrossDcConstants.SHOULD_MIRROR, "false");
             } else if (!"false".equalsIgnoreCase(shouldMirror)) {
-                logger.warn(CrossDcConstants.SHOULD_MIRROR + " param equal to " + shouldMirror);
+                log.warn(CrossDcConstants.SHOULD_MIRROR + " param equal to " + shouldMirror);
             }
         } else {
             SolrParams params = mirroredSolrRequest.getSolrRequest().getParams();
             String shouldMirror = (params == null ? null : params.get(CrossDcConstants.SHOULD_MIRROR));
             if (shouldMirror == null) {
                 if (params instanceof ModifiableSolrParams) {
-                    logger.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing - setting to false");
+                    log.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing - setting to false");
                     ((ModifiableSolrParams) params).set(CrossDcConstants.SHOULD_MIRROR, "false");
                 } else {
-                    logger.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing and params are not modifiable");
+                    log.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing and params are not modifiable");
                 }
             } else if (!"false".equalsIgnoreCase(shouldMirror)) {
-                logger.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is present and set to " + shouldMirror);
+                log.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is present and set to " + shouldMirror);
             }
         }
     }
@@ -276,7 +280,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
                 client.connect(); // volatile null-check if already connected
                 connected = true;
             } catch (Exception e) {
-                logger.error("Unable to connect to solr server. Not consuming.", e);
+                log.error("Unable to connect to solr server. Not consuming.", e);
                 uncheckedSleep(5000);
             }
         }
@@ -300,7 +304,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
                 } catch (final InterruptedException ex) {
                     // we're about to exit the method anyway, so just log this and return the item. Let the caller
                     // handle it.
-                    logger.warn("Thread interrupted while backing off before retry");
+                    log.warn("Thread interrupted while backing off before retry");
                     Thread.currentThread().interrupt();
                 }
             }
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
index 695ebab..1648f80 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/IntegrationTest.java
@@ -1,20 +1,16 @@
 package org.apache.solr.crossdc;
 
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
 import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mockito;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import static org.mockito.Mockito.spy;
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
index dec570c..46188a8 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
@@ -1,4 +1,3 @@
-    }
 package org.apache.solr.crossdc;
 
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -19,15 +18,11 @@ import static org.mockito.Mockito.spy;
 public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
   static final String VERSION_FIELD = "_version_";
 
-  private static final int NUM_BROKERS = 1;
 
   protected static volatile MiniSolrCloudCluster cluster1;
 
   private static SolrMessageProcessor processor;
 
-  private static ResubmitBackoffPolicy backoffPolicy = spy(new TestMessageProcessor.NoOpResubmitBackoffPolicy());
-  private static CloudSolrClient cloudClient1;
-
   @BeforeClass
   public static void setupIntegrationTest() throws Exception {
 
@@ -37,9 +32,9 @@ public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
             .configure();
 
     String collection = "collection1";
-    cloudClient1 = cluster1.getSolrClient();
+    CloudSolrClient cloudClient1 = cluster1.getSolrClient();
 
-    processor = new SolrMessageProcessor(cloudClient1, backoffPolicy);
+    processor = new SolrMessageProcessor(cloudClient1, null);
 
     CollectionAdminRequest.Create create =
         CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
index 6158fc4..4e68f4f 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
@@ -24,6 +24,7 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.crossdc.common.IQueueHandler;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
 import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/crossdc-consumer/src/test/resources/log4j2.xml b/crossdc-consumer/src/test/resources/log4j2.xml
index 96f69f1..c63e736 100644
--- a/crossdc-consumer/src/test/resources/log4j2.xml
+++ b/crossdc-consumer/src/test/resources/log4j2.xml
@@ -26,9 +26,33 @@
         </Pattern>
       </PatternLayout>
     </Console>
+
+    <RollingRandomAccessFile
+            name="MainLogFile"
+            fileName="${sys:log.dir:-logs}/${sys:log.name:-solr}.log"
+            filePattern="${sys:log.dir:-logs}/${sys:log.name:-solr}.log.%i">
+      <PatternLayout>
+        <Pattern>
+          %maxLen{%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p (%t) [%notEmpty{c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}] %c{1.}
+          %m%notEmpty{ =>%ex{short}}}{10240}%n
+        </Pattern>
+      </PatternLayout>
+      <Policies>
+        <OnStartupTriggeringPolicy/>
+        <SizeBasedTriggeringPolicy size="128 MB"/>
+      </Policies>
+      <DefaultRolloverStrategy max="10"/>
+    </RollingRandomAccessFile>
   </Appenders>
   <Loggers>
-    <!-- Use <AsyncLogger/<AsyncRoot and <Logger/<Root for asynchronous logging or synchonous logging respectively -->
+
+
+    <Logger name="kafka.server.KafkaConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.producer.ProducerConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="WARN"/>
+
+
     <Logger name="org.apache.zookeeper" level="WARN"/>
     <Logger name="org.apache.hadoop" level="WARN"/>
     <Logger name="org.apache.directory" level="WARN"/>
@@ -36,6 +60,7 @@
     <Logger name="org.eclipse.jetty" level="INFO"/>
 
     <Root level="INFO">
+      <AppenderRef ref="MainLogFile"/>
       <AppenderRef ref="STDERR"/>
     </Root>
   </Loggers>
diff --git a/crossdc-producer/build.gradle b/crossdc-producer/build.gradle
index f361cac..728706b 100644
--- a/crossdc-producer/build.gradle
+++ b/crossdc-producer/build.gradle
@@ -28,16 +28,16 @@ repositories {
 }
 
 dependencies {
-    compile project(':crossdc-consumer')
-    compile project(':crossdc-commons')
-    compile group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.1'
-    compile group: 'org.apache.solr', name: 'solr-core', version: '8.11.1'
+    implementation project(':crossdc-consumer')
+    implementation project(':crossdc-commons')
+    implementation group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.1'
+    implementation group: 'org.apache.solr', name: 'solr-core', version: '8.11.1'
     implementation 'org.slf4j:slf4j-api'
-    compile 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
-    compile 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
-    compile 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
-    compile 'org.apache.kafka:kafka-clients:2.8.0'
-    compile group: 'com.google.guava', name: 'guava', version: '14.0'
+    implementation 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
+    implementation 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
+    implementation 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
+    implementation 'org.apache.kafka:kafka-clients:2.8.0'
+    implementation group: 'com.google.guava', name: 'guava', version: '14.0'
     runtimeOnly ('com.google.protobuf:protobuf-java-util:3.19.2')
     testImplementation 'org.hamcrest:hamcrest:2.2'
     testImplementation 'junit:junit:4.13.2'
@@ -50,13 +50,8 @@ dependencies {
     testImplementation 'org.apache.kafka:kafka-streams:2.8.1'
 
     testImplementation 'org.apache.kafka:kafka_2.13:2.8.1:test'
-    testImplementation ('org.apache.kafka:kafka-clients:2.8.1:test') {
-        //exclude group: 'org.apache.kafka' // or finer grained, if we like
-    }
-    testImplementation ('org.apache.kafka:kafka-streams:2.8.1:test') {
-        //exclude group: 'org.apache.kafka' // or finer grained, if we like
-    }
-    // testImplementation 'org.apache.kafka:kafka-streams-test-utils:2.8.1'
+    testImplementation 'org.apache.kafka:kafka-clients:2.8.1:test'
+    testImplementation 'org.apache.kafka:kafka-streams:2.8.1:test'
 
 }
 
@@ -64,6 +59,13 @@ subprojects {
     group "org.apache.solr"
 }
 
+task copyToLib(type: Copy) {
+    into "$buildDir/libs"
+    from configurations.runtimeClasspath
+}
+
+jar.dependsOn(copyToLib)
+
 test {
     jvmArgs '-Djava.security.egd=file:/dev/./urandom'
 }
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
index 5c4c23d..bcd620f 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
@@ -17,23 +17,25 @@
 package org.apache.solr.update.processor;
 
 import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.crossdc.KafkaMirroringSink;
-import org.apache.solr.crossdc.MirroringException;
+import org.apache.solr.crossdc.common.KafkaMirroringSink;
+import org.apache.solr.crossdc.common.MirroringException;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
 import java.util.concurrent.TimeUnit;
 
 public class KafkaRequestMirroringHandler implements RequestMirroringHandler {
-    final KafkaCrossDcConf conf;
+
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
     final KafkaMirroringSink sink;
 
-    public KafkaRequestMirroringHandler() {
-        // TODO: Setup Kafka properly
-        final String topicName = System.getProperty("topicname");
-        final String boostrapServers = System.getProperty("bootstrapservers");
-        conf = new KafkaCrossDcConf(boostrapServers, topicName, false, null);
-        sink = new KafkaMirroringSink(conf);
+    public KafkaRequestMirroringHandler(KafkaMirroringSink sink) {
+        log.info("create KafkaRequestMirroringHandler");
+        this.sink = sink;
     }
 
     /**
@@ -43,6 +45,7 @@ public class KafkaRequestMirroringHandler implements RequestMirroringHandler {
      */
     @Override
     public void mirror(UpdateRequest request) throws MirroringException {
+        log.info("submit update to sink {}", request.getDocuments());
             // TODO: Enforce external version constraint for consistent update replication (cross-cluster)
             sink.submit(new MirroredSolrRequest(1, request, TimeUnit.MILLISECONDS.toNanos(
                     System.currentTimeMillis())));
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 4097c20..cabdda8 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -23,11 +23,15 @@ import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CloseHook;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.KafkaMirroringSink;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
 import org.apache.solr.update.RollbackUpdateCommand;
 import org.apache.solr.util.plugin.SolrCoreAware;
@@ -35,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 
 import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
 import static org.apache.solr.update.processor.DistributedUpdateProcessor.*;
@@ -55,23 +60,69 @@ import static org.apache.solr.update.processor.DistributingUpdateProcessorFactor
  */
 public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcessorFactory
         implements SolrCoreAware, UpdateRequestProcessorFactory.RunAlways {
-    private static final Logger log = LoggerFactory.getLogger(MirroringUpdateRequestProcessorFactory.class);
+
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     // Flag for mirroring requests
     public static String SERVER_SHOULD_MIRROR = "shouldMirror";
 
     /** This is instantiated in inform(SolrCore) and then shared by all processor instances - visible for testing */
-    volatile RequestMirroringHandler mirroringHandler;
+    volatile KafkaRequestMirroringHandler mirroringHandler;
 
     @Override
     public void init(final NamedList args) {
+
         super.init(args);
     }
 
+    private class Closer {
+        private final KafkaMirroringSink sink;
+
+        public Closer(KafkaMirroringSink sink) {
+            this.sink = sink;
+        }
+
+        public void close() {
+            try {
+                this.sink.close();
+            } catch (IOException e) {
+                log.error("Exception closing sink", e);
+            }
+        }
+
+    }
+
     @Override
     public void inform(SolrCore core) {
+        log.info("KafkaRequestMirroringHandler inform");
         // load the request mirroring sink class and instantiate.
-        mirroringHandler = core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(), KafkaRequestMirroringHandler.class);
+       // mirroringHandler = core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(), KafkaRequestMirroringHandler.class);
+
+
+        // TODO: Setup Kafka properly
+        final String topicName = System.getProperty("topicName");
+        if (topicName == null) {
+            throw new IllegalArgumentException("topicName not specified for producer");
+        }
+        final String bootstrapServers = System.getProperty("bootstrapServers");
+        if (bootstrapServers == null) {
+            throw new IllegalArgumentException("bootstrapServers not specified for producer");
+        }
+        KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, false, null);
+        KafkaMirroringSink sink = new KafkaMirroringSink(conf);
+
+        Closer closer = new Closer(sink);
+        core.addCloseHook(new CloseHook() {
+            @Override public void preClose(SolrCore core) {
+
+            }
+
+            @Override public void postClose(SolrCore core) {
+                closer.close();
+            }
+        });
+
+        mirroringHandler = new KafkaRequestMirroringHandler(sink);
     }
 
     @Override
@@ -110,7 +161,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
             // prevent circular mirroring
             mirroredParams.set(SERVER_SHOULD_MIRROR, Boolean.FALSE.toString());
         }
-
+        log.info("Create MirroringUpdateProcessor");
         return new MirroringUpdateProcessor(next, doMirroring, mirroredParams,
                 DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null);
     }
@@ -205,6 +256,11 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
             // TODO: We can't/shouldn't support this ?
         }
 
+        public void processCommit(CommitUpdateCommand cmd) throws IOException {
+            log.info("process commit cmd={}", cmd);
+            if (next != null) next.processCommit(cmd);
+        }
+
         @Override
         public void finish() throws IOException {
             super.finish();
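
For this factory to run, it must be registered in an update request processor chain; the cloud-minimal solrconfig.xml added by this commit presumably wires it up along these lines (a sketch of standard Solr chain syntax, not the literal contents of the added file):

    <updateRequestProcessorChain name="mirrorUpdateChain" default="true">
      <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory"/>
      <processor class="solr.LogUpdateProcessorFactory"/>
      <processor class="solr.DistributedUpdateProcessorFactory"/>
      <processor class="solr.RunUpdateProcessorFactory"/>
    </updateRequestProcessorChain>

With topicName and bootstrapServers supplied as system properties (as cluster.sh does via SOLR_OPTS), inform(SolrCore) builds one KafkaMirroringSink per core and closes it through the registered CloseHook.
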
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
similarity index 53%
rename from crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
rename to crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 147f2be..7df4438 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -1,8 +1,8 @@
 package org.apache.solr.crossdc;
 
-<<<<<<< refs/remotes/markrmiller/itwip
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -10,14 +10,19 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.lucene.util.QuickPatchThreadsFilter;
 import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
 import org.apache.solr.crossdc.consumer.Consumer;
-import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
@@ -25,31 +30,13 @@ import org.slf4j.LoggerFactory;
 
 import java.lang.invoke.MethodHandles;
 import java.util.Properties;
-=======
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.lucene.util.QuickPatchThreadsFilter;
-import org.apache.solr.SolrIgnoredThreadsFilter;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.cloud.MiniSolrCloudCluster;
-import org.apache.solr.cloud.SolrCloudTestCase;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.crossdc.common.MirroredSolrRequest;
-import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import java.util.Map;
->>>>>>> Add back in the EmbeddedKafkaCluster.
 
 import static org.mockito.Mockito.spy;
 
-@ThreadLeakFilters(
-    defaultFilters = true,
-    filters = { SolrIgnoredThreadsFilter.class, QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class})
-<<<<<<< refs/remotes/markrmiller/itwip
-@ThreadLeakAction(ThreadLeakAction.Action.INTERRUPT)
-public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
+@ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
+    QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
+@ThreadLeakLingering(linger = 5000) public class SolrAndKafkaIntegrationTest extends
+    SolrTestCaseJ4 {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -64,28 +51,14 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
   protected static volatile Consumer consumer = new Consumer();
 
   private static String TOPIC = "topic1";
-  
-  private static String COLLECTION = "collection1";
-=======
-public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
-  static final String VERSION_FIELD = "_version_";
-
-  private static final int NUM_BROKERS = 1;
-  public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
-  protected static volatile MiniSolrCloudCluster cluster1;
-  protected static volatile MiniSolrCloudCluster cluster2;
-  private static SolrMessageProcessor processor;
->>>>>>> Add back in the EmbeddedKafkaCluster.
+  private static String COLLECTION = "collection1";
 
-  private static ResubmitBackoffPolicy backoffPolicy = spy(new TestMessageProcessor.NoOpResubmitBackoffPolicy());
+  @BeforeClass public static void setupIntegrationTest() throws Exception {
 
-  @BeforeClass
-  public static void setupIntegrationTest() throws Exception {
-<<<<<<< refs/remotes/markrmiller/itwip
     Properties config = new Properties();
-    config.put("unclean.leader.election.enable", "true");
-    config.put("enable.partition.eof", "false");
+    //config.put("unclean.leader.election.enable", "true");
+    //config.put("enable.partition.eof", "false");
 
     kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
       public String bootstrapServers() {
@@ -96,10 +69,11 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
 
     kafkaCluster.createTopic(TOPIC, 1, 1);
 
-    solrCluster1 =
-        new Builder(2, createTempDir())
-            .addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath())
-            .configure();
+    System.setProperty("topicName", TOPIC);
+    System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+
+    solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
 
     CollectionAdminRequest.Create create =
         CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
@@ -108,27 +82,26 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
 
     solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
 
-    String bootstrapServers = kafkaCluster.bootstrapServers();
-    log.info("bootstrapServers={}", bootstrapServers);
+    solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+        getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
 
-    consumer.start(bootstrapServers, solrCluster1.getZkServer().getZkAddress(), TOPIC, false, 0);
+    CollectionAdminRequest.Create create2 =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster2.getSolrClient().request(create2);
+    solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
 
-=======
+    solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
 
-    CLUSTER.start();
+    String bootstrapServers = kafkaCluster.bootstrapServers();
+    log.info("bootstrapServers={}", bootstrapServers);
 
-    cluster1 =
-        new Builder(2, createTempDir())
-            .addConfig("conf", getFile("src/resources/configs/cloud-minimal/conf").toPath())
-            .configure();
+    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, false, 0);
 
-    processor = new SolrMessageProcessor(cluster1.getSolrClient(), backoffPolicy);
->>>>>>> Add back in the EmbeddedKafkaCluster.
   }
 
-  @AfterClass
-  public static void tearDownIntegrationTest() throws Exception {
-<<<<<<< refs/remotes/markrmiller/itwip
+  @AfterClass public static void tearDownIntegrationTest() throws Exception {
+    ObjectReleaseTracker.clear();
+
     consumer.shutdown();
 
     try {
@@ -138,14 +111,55 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
     }
 
     if (solrCluster1 != null) {
+      solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
       solrCluster1.shutdown();
     }
     if (solrCluster2 != null) {
+      solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
       solrCluster2.shutdown();
     }
+    //ObjectReleaseTracker.clear();
+    // if (solrCluster2 != null) {
+    //   solrCluster2.shutdown();
+    //}
   }
 
-  public void test() throws InterruptedException {
+  public void testFullCloudToCloud() throws Exception {
+    Thread.sleep(10000); // TODO why?
+
+    CloudSolrClient client = solrCluster1.getSolrClient();
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", String.valueOf(System.currentTimeMillis()));
+    doc.addField("text", "some test");
+
+    client.add(doc);
+
+    client.commit(COLLECTION);
+
+    System.out.println("Sent producer record");
+
+    QueryResponse results = null;
+    boolean foundUpdates = false;
+    for (int i = 0; i < 50; i++) {
+      solrCluster2.getSolrClient().commit(COLLECTION);
+      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      if (results.getResults().getNumFound() == 1) {
+        foundUpdates = true;
+      } else {
+        Thread.sleep(500);
+      }
+    }
+
+    // no producer in this test; the update flows through cluster 1's mirror chain
+    System.out.println("Finished polling cluster 2");
+
+    assertTrue("results=" + results, foundUpdates);
+    System.out.println("Rest: " + results);
+
+  }
+
+  public void testProducerToCloud() throws Exception {
     Thread.sleep(10000);
     Properties properties = new Properties();
     properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
@@ -159,8 +173,8 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
     Producer<String, MirroredSolrRequest> producer = new KafkaProducer(properties);
     UpdateRequest updateRequest = new UpdateRequest();
     updateRequest.setParam("shouldMirror", "true");
-    updateRequest.add("id", String.valueOf(System.currentTimeMillis()));
-    updateRequest.add("text", "test");
+    updateRequest.add("id", String.valueOf(System.currentTimeMillis()), "text", "test");
+    updateRequest.add("id", String.valueOf(System.currentTimeMillis() + 22), "text", "test2");
     updateRequest.setParam("collection", COLLECTION);
     MirroredSolrRequest mirroredSolrRequest = new MirroredSolrRequest(updateRequest);
     System.out.println("About to send producer record");
@@ -168,25 +182,26 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
       log.info("Producer finished sending metadata={}, exception={}", metadata, exception);
     });
     producer.flush();
-    System.out.println("Sent producer record");
-    producer.close();
-    System.out.println("Closed producer");
 
-    Thread.sleep(10000);
-=======
-
-    CLUSTER.stop();
+    System.out.println("Sent producer record");
 
-    if (cluster1 != null) {
-      cluster1.shutdown();
-    }
-    if (cluster2 != null) {
-      cluster2.shutdown();
+    QueryResponse results = null;
+    boolean foundUpdates = false;
+    for (int i = 0; i < 50; i++) {
+     // solrCluster1.getSolrClient().commit(COLLECTION);
+      results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      if (results.getResults().getNumFound() == 2) { // two docs are added above
+        foundUpdates = true;
+      } else {
+        Thread.sleep(500);
+      }
     }
-  }
 
-  public void test() {
+    System.out.println("Closed producer");
+
+    assertTrue("results=" + results, foundUpdates);
+    System.out.println("Rest: " + results);
 
->>>>>>> Add back in the EmbeddedKafkaCluster.
+    producer.close();
   }
 }
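
A note on the two tests above: testFullCloudToCloud pushes an update through
cluster 1, whose mirrorUpdateChain (see the solrconfig.xml added below) forwards
it to Kafka, then polls cluster 2 until the Consumer has applied it, while
testProducerToCloud skips the Solr send side and publishes a MirroredSolrRequest
straight to the topic. A standalone producer along the lines of the second test
might look like the sketch below; the value.serializer line is an assumption
(the test's property setup falls outside the hunks shown here), and the class
name, topic, and field values are illustrative only.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.solr.client.solrj.request.UpdateRequest;
    import org.apache.solr.common.SolrInputDocument;
    import org.apache.solr.crossdc.common.MirroredSolrRequest;
    import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;

    public class MirrorProducerSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // stand-in for kafkaCluster.bootstrapServers()
        props.put("key.serializer", StringSerializer.class.getName());
        // Assumption: the commit's MirroredSolrRequestSerializer is registered for values.
        props.put("value.serializer", MirroredSolrRequestSerializer.class.getName());

        try (Producer<String, MirroredSolrRequest> producer = new KafkaProducer<>(props)) {
          UpdateRequest update = new UpdateRequest();
          update.setParam("shouldMirror", "true");
          update.setParam("collection", "collection1");

          SolrInputDocument doc = new SolrInputDocument();
          doc.addField("id", String.valueOf(System.currentTimeMillis()));
          doc.addField("text", "mirrored doc");
          update.add(doc);

          // Wrap the update exactly as the test does and send it to the mirror topic.
          producer.send(new ProducerRecord<>("topic1", new MirroredSolrRequest(update)),
              (metadata, exception) ->
                  System.out.println("sent metadata=" + metadata + " exception=" + exception));
          producer.flush();
        }
      }
    }
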
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java b/crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
similarity index 100%
rename from crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
rename to crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
diff --git a/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/schema.xml b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/schema.xml
new file mode 100644
index 0000000..bc4676c
--- /dev/null
+++ b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/schema.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<schema name="minimal" version="1.1">
+    <fieldType name="boolean" class="solr.BoolField" sortMissingLast="true"/>
+    <fieldType name="string" class="solr.StrField" docValues="true"/>
+    <fieldType name="int" class="org.apache.solr.schema.IntPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="long" class="org.apache.solr.schema.LongPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="float" class="org.apache.solr.schema.FloatPointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="double" class="org.apache.solr.schema.DoublePointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="date" class="org.apache.solr.schema.DatePointField" docValues="true" omitNorms="true"
+               positionIncrementGap="0"/>
+    <fieldType name="text" class="solr.TextField">
+        <analyzer>
+            <tokenizer class="solr.StandardTokenizerFactory"/>
+            <filter class="solr.LowerCaseFilterFactory"/>
+        </analyzer>
+    </fieldType>
+
+    <!-- for versioning -->
+    <field name="_version_" type="long" indexed="true" stored="true"/>
+    <field name="_root_" type="string" indexed="true" stored="true" multiValued="false" required="false"/>
+    <field name="id" type="string" indexed="true" stored="true"/>
+    <field name="text" type="text" indexed="true" stored="false"/>
+
+    <dynamicField name="*_b" type="boolean" indexed="true" stored="true"/>
+    <dynamicField name="*_s" type="string" indexed="true" stored="false"/>
+    <dynamicField name="*_t" type="text" indexed="true" stored="false"/>
+    <dynamicField name="*_i" type="int" indexed="false" stored="false"/>
+    <dynamicField name="*_l" type="long" indexed="false" stored="false"/>
+    <dynamicField name="*_f" type="float" indexed="false" stored="false"/>
+    <dynamicField name="*_d" type="double" indexed="false" stored="false"/>
+    <dynamicField name="*_dt" type="date" indexed="false" stored="false"/>
+
+    <uniqueKey>id</uniqueKey>
+</schema>
diff --git a/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
new file mode 100644
index 0000000..6e057ab
--- /dev/null
+++ b/crossdc-producer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
+
+<config>
+
+  <dataDir>${solr.data.dir:}</dataDir>
+
+  <directoryFactory name="DirectoryFactory"
+                    class="${directoryFactory:solr.NRTCachingDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <indexConfig>
+    <mergePolicyFactory class="${mergePolicyFactory:org.apache.solr.index.TieredMergePolicyFactory}">
+      <int name="maxMergeAtOnce">${maxMergeAtOnce:10}</int>
+      <int name="segmentsPerTier">${segmentsPerTier:10}</int>
+      <double name="noCFSRatio">${noCFSRatio:.1}</double>
+    </mergePolicyFactory>
+
+    <useCompoundFile>${useCompoundFile:true}</useCompoundFile>
+
+    <ramBufferSizeMB>${ramBufferSizeMB:160}</ramBufferSizeMB>
+    <maxBufferedDocs>${maxBufferedDocs:250000}</maxBufferedDocs>     <!-- Force the common case to flush by doc count  -->
+    <!-- <ramPerThreadHardLimitMB>60</ramPerThreadHardLimitMB> -->
+
+    <!-- <mergeScheduler class="org.apache.lucene.index.ConcurrentMergeScheduler">
+      <int name="maxThreadCount">6</int>
+      <int name="maxMergeCount">8</int>
+      <bool name="ioThrottle">false</bool>
+    </mergeScheduler> -->
+
+    <writeLockTimeout>1000</writeLockTimeout>
+    <commitLockTimeout>10000</commitLockTimeout>
+
+    <!-- this sys property is not set by SolrTestCaseJ4 because almost all tests should
+         use the single process lockType for speed - but tests that explicitly need
+         to vary the lockType can set it as needed.
+    -->
+    <lockType>${lockType:single}</lockType>
+
+    <infoStream>${infostream:false}</infoStream>
+
+  </indexConfig>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <commitWithin>
+      <softCommit>${commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+    <autoCommit>
+      <maxTime>${autoCommit.maxTime:60000}</maxTime>
+    </autoCommit>
+    <updateLog class="${ulog:solr.UpdateLog}" enable="${enable.update.log:true}"/>
+  </updateHandler>
+
+  <requestHandler name="/select" class="solr.SearchHandler">
+    <lst name="defaults">
+      <str name="echoParams">explicit</str>
+      <str name="indent">true</str>
+      <str name="df">text</str>
+    </lst>
+
+  </requestHandler>
+
+  <query>
+    <queryResultCache
+            enabled="${queryResultCache.enabled:false}"
+            class="${queryResultCache.class:solr.CaffeineCache}"
+            size="${queryResultCache.size:0}"
+            initialSize="${queryResultCache.initialSize:0}"
+            autowarmCount="${queryResultCache.autowarmCount:0}"/>
+      <documentCache
+              enabled="${documentCache.enabled:false}"
+              class="${documentCache.class:solr.CaffeineCache}"
+              size="${documentCache.size:0}"
+              initialSize="${documentCache.initialSize:0}"
+              autowarmCount="${documentCache.autowarmCount:0}"/>
+      <filterCache
+              enabled ="${filterCache.enabled:false}"
+              class="${filterCache.class:solr.CaffeineCache}"
+              size="${filterCache.size:1}"
+              initialSize="${filterCache.initialSize:1}"
+              autowarmCount="${filterCache.autowarmCount:0}"
+              async="${filterCache.async:false}"/>
+    <cache name="myPerSegmentCache"
+           enabled="${myPerSegmentCache.enabled:false}"
+           class="${myPerSegmentCache.class:solr.CaffeineCache}"
+           size="${myPerSegmentCache.size:0}"
+           initialSize="${myPerSegmentCache.initialSize:0}"
+           autowarmCount="${myPerSegmentCache.autowarmCount:0}"/>
+  </query>
+
+  <updateRequestProcessorChain  name="mirrorUpdateChain" default="true">
+    <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+
+    </processor>
+    <processor class="solr.LogUpdateProcessorFactory" />
+    <processor class="solr.RunUpdateProcessorFactory" />
+  </updateRequestProcessorChain>
+
+</config>
+
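
The mirrorUpdateChain above is the producer-side hook: MirroringUpdateRequestProcessorFactory
runs ahead of the stock Log/Run processors, so every add and commit passes through
it before being indexed locally (the processCommit hunk earlier in this diff shows
the real class logging the command and delegating to next). As a rough sketch of
that delegation pattern only, assuming the standard UpdateRequestProcessorFactory
contract: MirrorSink and MirroringSketchFactory below are hypothetical stand-ins,
not the Kafka-backed classes this commit adds, and a real solrconfig-loaded plugin
would also need a no-arg constructor plus init(NamedList) handling.

    import java.io.IOException;

    import org.apache.solr.common.SolrInputDocument;
    import org.apache.solr.request.SolrQueryRequest;
    import org.apache.solr.response.SolrQueryResponse;
    import org.apache.solr.update.AddUpdateCommand;
    import org.apache.solr.update.CommitUpdateCommand;
    import org.apache.solr.update.processor.UpdateRequestProcessor;
    import org.apache.solr.update.processor.UpdateRequestProcessorFactory;

    public class MirroringSketchFactory extends UpdateRequestProcessorFactory {

      /** Hypothetical side channel; the real commit sends MirroredSolrRequests to Kafka. */
      public interface MirrorSink {
        void submit(SolrInputDocument doc) throws IOException;
      }

      private final MirrorSink sink;

      public MirroringSketchFactory(MirrorSink sink) {
        this.sink = sink;
      }

      @Override
      public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp,
          UpdateRequestProcessor next) {
        return new UpdateRequestProcessor(next) {
          @Override
          public void processAdd(AddUpdateCommand cmd) throws IOException {
            // Hand a copy to the side channel, then let the local chain index the original.
            sink.submit(cmd.getSolrInputDocument().deepCopy());
            super.processAdd(cmd);
          }

          @Override
          public void processCommit(CommitUpdateCommand cmd) throws IOException {
            // Commits pass straight through to the rest of the chain.
            super.processCommit(cmd);
          }
        };
      }
    }
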
diff --git a/crossdc-consumer/src/test/resources/log4j2.xml b/crossdc-producer/src/test/resources/log4j2.xml
similarity index 62%
copy from crossdc-consumer/src/test/resources/log4j2.xml
copy to crossdc-producer/src/test/resources/log4j2.xml
index 96f69f1..c63e736 100644
--- a/crossdc-consumer/src/test/resources/log4j2.xml
+++ b/crossdc-producer/src/test/resources/log4j2.xml
@@ -26,9 +26,33 @@
         </Pattern>
       </PatternLayout>
     </Console>
+
+    <RollingRandomAccessFile
+            name="MainLogFile"
+            fileName="${sys:log.dir:-logs}/${sys:log.name:-solr}.log"
+            filePattern="${sys:log.dir:-logs}/${sys:log.name:-solr}.log.%i">
+      <PatternLayout>
+        <Pattern>
+          %maxLen{%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p (%t) [%notEmpty{c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}] %c{1.}
+          %m%notEmpty{ =>%ex{short}}}{10240}%n
+        </Pattern>
+      </PatternLayout>
+      <Policies>
+        <OnStartupTriggeringPolicy/>
+        <SizeBasedTriggeringPolicy size="128 MB"/>
+      </Policies>
+      <DefaultRolloverStrategy max="10"/>
+    </RollingRandomAccessFile>
   </Appenders>
   <Loggers>
-    <!-- Use <AsyncLogger/<AsyncRoot and <Logger/<Root for asynchronous logging or synchonous logging respectively -->
+
+
+    <Logger name="kafka.server.KafkaConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.producer.ProducerConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="WARN"/>
+
+
     <Logger name="org.apache.zookeeper" level="WARN"/>
     <Logger name="org.apache.hadoop" level="WARN"/>
     <Logger name="org.apache.directory" level="WARN"/>
@@ -36,6 +60,7 @@
     <Logger name="org.eclipse.jetty" level="INFO"/>
 
     <Root level="INFO">
+      <AppenderRef ref="MainLogFile"/>
       <AppenderRef ref="STDERR"/>
     </Root>
   </Loggers>
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 8cf6eb5..ffed3a2 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.3-all.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-bin.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
diff --git a/crossdc-consumer/src/test/resources/log4j2.xml b/log4j2.xml
similarity index 62%
copy from crossdc-consumer/src/test/resources/log4j2.xml
copy to log4j2.xml
index 96f69f1..c63e736 100644
--- a/crossdc-consumer/src/test/resources/log4j2.xml
+++ b/log4j2.xml
@@ -26,9 +26,33 @@
         </Pattern>
       </PatternLayout>
     </Console>
+
+    <RollingRandomAccessFile
+            name="MainLogFile"
+            fileName="${sys:log.dir:-logs}/${sys:log.name:-solr}.log"
+            filePattern="${sys:log.dir:-logs}/${sys:log.name:-solr}.log.%i">
+      <PatternLayout>
+        <Pattern>
+          %maxLen{%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p (%t) [%notEmpty{c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}] %c{1.}
+          %m%notEmpty{ =>%ex{short}}}{10240}%n
+        </Pattern>
+      </PatternLayout>
+      <Policies>
+        <OnStartupTriggeringPolicy/>
+        <SizeBasedTriggeringPolicy size="128 MB"/>
+      </Policies>
+      <DefaultRolloverStrategy max="10"/>
+    </RollingRandomAccessFile>
   </Appenders>
   <Loggers>
-    <!-- Use <AsyncLogger/<AsyncRoot and <Logger/<Root for asynchronous logging or synchonous logging respectively -->
+
+
+    <Logger name="kafka.server.KafkaConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.producer.ProducerConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.consumer.ConsumerConfig" level="WARN"/>
+    <Logger name="org.apache.kafka.clients.admin.AdminClientConfig" level="WARN"/>
+
+
     <Logger name="org.apache.zookeeper" level="WARN"/>
     <Logger name="org.apache.hadoop" level="WARN"/>
     <Logger name="org.apache.directory" level="WARN"/>
@@ -36,6 +60,7 @@
     <Logger name="org.eclipse.jetty" level="INFO"/>
 
     <Root level="INFO">
+      <AppenderRef ref="MainLogFile"/>
       <AppenderRef ref="STDERR"/>
     </Root>
   </Loggers>