Posted to commits@solr.apache.org by ma...@apache.org on 2022/05/31 22:09:19 UTC

[solr-sandbox] branch crossdc-wip updated: Work out SolrAndKafkaIntegrationTest (#16)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new 9b0cced  Work out SolrAndKafkaIntegrationTest (#16)
9b0cced is described below

commit 9b0cced1b6d6eff642de8120374eb7ae76e1f3c0
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Tue May 31 17:09:14 2022 -0500

    Work out SolrAndKafkaIntegrationTest (#16)
    
    Each of the 2 tests still needs to be run separately - running both in the same run still needs to be fixed up.
---
 .../solr/crossdc/common/KafkaMirroringSink.java    |  17 ++
 .../common/MirroredSolrRequestSerializer.java      |   2 +-
 .../org/apache/solr/crossdc/consumer/Consumer.java | 225 +-----------------
 .../crossdc/consumer/KafkaCrossDcConsumer.java     | 252 +++++++++++++++++++++
 .../messageprocessor/SolrMessageProcessor.java     |  26 ++-
 .../processor/KafkaRequestMirroringHandler.java    |  12 +-
 .../update/processor/MirroringUpdateProcessor.java | 143 ++++++++++++
 .../MirroringUpdateRequestProcessorFactory.java    | 118 ----------
 .../solr/crossdc/SolrAndKafkaIntegrationTest.java  |  21 +-
 .../SolrKafkaTestsIgnoredThreadsFilter.java        |  10 +-
 crossdc-producer/src/test/resources/log4j2.xml     |  11 +-
 11 files changed, 474 insertions(+), 363 deletions(-)

diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index b062392..3b245f9 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -18,6 +18,7 @@ package org.apache.solr.crossdc.common;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +54,12 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
 
         // Create Producer record
         try {
+
+            producer.send(new ProducerRecord(conf.getTopicName(), request), (metadata, exception) -> {
+                log.info("Producer finished sending metadata={}, exception={}", metadata, exception);
+            });
+            producer.flush();
+
             lastSuccessfulEnqueueNanos = System.nanoTime();
             // Record time since last successful enqueue as 0
             long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos);
@@ -86,9 +93,19 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
 
         props.put("bootstrap.servers", conf.getBootStrapServers());
 
+        props.put("acks", "all");
+        props.put("retries", 99999);
+        props.put("batch.size", 15);
+        props.put("buffer.memory", 33554432);
+        props.put("linger.ms", 1);
+
         props.put("key.serializer", StringSerializer.class.getName());
         props.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
 
+        if (log.isDebugEnabled()) {
+            log.debug("Kafka Producer props={}", props);
+        }
+
         ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
         Thread.currentThread().setContextClassLoader(null);
         Producer<String, MirroredSolrRequest> producer;
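
A note on the producer settings added above: acks=all plus a very large retry count trades throughput for delivery guarantees, linger.ms=1 keeps latency low, buffer.memory is just the client default (32 MB), and batch.size=15 (bytes) effectively disables batching, which reads like a test-oriented value. Below is a minimal standalone sketch of the same knobs on a plain string-keyed producer; the topic name and bootstrap address are placeholders, not values from this commit:

    import java.util.Properties;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerConfigSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("acks", "all");    // wait for all in-sync replicas to ack
            props.put("retries", 99999); // retry transient send failures (near) forever
            props.put("linger.ms", 1);   // send almost immediately
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                Future<RecordMetadata> future =
                        producer.send(new ProducerRecord<>("demo-topic", "payload"));
                producer.flush();                       // like the flush-per-send above
                RecordMetadata metadata = future.get(); // block until acked; surfaces errors
                System.out.println("acked offset=" + metadata.offset());
            }
        }
    }
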
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
index 221f12f..1007c0b 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
@@ -107,7 +107,7 @@ public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrReq
 
         ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
         Map map = new HashMap();
-        map.put("params", solrRequest.getParams().getMap());
+        map.put("params", solrRequest.getParams());
         map.put("docs", solrRequest.getDocuments());
 
         // TODO
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index 5b3d01c..5bc7f6b 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -16,20 +16,7 @@
  */
 package org.apache.solr.crossdc.consumer;
 
-import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.crossdc.common.KafkaMirroringSink;
-import org.apache.solr.crossdc.common.MirroringException;
-import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
-import org.apache.solr.crossdc.common.IQueueHandler;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
-import org.apache.solr.crossdc.common.MirroredSolrRequest;
-import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
 import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
@@ -38,11 +25,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.invoke.MethodHandles;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -81,6 +63,9 @@ public class Consumer {
         crossDcConsumer = getCrossDcConsumer(bootstrapServers, zkConnectString, topicName, enableDataEncryption);
 
         // Start consumer thread
+
+        log.info("Starting CrossDC Consumer bootstrapServers={}, zkConnectString={}, topicName={}, enableDataEncryption={}", bootstrapServers, zkConnectString, topicName, enableDataEncryption);
+
         consumerThreadExecutor = Executors.newSingleThreadExecutor();
         consumerThreadExecutor.submit(crossDcConsumer);
 
@@ -122,208 +107,4 @@ public class Consumer {
 
     }
 
-    /**
-     * Class to run the consumer thread for Kafka. This also contains the implementation for retries and
-     * resubmitting to the queue in case of temporary failures.
-     */
-    public static class KafkaCrossDcConsumer extends CrossDcConsumer {
-        private static final Logger log = LoggerFactory.getLogger(KafkaCrossDcConsumer.class);
-
-        private final KafkaConsumer<String, MirroredSolrRequest> consumer;
-        private final KafkaMirroringSink kafkaMirroringSink;
-
-        private final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 100;
-        private final String topicName;
-        SolrMessageProcessor messageProcessor;
-
-        CloudSolrClient solrClient;
-
-        /**
-         * @param conf The Kafka consumer configuration
-         */
-        public KafkaCrossDcConsumer(KafkaCrossDcConf conf) {
-            this.topicName = conf.getTopicName();
-
-            final Properties kafkaConsumerProp = new Properties();
-
-            kafkaConsumerProp.put("bootstrap.servers", conf.getBootStrapServers());
-
-            kafkaConsumerProp.put("group.id", "group_1"); // TODO
-
-            solrClient = new CloudSolrClient.Builder(Collections.singletonList(conf.getSolrZkConnectString()), Optional.empty()).build();
-
-            messageProcessor = new SolrMessageProcessor(solrClient, new ResubmitBackoffPolicy() {
-                @Override public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
-                    return 0;
-                }
-            });
-
-            log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProp);
-            consumer = createConsumer(kafkaConsumerProp);
-
-            // Create producer for resubmitting failed requests
-            log.info("Creating Kafka resubmit producer");
-            this.kafkaMirroringSink = new KafkaMirroringSink(conf);
-            log.info("Created Kafka resubmit producer");
-
-        }
-
-        private KafkaConsumer<String, MirroredSolrRequest> createConsumer(Properties properties) {
-            KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, new StringDeserializer(), new MirroredSolrRequestSerializer());
-            return kafkaConsumer;
-        }
-
-        /**
-         * This is where the magic happens.
-         * 1. Polls and gets the packets from the queue
-         * 2. Extract the MirroredSolrRequest objects
-         * 3. Send the request to the MirroredSolrRequestHandler that has the processing, retry, error handling logic.
-         */
-        @Override
-        public void run() {
-            log.info("About to start Kafka consumer thread...");
-
-            log.info("Kafka consumer subscribing to topic topic={}", topicName);
-
-            try {
-
-                consumer.subscribe(Collections.singleton(topicName));
-
-                while (pollAndProcessRequests()) {
-                    //no-op within this loop: everything is done in pollAndProcessRequests method defined above.
-                }
-
-                log.info("Closed kafka consumer. Exiting now.");
-                try {
-                    consumer.close();
-                } catch (Exception e) {
-                    log.warn("Failed to close kafka consumer", e);
-                }
-
-                try {
-                    kafkaMirroringSink.close();
-                } catch (Exception e) {
-                    log.warn("Failed to close kafka mirroring sink", e);
-                }
-            } finally {
-                IOUtils.closeQuietly(solrClient);
-            }
-
-        }
-
-        /**
-         * Polls and processes the requests from Kafka. This method returns false when the consumer needs to be
-         * shutdown i.e. when there's a wakeup exception.
-         */
-        boolean pollAndProcessRequests() {
-            try {
-                ConsumerRecords<String, MirroredSolrRequest> records = consumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
-                for (TopicPartition partition : records.partitions()) {
-                    List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords = records.records(partition);
-                    try {
-                        for (ConsumerRecord<String, MirroredSolrRequest> record : partitionRecords) {
-                            log.info("Fetched record from topic={} partition={} key={} value={}",
-                                    record.topic(), record.partition(), record.key(), record.value());
-                            IQueueHandler.Result result = messageProcessor.handleItem(record.value());
-                            switch (result.status()) {
-                                case FAILED_RESUBMIT:
-                                    kafkaMirroringSink.submit(record.value());
-                                    break;
-                                case HANDLED:
-                                    // no-op
-                                    break;
-                                case NOT_HANDLED_SHUTDOWN:
-                                case FAILED_RETRY:
-                                    log.error("Unexpected response while processing request. We never expect {}.",
-                                            result.status().toString());
-                                    break;
-                                default:
-                                    // no-op
-                            }
-                        }
-                        updateOffset(partition, partitionRecords);
-
-                        // handleItem sets the thread interrupt, let's exit if there has been an interrupt set
-                        if(Thread.currentThread().isInterrupted()) {
-                            log.info("Kafka Consumer thread interrupted, shutting down Kafka consumer.");
-                            return false;
-                        }
-                    } catch (MirroringException e) {
-                        // We don't really know what to do here, so it's wiser to just break out.
-                        log.error("Mirroring exception occurred while resubmitting to Kafka. We are going to stop the consumer thread now.", e);
-                        return false;
-                    } catch (WakeupException e) {
-                        log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer.");
-                        return false;
-                    } catch (Exception e) {
-                        // If there is any exception returned by handleItem, then reset the offset.
-
-                        if (e instanceof ClassCastException || e instanceof ClassNotFoundException || e instanceof SerializationException) {
-                            log.error("Non retryable error", e);
-                            break;
-                        }
-                        log.warn("Exception occurred in Kafka consumer thread, but we will continue.", e);
-                        resetOffsetForPartition(partition, partitionRecords);
-                        break;
-                    }
-                }
-            } catch (WakeupException e) {
-                log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer");
-                return false;
-            } catch (Exception e) {
-
-                e.printStackTrace();
-                if (e instanceof ClassCastException || e instanceof ClassNotFoundException || e instanceof SerializationException) {
-                    log.error("Non retryable error", e);
-                    return false;
-                }
-
-                log.error("Exception occurred in Kafka consumer thread, but we will continue.", e);
-            }
-            return true;
-        }
-
-        /**
-         * Reset the local offset so that the consumer reads the records from Kafka again.
-         * @param partition The TopicPartition to reset the offset for
-         * @param partitionRecords PartitionRecords for the specified partition
-         */
-        private void resetOffsetForPartition(TopicPartition partition, List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
-            if (log.isDebugEnabled()) {
-                log.debug("Resetting offset to: {}", partitionRecords.get(0).offset());
-            }
-            long resetOffset = partitionRecords.get(0).offset();
-            consumer.seek(partition, resetOffset);
-        }
-
-        /**
-         * Logs and updates the commit point for the partition that has been processed.
-         * @param partition The TopicPartition to update the offset for
-         * @param partitionRecords PartitionRecords for the specified partition
-         */
-        private void updateOffset(TopicPartition partition, List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
-            long nextOffset = partitionRecords.get(partitionRecords.size() - 1).offset() + 1;
-            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(nextOffset)));
-
-            if (log.isTraceEnabled()) {
-                log.trace("Updated offset for topic={} partition={} to offset={}",
-                    partition.topic(), partition.partition(), nextOffset);
-            }
-        }
-
-        /**
-         * Shutdown the Kafka consumer by calling wakeup.
-         */
-        public void shutdown() {
-            log.info("Shutdown called on KafkaCrossDcConsumer");
-            try {
-                solrClient.close();
-            } catch (Exception e) {
-                log.warn("Exception closing Solr client on shutdown");
-            }
-            consumer.wakeup();
-        }
-
-
-    }
 }
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
new file mode 100644
index 0000000..5a7e1aa
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -0,0 +1,252 @@
+package org.apache.solr.crossdc.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.crossdc.common.*;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Class to run the consumer thread for Kafka. This also contains the implementation for retries and
+ * resubmitting to the queue in case of temporary failures.
+ */
+public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final KafkaConsumer<String, MirroredSolrRequest> consumer;
+  private final KafkaMirroringSink kafkaMirroringSink;
+
+  private final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
+  private final String topicName;
+  SolrMessageProcessor messageProcessor;
+
+  CloudSolrClient solrClient;
+
+  /**
+   * @param conf The Kafka consumer configuration
+   */
+  public KafkaCrossDcConsumer(KafkaCrossDcConf conf) {
+    this.topicName = conf.getTopicName();
+
+    final Properties kafkaConsumerProp = new Properties();
+
+    kafkaConsumerProp.put("bootstrap.servers", conf.getBootStrapServers());
+
+    kafkaConsumerProp.put("group.id", "group_1"); // TODO
+
+    solrClient =
+        new CloudSolrClient.Builder(Collections.singletonList(conf.getSolrZkConnectString()),
+            Optional.empty()).build();
+
+    messageProcessor = new SolrMessageProcessor(solrClient, new ResubmitBackoffPolicy() {
+      @Override public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
+        return 0;
+      }
+    });
+
+    log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProp);
+    consumer = createConsumer(kafkaConsumerProp);
+
+    // Create producer for resubmitting failed requests
+    log.info("Creating Kafka resubmit producer");
+    this.kafkaMirroringSink = new KafkaMirroringSink(conf);
+    log.info("Created Kafka resubmit producer");
+
+  }
+
+  private KafkaConsumer<String, MirroredSolrRequest> createConsumer(Properties properties) {
+    KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, new StringDeserializer(),
+        new MirroredSolrRequestSerializer());
+    return kafkaConsumer;
+  }
+
+  /**
+   * This is where the magic happens.
+   * 1. Polls and gets the packets from the queue
+   * 2. Extracts the MirroredSolrRequest objects
+   * 3. Sends the request to the MirroredSolrRequestHandler, which has the processing, retry, and error handling logic.
+   */
+  @Override public void run() {
+    log.info("About to start Kafka consumer thread, topic={}", topicName);
+
+    try {
+
+      consumer.subscribe(Collections.singleton(topicName));
+
+      while (pollAndProcessRequests()) {
+        // no-op within this loop: everything is done in the pollAndProcessRequests method defined below.
+      }
+
+      log.info("Closed kafka consumer. Exiting now.");
+      try {
+        consumer.close();
+      } catch (Exception e) {
+        log.warn("Failed to close kafka consumer", e);
+      }
+
+      try {
+        kafkaMirroringSink.close();
+      } catch (Exception e) {
+        log.warn("Failed to close kafka mirroring sink", e);
+      }
+    } finally {
+      IOUtils.closeQuietly(solrClient);
+    }
+
+  }
+
+  /**
+   * Polls and processes the requests from Kafka. This method returns false when the consumer needs to be
+   * shut down, i.e. when a wakeup exception is caught.
+   */
+  boolean pollAndProcessRequests() {
+    log.trace("Entered pollAndProcessRequests loop");
+    try {
+      ConsumerRecords<String, MirroredSolrRequest> records =
+          consumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
+      for (TopicPartition partition : records.partitions()) {
+        List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords =
+            records.records(partition);
+        try {
+          for (ConsumerRecord<String, MirroredSolrRequest> record : partitionRecords) {
+            if (log.isTraceEnabled()) {
+              log.trace("Fetched record from topic={} partition={} key={} value={}", record.topic(),
+                  record.partition(), record.key(), record.value());
+            }
+            IQueueHandler.Result result = messageProcessor.handleItem(record.value());
+            switch (result.status()) {
+              case FAILED_RESUBMIT:
+                if (log.isTraceEnabled()) {
+                  log.trace("result=failed-resubmit");
+                }
+                kafkaMirroringSink.submit(record.value());
+                break;
+              case HANDLED:
+                // no-op
+                if (log.isTraceEnabled()) {
+                  log.trace("result=handled");
+                }
+                break;
+              case NOT_HANDLED_SHUTDOWN:
+                if (log.isTraceEnabled()) {
+                  log.trace("result=nothandled_shutdown");
+                }
+              case FAILED_RETRY:
+                log.error("Unexpected response while processing request. We never expect {}.",
+                    result.status().toString());
+                break;
+              default:
+                if (log.isTraceEnabled()) {
+                  log.trace("result=no matching case");
+                }
+                // no-op
+            }
+          }
+          updateOffset(partition, partitionRecords);
+
+          // handleItem sets the thread interrupt, let's exit if there has been an interrupt set
+          if (Thread.currentThread().isInterrupted()) {
+            log.info("Kafka Consumer thread interrupted, shutting down Kafka consumer.");
+            return false;
+          }
+        } catch (MirroringException e) {
+          // We don't really know what to do here, so it's wiser to just break out.
+          log.error(
+              "Mirroring exception occurred while resubmitting to Kafka. We are going to stop the consumer thread now.",
+              e);
+          return false;
+        } catch (WakeupException e) {
+          log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer.");
+          return false;
+        } catch (Exception e) {
+          // If there is any exception returned by handleItem, then reset the offset.
+
+          if (e instanceof ClassCastException || e instanceof ClassNotFoundException
+              || e instanceof SerializationException) { // TODO: optional
+            log.error("Non retryable error", e);
+            break;
+          }
+          log.warn("Exception occurred in Kafka consumer thread, but we will continue.", e);
+          resetOffsetForPartition(partition, partitionRecords);
+          break;
+        }
+      }
+    } catch (WakeupException e) {
+      log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer");
+      return false;
+    } catch (Exception e) {
+
+      if (e instanceof ClassCastException || e instanceof ClassNotFoundException
+          || e instanceof SerializationException) { // TODO: optional
+        log.error("Non retryable error", e);
+        return false;
+      }
+
+      log.error("Exception occurred in Kafka consumer thread, but we will continue.", e);
+    }
+    return true;
+  }
+
+  /**
+   * Reset the local offset so that the consumer reads the records from Kafka again.
+   *
+   * @param partition        The TopicPartition to reset the offset for
+   * @param partitionRecords PartitionRecords for the specified partition
+   */
+  private void resetOffsetForPartition(TopicPartition partition,
+      List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+    if (log.isTraceEnabled()) {
+      log.trace("Resetting offset to: {}", partitionRecords.get(0).offset());
+    }
+    long resetOffset = partitionRecords.get(0).offset();
+    consumer.seek(partition, resetOffset);
+  }
+
+  /**
+   * Logs and updates the commit point for the partition that has been processed.
+   *
+   * @param partition        The TopicPartition to update the offset for
+   * @param partitionRecords PartitionRecords for the specified partition
+   */
+  private void updateOffset(TopicPartition partition,
+      List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+    long nextOffset = partitionRecords.get(partitionRecords.size() - 1).offset() + 1;
+
+    if (log.isTraceEnabled()) {
+      log.trace("Updated offset for topic={} partition={} to offset={}", partition.topic(),
+          partition.partition(), nextOffset);
+    }
+
+    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(nextOffset)));
+  }
+
+  /**
+   * Shutdown the Kafka consumer by calling wakeup.
+   */
+  public void shutdown() {
+    log.info("Shutdown called on KafkaCrossDcConsumer");
+    try {
+      solrClient.close();
+    } catch (Exception e) {
+      log.warn("Exception closing Solr client on shutdown", e);
+    }
+    consumer.wakeup();
+  }
+
+}
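
The consumer above manages offsets by hand: commitSync() advances the committed offset only after a partition's batch has been fully handled, and on a retryable failure it seeks back to the first record of the batch so poll() redelivers the whole thing. Below is a minimal sketch of that commit/seek pattern in isolation; the topic, group id, and process() call are placeholders, and the sketch disables auto-commit explicitly, which the consumer configuration in this commit does not yet do. This is at-least-once delivery, so the handler must tolerate duplicates:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ManualOffsetSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("group.id", "demo-group");              // placeholder
            props.put("enable.auto.commit", "false");         // commits are manual below

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
                    props, new StringDeserializer(), new StringDeserializer())) {
                consumer.subscribe(Collections.singleton("demo-topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> batch = records.records(partition);
                        try {
                            for (ConsumerRecord<String, String> record : batch) {
                                process(record); // placeholder for real handling
                            }
                            // Success: commit the offset just past the last handled record.
                            long next = batch.get(batch.size() - 1).offset() + 1;
                            consumer.commitSync(Collections.singletonMap(
                                    partition, new OffsetAndMetadata(next)));
                        } catch (Exception e) {
                            // Retryable failure: rewind so poll() redelivers the batch.
                            consumer.seek(partition, batch.get(0).offset());
                        }
                    }
                }
            }
        }

        static void process(ConsumerRecord<String, String> record) { /* placeholder */ }
    }
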
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
index 140d0b9..d15e01d 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -71,12 +71,16 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
     }
 
     private Result<MirroredSolrRequest> handleSolrRequest(MirroredSolrRequest mirroredSolrRequest) {
-        log.debug("Handling Solr request");
+
         SolrRequest request = mirroredSolrRequest.getSolrRequest();
         final SolrParams requestParams = request.getParams();
 
+        if (log.isTraceEnabled()) {
+            log.trace("handleSolrRequest params={}", requestParams);
+        }
+
         final String shouldMirror = requestParams.get("shouldMirror");
-        log.info("shouldMirror={}", shouldMirror);
+
         if ("false".equalsIgnoreCase(shouldMirror)) {
             log.warn("Skipping mirrored request because shouldMirror is set to false. request={}", requestParams);
             return new Result<>(ResultStatus.FAILED_NO_RETRY);
@@ -87,7 +91,6 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
         try {
             prepareIfUpdateRequest(request);
             logRequest(request);
-            log.debug("About to submit Solr request {}", request);
             result = processMirroredSolrRequest(request);
         } catch (Exception e) {
             result = handleException(mirroredSolrRequest, e);
@@ -160,12 +163,19 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
      * Process the SolrRequest. If not, this method throws an exception.
      */
     private Result<MirroredSolrRequest> processMirroredSolrRequest(SolrRequest request) throws Exception {
-        log.info("Sending request to Solr at {} with params {}", client.getZkHost(), request.getParams());
+        if (log.isTraceEnabled()) {
+            log.trace("Sending request to Solr at ZK address={} with params {}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams());
+        }
         Result<MirroredSolrRequest> result;
 
         SolrResponseBase response = (SolrResponseBase) request.process(client);
 
         int status = response.getStatus();
+
+        if (log.isTraceEnabled()) {
+            log.trace("result status={}", status);
+        }
+
         if (status != 0) {
             throw new SolrException(SolrException.ErrorCode.getErrorCode(status), "response=" + response);
         }
@@ -215,11 +225,17 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
      */
     private void sanitizeDocument(SolrInputDocument doc) {
         SolrInputField field = doc.getField(VERSION_FIELD);
-        log.info("Removing {} value={}", VERSION_FIELD, field == null ? "null" : field.getValue());
+        if (log.isTraceEnabled()) {
+            log.trace("Removing {} value={}", VERSION_FIELD,
+                field == null ? "null" : field.getValue());
+        }
         doc.remove(VERSION_FIELD);
     }
 
     private void removeVersionFromDeleteByIds(UpdateRequest updateRequest) {
+        if (log.isTraceEnabled()) {
+            log.trace("remove versions from deletebyids");
+        }
         Map<String, Map<String, Object>> deleteIds = updateRequest.getDeleteByIdMap();
         if (deleteIds != null) {
             for (Map<String, Object> idParams : deleteIds.values()) {
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
index bcd620f..937571b 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
@@ -45,9 +45,13 @@ public class KafkaRequestMirroringHandler implements RequestMirroringHandler {
      */
     @Override
     public void mirror(UpdateRequest request) throws MirroringException {
-        log.info("submit update to sink {}", request.getDocuments());
-            // TODO: Enforce external version constraint for consistent update replication (cross-cluster)
-            sink.submit(new MirroredSolrRequest(1, request, TimeUnit.MILLISECONDS.toNanos(
-                    System.currentTimeMillis())));
+        if (log.isTraceEnabled()) {
+            log.trace("submit update to sink docs={}, deletes={}, params={}", request.getDocuments(), request.getDeleteById(), request.getParams());
+        }
+        // TODO: Enforce external version constraint for consistent update replication (cross-cluster)
+        //if (request.getParams().get("shouldMirror") == null) { // TODO: work out proper shouldMirror semantics
+            request.getParams().set("shouldMirror", "true");
+        //}
+        sink.submit(new MirroredSolrRequest(1, request, TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis())));
     }
 }
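
The handler above now stamps every mirrored request with shouldMirror=true before submitting it, while the consumer side (see the SolrMessageProcessor hunk earlier in this diff) drops requests whose shouldMirror param is false. Per the TODO, the exact semantics are still being worked out, but the flag is the hook for keeping updates from bouncing between coupled clusters. A small sketch of both halves of that check - only the parameter name comes from this commit, the rest is illustrative:

    import org.apache.solr.client.solrj.request.UpdateRequest;
    import org.apache.solr.common.params.ModifiableSolrParams;
    import org.apache.solr.common.params.SolrParams;

    public class ShouldMirrorSketch {
        // Producer half: stamp the request before handing it to the sink,
        // guarding against a null params object (the diff assumes params exist).
        static void stamp(UpdateRequest request) {
            if (request.getParams() == null) {
                request.setParams(new ModifiableSolrParams());
            }
            request.getParams().set("shouldMirror", "true");
        }

        // Consumer half: apply the request unless it is explicitly marked
        // shouldMirror=false, matching the guard in SolrMessageProcessor.
        static boolean shouldApply(SolrParams requestParams) {
            return !"false".equalsIgnoreCase(requestParams.get("shouldMirror"));
        }
    }
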
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
new file mode 100644
index 0000000..e2e09f2
--- /dev/null
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -0,0 +1,143 @@
+package org.apache.solr.update.processor;
+
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.RollbackUpdateCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+
+import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
+
+public class MirroringUpdateProcessor extends UpdateRequestProcessor {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Flag indicating whether this instance creates and submits a mirrored request. This override is
+   * necessary to prevent circular mirroring between coupled clusters running this processor.
+   */
+  private final boolean doMirroring;
+  private final RequestMirroringHandler requestMirroringHandler;
+
+  /**
+   * The mirrored request starts as null, gets created and appended to at each process() call,
+   * then submitted on finish().
+   */
+  private UpdateRequest mirrorRequest;
+  private final SolrParams mirrorParams;
+
+  /**
+   * The distributed processor downstream from us so we can establish if we're running on a leader shard
+   */
+  private DistributedUpdateProcessor distProc;
+
+  /**
+   * Distribution phase of the incoming requests
+   */
+  private DistributedUpdateProcessor.DistribPhase distribPhase;
+
+  public MirroringUpdateProcessor(final UpdateRequestProcessor next, boolean doMirroring,
+      final SolrParams mirroredReqParams,
+      final DistributedUpdateProcessor.DistribPhase distribPhase,
+      final RequestMirroringHandler requestMirroringHandler) {
+    super(next);
+    this.doMirroring = doMirroring;
+    this.mirrorParams = mirroredReqParams;
+    this.distribPhase = distribPhase;
+    this.requestMirroringHandler = requestMirroringHandler;
+
+    // Find the downstream distributed update processor
+    for (UpdateRequestProcessor proc = next; proc != null; proc = proc.next) {
+      if (proc instanceof DistributedUpdateProcessor) {
+        distProc = (DistributedUpdateProcessor) proc;
+        break;
+      }
+    }
+    if (distProc == null) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "DistributedUpdateProcessor must follow "
+              + MirroringUpdateProcessor.class.getSimpleName());
+    }
+  }
+
+  private UpdateRequest createAndOrGetMirrorRequest() {
+    if (mirrorRequest == null) {
+      mirrorRequest = new UpdateRequest();
+      mirrorRequest.setParams(new ModifiableSolrParams(mirrorParams));
+    }
+    if (log.isDebugEnabled())
+      log.debug("createOrGetMirrorRequest={}",
+          mirrorRequest);
+    return mirrorRequest;
+  }
+
+  @Override public void processAdd(final AddUpdateCommand cmd) throws IOException {
+    if (log.isDebugEnabled())
+      log.debug("processAdd isLeader={} cmd={}", distProc.isLeader(), cmd);
+    super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
+
+    // submit only from the leader shards so we mirror each doc once
+    if (doMirroring && distProc.isLeader()) {
+      SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
+      doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
+      createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, cmd.overwrite);
+    }
+  }
+
+  @Override public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
+    if (log.isDebugEnabled())
+      log.debug("processDelete doMirroring={} isLeader={} cmd={}", doMirroring, distProc.isLeader(), cmd);
+    super.processDelete(cmd); // let this throw to prevent mirroring invalid requests
+
+    if (doMirroring) {
+      if (cmd.isDeleteById()) {
+        // deleteById requests run once per leader, so we just submit the request from the leader shard
+        if (distProc.isLeader()) {
+          createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip versions from deletes
+        }
+      } else {
+        // DBQs are sent to each shard leader, so we mirror from the original node to only mirror once
+        // In general there's no way to guarantee that these run identically on the mirror since there are no
+        // external doc versions.
+        // TODO: Can we actually support this considering DBQs aren't versioned.
+        if (distribPhase == DistributedUpdateProcessor.DistribPhase.NONE) {
+          createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
+        }
+      }
+    }
+  }
+
+  @Override public void processRollback(final RollbackUpdateCommand cmd) throws IOException {
+    super.processRollback(cmd);
+    // TODO: We can't/shouldn't support this ?
+  }
+
+  public void processCommit(CommitUpdateCommand cmd) throws IOException {
+    log.info("process commit cmd={}", cmd);
+    if (next != null) next.processCommit(cmd);
+  }
+
+  @Override public void finish() throws IOException {
+    super.finish();
+
+    if (doMirroring && mirrorRequest != null) {
+      try {
+        requestMirroringHandler.mirror(mirrorRequest);
+        mirrorRequest = null; // so we don't accidentally submit it again
+      } catch (Exception e) {
+        log.error("mirror submit failed", e);
+        throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
+      }
+    }
+  }
+}
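
MirroringUpdateProcessor reaches the sink only through the single-method RequestMirroringHandler interface (mirror(UpdateRequest), implemented above by KafkaRequestMirroringHandler), so the processor can be exercised without Kafka at all. A hypothetical recording handler for unit tests, assuming it sits in the same package as the interface:

    package org.apache.solr.update.processor; // same package as the handler interface

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.solr.client.solrj.request.UpdateRequest;

    // Hypothetical test double: records mirrored requests instead of sending
    // them anywhere, so MirroringUpdateProcessor can be tested without Kafka.
    public class RecordingMirroringHandler implements RequestMirroringHandler {
        final List<UpdateRequest> mirrored = new ArrayList<>();

        @Override
        public void mirror(UpdateRequest request) {
            mirrored.add(request); // capture instead of submitting to a sink
        }
    }

Such a handler would plug straight into the MirroringUpdateProcessor constructor shown above in place of the Kafka-backed one.
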
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index cabdda8..d66dba7 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -16,12 +16,9 @@
  */
 package org.apache.solr.update.processor;
 
-import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CloseHook;
 import org.apache.solr.core.CoreDescriptor;
@@ -30,10 +27,6 @@ import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.KafkaMirroringSink;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.update.AddUpdateCommand;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.DeleteUpdateCommand;
-import org.apache.solr.update.RollbackUpdateCommand;
 import org.apache.solr.util.plugin.SolrCoreAware;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +34,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 
-import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
 import static org.apache.solr.update.processor.DistributedUpdateProcessor.*;
 import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
 
@@ -166,114 +158,4 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
                 DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null);
     }
 
-    public static class MirroringUpdateProcessor extends UpdateRequestProcessor {
-        /** Flag indicating whether this instance creates and submits a mirrored request. This override is
-         * necessary to prevent circular mirroring between coupled cluster running this processor. */
-        private final boolean doMirroring;
-        private final RequestMirroringHandler requestMirroringHandler;
-
-        /** The mirrored request starts as null, gets created and appended to at each process() call,
-         * then submitted on finish(). */
-        private UpdateRequest mirrorRequest;
-        private final SolrParams mirrorParams;
-
-        /** The distributed processor downstream from us so we can establish if we're running on a leader shard */
-        private DistributedUpdateProcessor distProc;
-
-        /** Distribution phase of the incoming requests */
-        private DistribPhase distribPhase;
-
-        public MirroringUpdateProcessor(final UpdateRequestProcessor next, boolean doMirroring,
-                                        final SolrParams mirroredReqParams, final DistribPhase distribPhase,
-                                        final RequestMirroringHandler requestMirroringHandler) {
-            super(next);
-            this.doMirroring = doMirroring;
-            this.mirrorParams = mirroredReqParams;
-            this.distribPhase = distribPhase;
-            this.requestMirroringHandler = requestMirroringHandler;
-
-            // Find the downstream distributed update processor
-            for (UpdateRequestProcessor proc = next; proc != null; proc = proc.next) {
-                if (proc instanceof DistributedUpdateProcessor) {
-                    distProc = (DistributedUpdateProcessor) proc;
-                    break;
-                }
-            }
-            if (distProc == null) {
-                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "DistributedUpdateProcessor must follow "
-                        + MirroringUpdateProcessor.class.getSimpleName());
-            }
-        }
-
-        private UpdateRequest createAndOrGetMirrorRequest() {
-            if (mirrorRequest == null) {
-                mirrorRequest = new UpdateRequest();
-                mirrorRequest.setParams(new ModifiableSolrParams(mirrorParams));
-            }
-            if (log.isDebugEnabled()) log.debug("createOrGetMirrorRequest={}", mirrorRequest);
-            return mirrorRequest;
-        }
-
-        @Override
-        public void processAdd(final AddUpdateCommand cmd) throws IOException {
-            if (log.isDebugEnabled()) log.debug("processAdd isLeader={} cmd={}", distProc.isLeader(), cmd);
-            super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
-
-            // submit only from the leader shards so we mirror each doc once
-            if (doMirroring && distProc.isLeader()) {
-                SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
-                doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
-                createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, cmd.overwrite);
-            }
-        }
-
-        @Override
-        public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
-            if (log.isDebugEnabled()) log.debug("processDelete doMirroring={} isLeader={} cmd={}", doMirroring, distProc.isLeader(), cmd);
-            super.processDelete(cmd); // let this throw to prevent mirroring invalid requests
-
-            if (doMirroring) {
-                if (cmd.isDeleteById()) {
-                    // deleteById requests runs once per leader, so we just submit the request from the leader shard
-                    if (distProc.isLeader()) {
-                        createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip versions from deletes
-                    }
-                } else {
-                    // DBQs are sent to each shard leader, so we mirror from the original node to only mirror once
-                    // In general there's no way to guarantee that these run identically on the mirror since there are no
-                    // external doc versions.
-                    // TODO: Can we actually support this considering DBQs aren't versioned.
-                    if (distribPhase == DistribPhase.NONE) {
-                        createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
-                    }
-                }
-            }
-        }
-
-        @Override
-        public void processRollback(final RollbackUpdateCommand cmd) throws IOException {
-            super.processRollback(cmd);
-            // TODO: We can't/shouldn't support this ?
-        }
-
-        public void processCommit(CommitUpdateCommand cmd) throws IOException {
-            log.info("process commit cmd={}", cmd);
-            if (next != null) next.processCommit(cmd);
-        }
-
-        @Override
-        public void finish() throws IOException {
-            super.finish();
-
-            if (doMirroring && mirrorRequest != null) {
-                try {
-                    requestMirroringHandler.mirror(mirrorRequest);
-                    mirrorRequest = null; // so we don't accidentally submit it again
-                } catch (Exception e) {
-                    log.error("mirror submit failed", e);
-                    throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
-                }
-            }
-        }
-    }
 }
\ No newline at end of file
diff --git a/crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
similarity index 93%
rename from crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
rename to crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 7df4438..d811bd0 100644
--- a/crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -54,11 +54,12 @@ import static org.mockito.Mockito.spy;
 
   private static String COLLECTION = "collection1";
 
-  @BeforeClass public static void setupIntegrationTest() throws Exception {
+  @BeforeClass
+  public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
 
     Properties config = new Properties();
-    //config.put("unclean.leader.election.enable", "true");
-    //config.put("enable.partition.eof", "false");
+    config.put("unclean.leader.election.enable", "true");
+    config.put("enable.partition.eof", "false");
 
     kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
       public String bootstrapServers() {
@@ -99,7 +100,8 @@ import static org.mockito.Mockito.spy;
 
   }
 
-  @AfterClass public static void tearDownIntegrationTest() throws Exception {
+  @AfterClass
+  public static void afterSolrAndKafkaIntegrationTest() throws Exception {
     ObjectReleaseTracker.clear();
 
     consumer.shutdown();
@@ -151,7 +153,6 @@ import static org.mockito.Mockito.spy;
       }
     }
 
-    //producer.close();
     System.out.println("Closed producer");
 
     assertTrue("results=" + results, foundUpdates);
@@ -160,7 +161,7 @@ import static org.mockito.Mockito.spy;
   }
 
   public void testProducerToCloud() throws Exception {
-    Thread.sleep(10000);
+    Thread.sleep(10000); // TODO: why?
     Properties properties = new Properties();
     properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
     properties.put("acks", "all");
@@ -185,20 +186,20 @@ import static org.mockito.Mockito.spy;
 
     System.out.println("Sent producer record");
 
+    solrCluster2.getSolrClient().commit(COLLECTION);
+
     QueryResponse results = null;
     boolean foundUpdates = false;
     for (int i = 0; i < 50; i++) {
-     // solrCluster1.getSolrClient().commit(COLLECTION);
+      solrCluster2.getSolrClient().commit(COLLECTION);
       results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
-      if (results.getResults().getNumFound() == 1) {
+      if (results.getResults().getNumFound() == 2) {
         foundUpdates = true;
       } else {
         Thread.sleep(500);
       }
     }
 
-    System.out.println("Closed producer");
-
     assertTrue("results=" + results, foundUpdates);
     System.out.println("Rest: " + results);
 
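Both tests settle on the same wait pattern: query in a loop, sleep on a miss, and give up after a fixed number of attempts. A hypothetical helper that expresses that pattern with a deadline instead of an iteration count - all names here are illustrative:

    import java.util.concurrent.TimeUnit;

    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.response.QueryResponse;

    public class WaitForDocs {
        // Retry a *:* query until the expected document count appears or the
        // timeout elapses; returns whether the count was reached in time.
        static boolean waitForDocCount(SolrClient client, String collection,
                long expected, long timeoutMs) throws Exception {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            while (System.nanoTime() < deadline) {
                QueryResponse results = client.query(collection, new SolrQuery("*:*"));
                if (results.getResults().getNumFound() == expected) {
                    return true;
                }
                Thread.sleep(500); // same back-off the loops above use
            }
            return false;
        }
    }
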
diff --git a/crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
similarity index 90%
rename from crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
rename to crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
index ca6bbb1..747b06f 100644
--- a/crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
@@ -34,7 +34,15 @@ public class SolrKafkaTestsIgnoredThreadsFilter implements ThreadFilter {
     if (threadName.startsWith("metrics-meter-tick-thread")) {
       return true;
     }
-    
+
+    if (threadName.startsWith("pool-")) {
+      return true;
+    }
+
+    if (threadName.startsWith("kafka-")) { // TODO
+      return true;
+    }
+
 
     return false;
   }
diff --git a/crossdc-producer/src/test/resources/log4j2.xml b/crossdc-producer/src/test/resources/log4j2.xml
index c63e736..d3705d4 100644
--- a/crossdc-producer/src/test/resources/log4j2.xml
+++ b/crossdc-producer/src/test/resources/log4j2.xml
@@ -29,8 +29,8 @@
 
     <RollingRandomAccessFile
             name="MainLogFile"
-            fileName="${sys:log.dir:-logs}/${sys:log.name:-solr}.log"
-            filePattern="${sys:log.dir:-logs}/${sys:log.name:-solr}.log.%i">
+            fileName="${sys:log.dir:-logs}/${sys:log.name:-crossdc}.log"
+            filePattern="${sys:log.dir:-logs}/${sys:log.name:-crossdc}.log.%i">
       <PatternLayout>
         <Pattern>
           %maxLen{%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p (%t) [%notEmpty{c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}] %c{1.}
@@ -59,6 +59,13 @@
     <Logger name="org.apache.solr.hadoop" level="INFO"/>
     <Logger name="org.eclipse.jetty" level="INFO"/>
 
+    <Logger name="org.apache.solr.crossdc.consumer.KafkaCrossDcConsumer" level="TRACE"/>
+    <Logger name="org.apache.solr.update.processor.MirroringUpdateProcessor" level="TRACE"/>
+    <Logger name="org.apache.solr.update.processor.KafkaRequestMirroringHandler" level="TRACE"/>
+    <Logger name="org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor" level="TRACE"/>
+    <Logger name="org.apache.solr.crossdc.common.KafkaMirroringSink" level="TRACE"/>
+
+
     <Root level="INFO">
       <AppenderRef ref="MainLogFile"/>
       <AppenderRef ref="STDERR"/>