Posted to commits@solr.apache.org by an...@apache.org on 2022/05/13 00:29:20 UTC

[solr-sandbox] branch crossdc-wip updated: IntegrationTest WIP (#9)

This is an automated email from the ASF dual-hosted git repository.

anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new f7bb6ab  IntegrationTest WIP (#9)
f7bb6ab is described below

commit f7bb6aba1bb71355d05363cd3bc1046aa51026fd
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Thu May 12 19:29:15 2022 -0500

    IntegrationTest WIP (#9)
---
 build.gradle                                       |   2 +
 crossdc-consumer/build.gradle                      |  19 +++-
 .../apache/solr/crossdc/KafkaMirroringSink.java    |  27 ++++--
 .../apache/solr/crossdc/ResubmitBackoffPolicy.java |  16 ++++
 .../solr/crossdc/common/CrossDcConstants.java      |   1 -
 .../apache/solr/crossdc/common/IQueueHandler.java  |   1 -
 .../solr/crossdc/common/KafkaCrossDcConf.java      |   8 +-
 .../common/MirroredSolrRequestSerializer.java      |  30 +++++-
 .../org/apache/solr/crossdc/consumer/Consumer.java |  95 ++++++++++++-------
 .../solr/crossdc/helpers/SendDummyUpdates.java     |   2 +-
 .../crossdc/messageprocessor/MessageProcessor.java |  16 ++++
 .../solr/crossdc/SimpleSolrIntegrationTest.java    |   2 +-
 .../solr/crossdc/SolrAndKafkaIntegrationTest.java  | 105 +++++++++++++++++----
 .../configs/cloud-minimal/conf/schema.xml          |   0
 .../configs/cloud-minimal/conf/solrconfig.xml      |   0
 .../src/{ => test}/resources/log4j2.xml            |   0
 16 files changed, 254 insertions(+), 70 deletions(-)

diff --git a/build.gradle b/build.gradle
index 081a382..79f252e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -22,8 +22,10 @@
  * Learn more about Gradle by exploring our samples at https://docs.gradle.org/6.7.1/samples
  */
 
+
 description 'Root for Solr plugins sandbox'
 
+
 subprojects {
     group "org.apache.solr.crossdc"
 }
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index 141b060..ab162f7 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 plugins {
-    id 'java-library'
+    id 'application'
 }
 
 description = 'Cross-DC Consumer package'
@@ -27,13 +27,17 @@ repositories {
     jcenter()
 }
 
+application {
+    mainClass = 'org.apache.solr.crossdc.consumer.Consumer'
+}
+
 dependencies {
     implementation group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.1'
     implementation 'org.slf4j:slf4j-api'
-    api 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
-    api 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
-    api 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
-    api 'org.apache.kafka:kafka-clients:2.8.0'
+    compile 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
+    compile 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
+    compile 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
+    compile 'org.apache.kafka:kafka-clients:2.8.0'
     compile group: 'com.google.guava', name: 'guava', version: '14.0'
     runtimeOnly ('com.google.protobuf:protobuf-java-util:3.19.2')
     testImplementation 'org.hamcrest:hamcrest:2.2'
@@ -56,6 +60,11 @@ dependencies {
     // testImplementation 'org.apache.kafka:kafka-streams-test-utils:2.8.1'
 
 }
+
 subprojects {
     group "org.apache.solr"
+}
+
+test {
+    jvmArgs '-Djava.security.egd=file:/dev/./urandom'
 }
\ No newline at end of file
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
index 120241a..d6dcd50 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
@@ -18,18 +18,22 @@ package org.apache.solr.crossdc;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
-    private static final Logger logger = LoggerFactory.getLogger(KafkaMirroringSink.class);
+
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     private long lastSuccessfulEnqueueNanos;
     private KafkaCrossDcConf conf;
@@ -39,13 +43,13 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
         // Create Kafka Mirroring Sink
         this.conf = conf;
         this.producer = initProducer();
-        logger.info("KafkaMirroringSink has been created. Producer & Topic have been created successfully! Configurations {}", conf);
+        log.info("KafkaMirroringSink has been created. Producer & Topic have been created successfully! Configurations {}", conf);
     }
 
     @Override
     public void submit(MirroredSolrRequest request) throws MirroringException {
-        if (logger.isDebugEnabled()) {
-            logger.debug("About to submit a MirroredSolrRequest");
+        if (log.isDebugEnabled()) {
+            log.debug("About to submit a MirroredSolrRequest");
         }
 
         final long enqueueStartNanos = System.nanoTime();
@@ -53,7 +57,7 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
         // Create Producer record
         try {
             lastSuccessfulEnqueueNanos = System.nanoTime();
-            // Record time since last successful enque as 0
+            // Record time since last successful enqueue as 0
             long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos);
             // Update elapsed time
 
@@ -64,7 +68,7 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
             // We are intentionally catching all exceptions, the expected exception form this function is {@link MirroringException}
 
             String message = String.format("Unable to enqueue request %s, # of attempts %s", request, conf.getNumOfRetries());
-            logger.error(message, e);
+            log.error(message, e);
 
             throw new MirroringException(message, e);
         }
@@ -80,13 +84,20 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
     private Producer<String, MirroredSolrRequest> initProducer() {
         // Initialize and return Kafka producer
         Properties props = new Properties();
-        logger.info("Creating Kafka producer! Configurations {} ", conf.toString());
+
+        log.info("Creating Kafka producer! Configurations {} ", conf.toString());
+
+        props.put("bootstrap.servers", conf.getBootStrapServers());
+
+        props.put("key.serializer", StringSerializer.class.getName());
+        props.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
+
         Producer<String, MirroredSolrRequest> producer = new KafkaProducer(props);
         return producer;
     }
 
     private void slowSubmitAction(Object request, long elapsedTimeMillis) {
-        logger.warn("Enqueuing the request to Kafka took more than {} millis. enqueueElapsedTime={}",
+        log.warn("Enqueuing the request to Kafka took more than {} millis. enqueueElapsedTime={}",
                 conf.getSlowSubmitThresholdInMillis(),
                 elapsedTimeMillis);
     }
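
For reference, a minimal, self-contained sketch of the producer wiring that initProducer() now performs, using only stock Kafka classes. The bootstrap address and topic are placeholders, and a plain StringSerializer stands in for the MirroredSolrRequestSerializer the commit actually plugs in for values:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerWiringSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder address; the sink reads this from KafkaCrossDcConf.
            props.put("bootstrap.servers", "127.0.0.1:9092");
            props.put("key.serializer", StringSerializer.class.getName());
            // The commit uses MirroredSolrRequestSerializer for values; a
            // String serializer keeps this sketch self-contained.
            props.put("value.serializer", StringSerializer.class.getName());
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("topic1", "key", "value"));
            }
        }
    }
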
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java
index 145e0d0..e531cca 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.solr.crossdc;
 
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java
index 1d6e355..ba0a73d 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.solr.crossdc.common;
 
 public class CrossDcConstants {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java
index 3c0ddff..a242932 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.solr.crossdc.common;
 
 public interface IQueueHandler<T> {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index ee6a4a6..9748f66 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -19,12 +19,14 @@ package org.apache.solr.crossdc.common;
 public class KafkaCrossDcConf extends CrossDcConf {
     private final String topicName;
     private final boolean enableDataEncryption;
+    private final String bootstrapServers;
     private long slowSubmitThresholdInMillis;
     private int numOfRetries = 5;
     private final String solrZkConnectString;
 
 
-    public KafkaCrossDcConf(String topicName, boolean enableDataEncryption, String solrZkConnectString) {
+    public KafkaCrossDcConf(String bootstrapServers, String topicName, boolean enableDataEncryption, String solrZkConnectString) {
+        this.bootstrapServers = bootstrapServers;
         this.topicName = topicName;
         this.enableDataEncryption = enableDataEncryption;
         this.solrZkConnectString = solrZkConnectString;
@@ -55,4 +57,8 @@ public class KafkaCrossDcConf extends CrossDcConf {
     public String getClusterName() {
         return null;
     }
+
+  public String getBootStrapServers() {
+        return bootstrapServers;
+  }
 }
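
The constructor now takes the Kafka bootstrap servers as a new leading argument. A fragment showing the call shape (all argument values are placeholders):

    KafkaCrossDcConf conf = new KafkaCrossDcConf(
        "127.0.0.1:9092",   // bootstrapServers - the new leading argument
        "topic1",           // topicName
        false,              // enableDataEncryption
        "127.0.0.1:2181");  // solrZkConnectString
    String servers = conf.getBootStrapServers();
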
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
index 592a36c..bb3104e 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
@@ -16,16 +16,23 @@
  */
 package org.apache.solr.crossdc.common;
 
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.util.Map;
 
-public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrRequest> {
+public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrRequest>, Deserializer<MirroredSolrRequest> {
+
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     private boolean isKey;
     /**
@@ -39,6 +46,27 @@ public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrReq
         this.isKey = isKey;
     }
 
+    @Override
+    public MirroredSolrRequest deserialize(String topic, byte[] data) {
+        UpdateRequest solrRequest;
+
+        JavaBinUpdateRequestCodec codec = new JavaBinUpdateRequestCodec();
+        ByteArrayInputStream bais = new ByteArrayInputStream(data);
+
+        try {
+            solrRequest = codec.unmarshal(bais,
+                (document, req, commitWithin, override) -> {
+
+                });
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        log.info("solrRequest={}, {}", solrRequest.getParams(), solrRequest.getDocuments());
+
+        return new MirroredSolrRequest(solrRequest);
+    }
+
     /**
      * Convert {@code data} into a byte array.
      *
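
The new deserialize(String, byte[]) pairs with the existing serialize path via JavaBinUpdateRequestCodec. A self-contained round-trip sketch of that pattern (not the committed class; note that unmarshal hands each streamed document back through the StreamingUpdateHandler callback):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
    import org.apache.solr.client.solrj.request.UpdateRequest;
    import org.apache.solr.common.SolrInputDocument;

    public class JavaBinRoundTripSketch {
        public static void main(String[] args) throws IOException {
            UpdateRequest original = new UpdateRequest();
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("id", "1");
            original.add(doc);

            // Encode to javabin bytes, as the serialize(...) side does.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            new JavaBinUpdateRequestCodec().marshal(original, baos);

            // Decode; each streamed document arrives through the handler.
            List<SolrInputDocument> docs = new ArrayList<>();
            new JavaBinUpdateRequestCodec().unmarshal(
                new ByteArrayInputStream(baos.toByteArray()),
                (document, req, commitWithin, override) -> docs.add(document));
            System.out.println(docs);
        }
    }
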
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index 8885d8a..3373a96 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -14,20 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.solr.crossdc.consumer;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.crossdc.KafkaMirroringSink;
 import org.apache.solr.crossdc.MirroringException;
+import org.apache.solr.crossdc.ResubmitBackoffPolicy;
 import org.apache.solr.crossdc.common.IQueueHandler;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
 import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
@@ -40,6 +42,7 @@ import java.lang.invoke.MethodHandles;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -48,27 +51,25 @@ import java.util.concurrent.Executors;
 public class Consumer {
     private static boolean enabled = true;
 
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
     /**
      * ExecutorService to manage the cross-dc consumer threads.
      */
     private ExecutorService consumerThreadExecutor;
 
-    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
-
     private Server server;
     CrossDcConsumer crossDcConsumer;
+    private String topicName;
 
-    public void start(String[] args) {
-
-        boolean enableDataEncryption = Boolean.getBoolean("enableDataEncryption");
-        String topicName = System.getProperty("topicName");
-        String zkConnectString = System.getProperty("zkConnectString");
+    public void start(String bootstrapServers, String zkConnectString, String topicName, boolean enableDataEncryption, int port) {
+        this.topicName = topicName;
 
         server = new Server();
         ServerConnector connector = new ServerConnector(server);
-        connector.setPort(8090);
+        connector.setPort(port);
         server.setConnectors(new Connector[] {connector});
-        crossDcConsumer = getCrossDcConsumer(zkConnectString, topicName, enableDataEncryption);
+        crossDcConsumer = getCrossDcConsumer(bootstrapServers, zkConnectString, topicName, enableDataEncryption);
 
         // Start consumer thread
         consumerThreadExecutor = Executors.newSingleThreadExecutor();
@@ -79,16 +80,26 @@ public class Consumer {
         Runtime.getRuntime().addShutdownHook(shutdownHook);
     }
 
-    private CrossDcConsumer getCrossDcConsumer(String zkConnectString, String topicName,
+    private CrossDcConsumer getCrossDcConsumer(String bootstrapServers, String zkConnectString, String topicName,
         boolean enableDataEncryption) {
 
-        final KafkaCrossDcConf conf = new KafkaCrossDcConf(topicName, enableDataEncryption, zkConnectString);
+        KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, enableDataEncryption, zkConnectString);
         return new KafkaCrossDcConsumer(conf);
     }
 
     public static void main(String[] args) {
+        String bootstrapServers = System.getProperty("bootstrapServers");
+        boolean enableDataEncryption = Boolean.getBoolean("enableDataEncryption");
+        String topicName = System.getProperty("topicName");
+        String zkConnectString = System.getProperty("zkConnectString");
+        String port = System.getProperty("port", "8090");
+
         Consumer consumer = new Consumer();
-        consumer.start(args);
+        consumer.start(bootstrapServers, zkConnectString, topicName, enableDataEncryption, Integer.parseInt(port));
+    }
+
+    public void shutdown() {
+        crossDcConsumer.shutdown();
     }
 
     /**
@@ -96,15 +107,7 @@ public class Consumer {
      */
     public abstract static class CrossDcConsumer implements Runnable {
         SolrMessageProcessor messageProcessor;
-
-    }
-
-    public static class CrossDcConsumerFactory {
-        private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-        CrossDcConsumer getCrossDcConsumer(){
-            return null;
-        }
+        abstract void shutdown();
 
     }
 
@@ -119,13 +122,32 @@ public class Consumer {
         private final KafkaMirroringSink kafkaMirroringSink;
 
         private final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 100;
+        private final String topicName;
         SolrMessageProcessor messageProcessor;
 
         /**
          * @param conf The Kafka consumer configuration
          */
         public KafkaCrossDcConsumer(KafkaCrossDcConf conf) {
+            this.topicName = conf.getTopicName();
+
             final Properties kafkaConsumerProp = new Properties();
+
+            kafkaConsumerProp.put("bootstrap.servers", conf.getBootStrapServers());
+
+            kafkaConsumerProp.put("key.deserializer", StringDeserializer.class.getName());
+            kafkaConsumerProp.put("value.deserializer", MirroredSolrRequestSerializer.class.getName());
+
+            kafkaConsumerProp.put("group.id", "group_1");
+
+            CloudSolrClient solrClient = new CloudSolrClient.Builder(Collections.singletonList(conf.getSolrZkConnectString()), Optional.empty()).build();
+
+            messageProcessor = new SolrMessageProcessor(solrClient, new ResubmitBackoffPolicy() {
+                @Override public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
+                    return 0;
+                }
+            });
+
             log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProp);
             consumer = createConsumer(kafkaConsumerProp);
 
@@ -150,21 +172,25 @@ public class Consumer {
         @Override
         public void run() {
             log.info("About to start Kafka consumer thread...");
-            String topic="topic";
 
-            log.info("Kafka consumer subscribing to topic topic={}", topic);
-            consumer.subscribe(Collections.singleton(topic));
+            log.info("Kafka consumer subscribing to topic topic={}", topicName);
+            consumer.subscribe(Collections.singleton(topicName));
 
             while (pollAndProcessRequests()) {
                 //no-op within this loop: everything is done in pollAndProcessRequests method defined above.
             }
 
             log.info("Closed kafka consumer. Exiting now.");
-            consumer.close();
+            try {
+                consumer.close();
+            } catch (Exception e) {
+                log.warn("Failed to close kafka consumer", e);
+            }
+
             try {
                 kafkaMirroringSink.close();
-            } catch (IOException e) {
-                log.error("Failed to close kafka mirroring sink", e);
+            } catch (Exception e) {
+                log.warn("Failed to close kafka mirroring sink", e);
             }
 
         }
@@ -180,7 +206,7 @@ public class Consumer {
                     List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords = records.records(partition);
                     try {
                         for (ConsumerRecord<String, MirroredSolrRequest> record : partitionRecords) {
-                            log.trace("Fetched record from topic={} partition={} key={} value={}",
+                            log.info("Fetched record from topic={} partition={} key={} value={}",
                                     record.topic(), record.partition(), record.key(), record.value());
                             IQueueHandler.Result result = messageProcessor.handleItem(record.value());
                             switch (result.status()) {
@@ -260,7 +286,8 @@ public class Consumer {
         /**
          * Shutdown the Kafka consumer by calling wakeup.
          */
-        void shutdown() {
+        public void shutdown() {
+            log.info("Shutdown called on KafkaCrossDcConsumer");
             consumer.wakeup();
         }
 
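
The shutdown()/run() pair above follows the standard Kafka wakeup pattern: wakeup() is the one KafkaConsumer method that is safe to call from another thread, and it makes a blocking poll() throw WakeupException so the consumer loop can exit and close. A minimal sketch of that pattern with stock deserializers (placeholder wiring, not the committed class):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class WakeupShutdownSketch implements Runnable {
        private final KafkaConsumer<String, String> consumer;

        public WakeupShutdownSketch(String bootstrapServers, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("group.id", "group_1");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singleton(topic));
        }

        @Override public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        // process record here
                    }
                }
            } catch (WakeupException e) {
                // Expected after shutdown(); fall through to close.
            } finally {
                consumer.close();
            }
        }

        public void shutdown() {
            consumer.wakeup(); // safe to call from another thread
        }
    }
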
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java
index a066741..818369c 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java
@@ -32,7 +32,7 @@ public class SendDummyUpdates {
         Properties properties = new Properties();
         properties.put("bootstrap.servers", "localhost:9092");
         properties.put("acks", "all");
-        properties.put("retries", 0);
+        properties.put("retries", 3);
         properties.put("batch.size", 16384);
         properties.put("buffer.memory", 33554432);
         properties.put("linger.ms", 1);
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
index 210f4ca..efbfef6 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.solr.crossdc.messageprocessor;
 
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
index 3177469..547fdca 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SimpleSolrIntegrationTest.java
@@ -32,7 +32,7 @@ public class SimpleSolrIntegrationTest extends SolrCloudTestCase {
 
     cluster1 =
         new SolrCloudTestCase.Builder(2, createTempDir())
-            .addConfig("conf", getFile("src/resources/configs/cloud-minimal/conf").toPath())
+            .addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath())
             .configure();
 
     String collection = "collection1";
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 45f5392..b11a087 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -1,64 +1,135 @@
 package org.apache.solr.crossdc;
 
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.lucene.util.QuickPatchThreadsFilter;
 import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
-import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
+import org.apache.solr.crossdc.consumer.Consumer;
 import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.Map;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
 
 import static org.mockito.Mockito.spy;
 
 @ThreadLeakFilters(
     defaultFilters = true,
     filters = { SolrIgnoredThreadsFilter.class, QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class})
+@ThreadLeakAction(ThreadLeakAction.Action.INTERRUPT)
 public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   static final String VERSION_FIELD = "_version_";
 
   private static final int NUM_BROKERS = 1;
-  public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+  public static EmbeddedKafkaCluster kafkaCluster;
+
+  protected static volatile MiniSolrCloudCluster solrCluster1;
+  protected static volatile MiniSolrCloudCluster solrCluster2;
 
-  protected static volatile MiniSolrCloudCluster cluster1;
-  protected static volatile MiniSolrCloudCluster cluster2;
-  private static SolrMessageProcessor processor;
+  protected static volatile Consumer consumer = new Consumer();
+
+  private static String TOPIC = "topic1";
+  
+  private static String COLLECTION = "collection1";
 
   private static ResubmitBackoffPolicy backoffPolicy = spy(new TestMessageProcessor.NoOpResubmitBackoffPolicy());
 
   @BeforeClass
   public static void setupIntegrationTest() throws Exception {
+    Properties config = new Properties();
+    config.put("unclean.leader.election.enable", "true");
+    config.put("enable.partition.eof", "false");
+
+    kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
+      public String bootstrapServers() {
+        return super.bootstrapServers().replaceAll("localhost", "127.0.0.1");
+      }
+    };
+    kafkaCluster.start();
 
-    CLUSTER.start();
+    kafkaCluster.createTopic(TOPIC, 1, 1);
 
-    cluster1 =
+    solrCluster1 =
         new Builder(2, createTempDir())
-            .addConfig("conf", getFile("src/resources/configs/cloud-minimal/conf").toPath())
+            .addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath())
             .configure();
 
-    processor = new SolrMessageProcessor(cluster1.getSolrClient(), backoffPolicy);
+    CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster1.getSolrClient().request(create);
+    solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+
+    solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+
+    String bootstrapServers = kafkaCluster.bootstrapServers();
+    log.info("bootstrapServers={}", bootstrapServers);
+
+    consumer.start(bootstrapServers, solrCluster1.getZkServer().getZkAddress(), TOPIC, false, 0);
+
   }
 
   @AfterClass
   public static void tearDownIntegrationTest() throws Exception {
+    consumer.shutdown();
 
-    CLUSTER.stop();
+    try {
+      kafkaCluster.stop();
+    } catch (Exception e) {
+      log.error("Exception stopping Kafka cluster", e);
+    }
 
-    if (cluster1 != null) {
-      cluster1.shutdown();
+    if (solrCluster1 != null) {
+      solrCluster1.shutdown();
     }
-    if (cluster2 != null) {
-      cluster2.shutdown();
+    if (solrCluster2 != null) {
+      solrCluster2.shutdown();
     }
   }
 
-  public void test() {
-
+  public void test() throws InterruptedException {
+    Thread.sleep(10000);
+    Properties properties = new Properties();
+    properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
+    properties.put("acks", "all");
+    properties.put("retries", 0);
+    properties.put("batch.size", 1);
+    properties.put("buffer.memory", 33554432);
+    properties.put("linger.ms", 1);
+    properties.put("key.serializer", StringSerializer.class.getName());
+    properties.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
+    Producer<String, MirroredSolrRequest> producer = new KafkaProducer(properties);
+    UpdateRequest updateRequest = new UpdateRequest();
+    updateRequest.setParam("shouldMirror", "true");
+    updateRequest.add("id", String.valueOf(System.currentTimeMillis()));
+    updateRequest.add("text", "test");
+    updateRequest.setParam("collection", COLLECTION);
+    MirroredSolrRequest mirroredSolrRequest = new MirroredSolrRequest(updateRequest);
+    System.out.println("About to send producer record");
+    producer.send(new ProducerRecord(TOPIC, mirroredSolrRequest), (metadata, exception) -> {
+      log.info("Producer finished sending metadata={}, exception={}", metadata, exception);
+    });
+    producer.flush();
+    System.out.println("Sent producer record");
+    producer.close();
+    System.out.println("Closed producer");
+
+    Thread.sleep(10000);
   }
 }
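
The test currently relies on fixed 10-second sleeps and makes no assertions yet (it is WIP). A sketch of a bounded polling wait that could replace the trailing sleep, assuming the consumer indexes the mirrored update into COLLECTION on solrCluster1:

    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.response.QueryResponse;

    private static void waitForDoc(long timeoutMs) throws Exception {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (System.nanoTime() < deadline) {
            // Make any indexed documents visible, then look for them.
            solrCluster1.getSolrClient().commit(COLLECTION);
            QueryResponse rsp = solrCluster1.getSolrClient()
                .query(COLLECTION, new SolrQuery("*:*"));
            if (rsp.getResults().getNumFound() > 0) {
                return;
            }
            Thread.sleep(250);
        }
        fail("Timed out waiting for the mirrored update to be indexed");
    }
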
diff --git a/crossdc-consumer/src/resources/configs/cloud-minimal/conf/schema.xml b/crossdc-consumer/src/test/resources/configs/cloud-minimal/conf/schema.xml
similarity index 100%
rename from crossdc-consumer/src/resources/configs/cloud-minimal/conf/schema.xml
rename to crossdc-consumer/src/test/resources/configs/cloud-minimal/conf/schema.xml
diff --git a/crossdc-consumer/src/resources/configs/cloud-minimal/conf/solrconfig.xml b/crossdc-consumer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
similarity index 100%
rename from crossdc-consumer/src/resources/configs/cloud-minimal/conf/solrconfig.xml
rename to crossdc-consumer/src/test/resources/configs/cloud-minimal/conf/solrconfig.xml
diff --git a/crossdc-consumer/src/resources/log4j2.xml b/crossdc-consumer/src/test/resources/log4j2.xml
similarity index 100%
rename from crossdc-consumer/src/resources/log4j2.xml
rename to crossdc-consumer/src/test/resources/log4j2.xml