You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2022/04/14 06:52:34 UTC

[solr-sandbox] branch crossdc-wip updated: WIP for Cross DC consumer (#5)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new d75761a  WIP for Cross DC consumer  (#5)
d75761a is described below

commit d75761a4b92adf4ac3d8aabacaa7455689447d8d
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Thu Apr 14 01:52:30 2022 -0500

    WIP for Cross DC consumer  (#5)
    
    * CrossDc Consumer wip
    
    * Temp commit
    
    * wip commit
    
    * Draft commit, cleanup code
    
    * WIP commit
    
    * Fix tests, WIP commit
    
    * Refactor MessageProcessor and some cleanup.
    
    Co-authored-by: Anshum Gupta <an...@apache.org>
---
 build.gradle                                       |   6 +
 crossdc-consumer/build.gradle                      |  18 +-
 .../apache/solr/crossdc/KafkaMirroringSink.java    |  92 ++++++
 .../apache/solr/crossdc/MirroringException.java}   |  26 +-
 .../apache/solr/crossdc/RequestMirroringSink.java} |  21 +-
 .../apache/solr/crossdc/ResubmitBackoffPolicy.java |   7 +
 .../apache/solr/crossdc/common/CrossDcConf.java}   |   9 +-
 .../solr/crossdc/common/CrossDcConstants.java}     |  15 +-
 .../apache/solr/crossdc/common/IQueueHandler.java  |  75 +++++
 .../solr/crossdc/common/KafkaCrossDcConf.java      |  58 ++++
 .../solr/crossdc/common/MirroredSolrRequest.java   | 124 +++++++++
 .../common/MirroredSolrRequestSerializer.java      |  83 ++++++
 .../solr/crossdc/common/SolrExceptionUtil.java}    |  23 +-
 .../org/apache/solr/crossdc/consumer/Consumer.java | 255 +++++++++++++++++
 .../solr/crossdc/helpers/SendDummyUpdates.java     |  51 ++++
 .../crossdc/messageprocessor/MessageProcessor.java |  17 ++
 .../messageprocessor/SolrMessageProcessor.java     | 308 +++++++++++++++++++++
 .../apache/solr/crossdc/TestMessageProcessor.java  | 125 +++++++++
 gradle/wrapper/gradle-wrapper.properties           |   2 +-
 settings.gradle                                    |   4 +-
 version.props                                      |   0
 21 files changed, 1269 insertions(+), 50 deletions(-)

diff --git a/build.gradle b/build.gradle
index b9644b2..081a382 100644
--- a/build.gradle
+++ b/build.gradle
@@ -21,3 +21,9 @@
  * This is a general purpose Gradle build.
  * Learn more about Gradle by exploring our samples at https://docs.gradle.org/6.7.1/samples
  */
+
+description 'Root for Solr plugins sandbox'
+
+subprojects {
+    group "org.apache.solr.crossdc"
+}
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/build.gradle
index 8c44542..58f3dac 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/build.gradle
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 plugins {
-    id 'java'
+    id 'java-library'
 }
 
 description = 'Cross-DC Consumer package'
@@ -24,7 +24,23 @@ version '1.0-SNAPSHOT'
 
 repositories {
     mavenCentral()
+    jcenter()
 }
 
 dependencies {
+    implementation group: 'org.apache.solr', name: 'solr-solrj', version: '8.11.1'
+    api 'org.eclipse.jetty:jetty-http:9.4.41.v20210516'
+    api 'org.eclipse.jetty:jetty-server:9.4.41.v20210516'
+    api 'org.eclipse.jetty:jetty-servlet:9.4.41.v20210516'
+    api 'org.apache.kafka:kafka-clients:2.8.0'
+    compile group: 'com.google.guava', name: 'guava', version: '14.0'
+    runtimeOnly ('com.google.protobuf:protobuf-java-util:3.19.2')
+    testImplementation 'org.hamcrest:hamcrest:2.2'
+    testImplementation 'junit:junit:4.13.2'
+    testImplementation('org.mockito:mockito-core:4.3.1', {
+        exclude group: "net.bytebuddy", module: "byte-buddy-agent"
+    })
 }
+subprojects {
+    group "org.apache.solr"
+}
\ No newline at end of file
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
new file mode 100644
index 0000000..1b6109a
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/KafkaMirroringSink.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+public class KafkaMirroringSink implements RequestMirroringSink {
+    private static final Logger logger = LoggerFactory.getLogger(KafkaMirroringSink.class);
+
+    private long lastSuccessfulEnqueueNanos;
+    private KafkaCrossDcConf conf;
+    private final Producer<String, MirroredSolrRequest> producer;
+
+    public KafkaMirroringSink(final KafkaCrossDcConf conf) {
+        // Create Kafka Mirroring Sink
+        this.conf = conf;
+        this.producer = initProducer();
+        logger.info("KafkaMirroringSink has been created. Producer & Topic have been created successfully! Configurations {}", conf);
+    }
+
+    @Override
+    public void submit(MirroredSolrRequest request) throws MirroringException {
+        if (logger.isDebugEnabled()) {
+            logger.debug("About to submit a MirroredSolrRequest");
+        }
+
+        final long enqueueStartNanos = System.nanoTime();
+
+        // Create Producer record (NOTE(review): not implemented yet; nothing in this method is handed to the producer)
+
+        try {
+            lastSuccessfulEnqueueNanos = System.nanoTime();
+            // Record time since last successful enqueue as 0
+            long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos);
+            // Update elapsed time; a submit slower than the configured threshold is reported via slowSubmitAction
+
+            if (elapsedTimeMillis > conf.getSlowSubmitThresholdInMillis()) {
+                slowSubmitAction(request, elapsedTimeMillis);
+            }
+        } catch (Exception e) {
+            // We are intentionally catching all exceptions; the expected exception from this function is {@link MirroringException}
+
+            String message = String.format("Unable to enqueue request %s, # of attempts %s", request, conf.getNumOfRetries());
+            logger.error(message, e);
+
+            throw new MirroringException(message, e);
+        }
+    }
+
+    /**
+     * Create and init the producer. {@link #conf} is only logged here; no producer configs are set yet.
+     * All producer configs are listed here
+     * https://kafka.apache.org/documentation/#producerconfigs
+     *
+     * @return a new {@link KafkaProducer} constructed from an empty {@link Properties}
+     */
+    private Producer<String, MirroredSolrRequest> initProducer() {
+        // Initialize and return Kafka producer
+        Properties props = new Properties();
+        logger.info("Creating Kafka producer! Configurations {} ", conf.toString());
+        Producer<String, MirroredSolrRequest> producer = new KafkaProducer<>(props); // diamond instead of raw type
+        return producer;
+    }
+
+    private void slowSubmitAction(Object request, long elapsedTimeMillis) {
+        logger.warn("Enqueuing the request to Kafka took more than {} millis. enqueueElapsedTime={}",
+                conf.getSlowSubmitThresholdInMillis(),
+                elapsedTimeMillis);
+    }
+}
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/MirroringException.java
similarity index 64%
copy from crossdc-consumer/build.gradle
copy to crossdc-consumer/src/main/java/org/apache/solr/crossdc/MirroringException.java
index 8c44542..2073750 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/MirroringException.java
@@ -14,17 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-plugins {
-    id 'java'
-}
+package org.apache.solr.crossdc;
 
-description = 'Cross-DC Consumer package'
+/**
+ * Exception thrown during cross-dc mirroring
+ */
+public class MirroringException extends Exception {
+    public MirroringException() {
+        super();
+    }
 
-version '1.0-SNAPSHOT'
+    public MirroringException(String message) {
+        super(message);
+    }
 
-repositories {
-    mavenCentral()
-}
+    public MirroringException(String message, Throwable cause) {
+        super(message, cause);
+    }
 
-dependencies {
+    public MirroringException(Throwable cause) {
+        super(cause);
+    }
 }
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/RequestMirroringSink.java
similarity index 55%
copy from crossdc-consumer/build.gradle
copy to crossdc-consumer/src/main/java/org/apache/solr/crossdc/RequestMirroringSink.java
index 8c44542..5d09c16 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/RequestMirroringSink.java
@@ -14,17 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-plugins {
-    id 'java'
-}
-
-description = 'Cross-DC Consumer package'
+package org.apache.solr.crossdc;
 
-version '1.0-SNAPSHOT'
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
 
-repositories {
-    mavenCentral()
-}
+public interface RequestMirroringSink {
 
-dependencies {
+    /**
+     * Submits a mirrored solr request to the appropriate end-point such that it is eventually received by solr
+     * A direct sink may simply use CloudSolrServer to process requests directly.
+     * A queueing sink will serialize the request and submit it to a queue for later consumption
+     * @param request the request that is to be mirrored
+     * @throws MirroringException Implementations may throw an exception
+     */
+    void submit(final MirroredSolrRequest request) throws MirroringException;
 }
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java
new file mode 100644
index 0000000..145e0d0
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/ResubmitBackoffPolicy.java
@@ -0,0 +1,7 @@
+package org.apache.solr.crossdc;
+
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+
+public interface ResubmitBackoffPolicy {
+  long getBackoffTimeMs(MirroredSolrRequest resubmitRequest);
+}
diff --git a/crossdc-consumer/settings.gradle b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
similarity index 84%
rename from crossdc-consumer/settings.gradle
rename to crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
index 8c7c712..b1f3d24 100644
--- a/crossdc-consumer/settings.gradle
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
@@ -14,11 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.solr.crossdc.common;
 
-rootProject.name = 'crossdc-consumer'
-
-description = 'Module for Apache Solr Cross DC Consumer'
-
-subprojects {
-    group "org.apache.solr.crossdc"
+public abstract class CrossDcConf {
+    public abstract String getClusterName();
 }
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java
similarity index 80%
copy from crossdc-consumer/build.gradle
copy to crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java
index 8c44542..1d6e355 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/CrossDcConstants.java
@@ -14,17 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-plugins {
-    id 'java'
-}
-
-description = 'Cross-DC Consumer package'
 
-version '1.0-SNAPSHOT'
-
-repositories {
-    mavenCentral()
-}
+package org.apache.solr.crossdc.common;
 
-dependencies {
+public class CrossDcConstants {
+    // Requests containing this parameter will not be mirrored.
+    public static final String SHOULD_MIRROR = "shouldMirror";
 }
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java
new file mode 100644
index 0000000..3c0ddff
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/IQueueHandler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.crossdc.common;
+
+public interface IQueueHandler<T> {
+    enum ResultStatus {
+        /** Item was successfully processed */
+        HANDLED,
+
+        /** Item was not processed, and the consumer should shutdown */
+        NOT_HANDLED_SHUTDOWN,
+
+        /** Item processing failed, and the item should be retried immediately */
+        FAILED_RETRY,
+
+        /** Item processing failed, and the item should not be retried (unsuccessfully processed) */
+        FAILED_NO_RETRY,
+
+        /** Item processing failed, and the item should be re-queued */
+        FAILED_RESUBMIT
+    }
+
+    class Result<T> {
+        private final ResultStatus _status;
+        private final Throwable _throwable;
+        private final T _newItem;
+
+        public Result(final ResultStatus status) {
+            _status = status;
+            _throwable = null;
+            _newItem = null;
+        }
+
+        public Result(final ResultStatus status, final Throwable throwable) {
+            _status = status;
+            _throwable = throwable;
+            _newItem = null;
+        }
+
+        public Result(final ResultStatus status, final Throwable throwable, final T newItem) {
+            _status = status;
+            _throwable = throwable;
+            _newItem = newItem;
+        }
+
+        public ResultStatus status() {
+            return _status;
+        }
+
+        public Throwable throwable() {
+            return _throwable;
+        }
+
+        public T newItem() {
+            return _newItem;
+        }
+    }
+
+    Result<T> handleItem(T item);
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
new file mode 100644
index 0000000..ee6a4a6
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.common;
+
+public class KafkaCrossDcConf extends CrossDcConf {
+    private final String topicName;
+    private final boolean enableDataEncryption;
+    private long slowSubmitThresholdInMillis;
+    private int numOfRetries = 5;
+    private final String solrZkConnectString;
+
+
+    public KafkaCrossDcConf(String topicName, boolean enableDataEncryption, String solrZkConnectString) {
+        this.topicName = topicName;
+        this.enableDataEncryption = enableDataEncryption;
+        this.solrZkConnectString = solrZkConnectString;
+    }
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public boolean getEnableDataEncryption() { return enableDataEncryption; }
+
+    public long getSlowSubmitThresholdInMillis() {
+        return slowSubmitThresholdInMillis;
+    }
+
+    public void setSlowSubmitThresholdInMillis(long slowSubmitThresholdInMillis) {
+        this.slowSubmitThresholdInMillis = slowSubmitThresholdInMillis;
+    }
+
+    public int getNumOfRetries() {
+        return numOfRetries;
+    }
+
+    public String getSolrZkConnectString() {
+        return solrZkConnectString;
+    }
+
+    @Override
+    public String getClusterName() {
+        return null;
+    }
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
new file mode 100644
index 0000000..4c96116
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.common;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.common.params.SolrParams;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class to encapsulate a mirrored Solr request.
+ * This adds a timestamp and #attempts to the request for tracking purposes.
+ */
+public class MirroredSolrRequest {
+    private final SolrRequest solrRequest;
+
+    // Attempts counter for processing the request
+    private int attempt = 1;
+
+    // Timestamp to track when this request was first written. This should be used to track the replication lag.
+    private long submitTimeNanos = 0;
+
+    public MirroredSolrRequest(final SolrRequest solrRequest) {
+        this(1, solrRequest, 0);
+    }
+
+    public MirroredSolrRequest(final int attempt, final SolrRequest solrRequest, final long submitTimeNanos) {
+        if (solrRequest == null) {
+            throw new NullPointerException("solrRequest cannot be null");
+        }
+        this.attempt = attempt;
+        this.solrRequest = solrRequest;
+        this.submitTimeNanos = submitTimeNanos;
+    }
+
+    public MirroredSolrRequest(final int attempt,
+                               final long submitTimeNanos) {
+        this.attempt = attempt;
+        this.submitTimeNanos = submitTimeNanos;
+        solrRequest = null;
+    }
+
+    public static MirroredSolrRequest mirroredAdminCollectionRequest(SolrParams params) {
+        Map<String, List<String>> createParams = new HashMap();
+        // don't mirror back
+        createParams.put(CrossDcConstants.SHOULD_MIRROR, Collections.singletonList("false"));
+
+        final Iterator<String> paramNamesIterator = params.getParameterNamesIterator();
+        while (paramNamesIterator.hasNext()) {
+            final String key = paramNamesIterator.next();
+            if (key.equals("createNodeSet") || key.equals("node")) {
+                // don't forward as nodeset most likely makes no sense here.
+                // should we log when we skip this parameter that was part of the original request ?
+                continue;
+            }
+            final String[] values = params.getParams(key);
+            if (values != null) {
+                createParams.put(key, Arrays.asList(values));
+            }
+        }
+
+        return new MirroredSolrRequest(1,
+                TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()));
+    }
+
+    public int getAttempt() {
+        return attempt;
+    }
+
+    public void setAttempt(final int attempt) {
+        this.attempt = attempt;
+    }
+
+    public SolrRequest getSolrRequest() {
+        return solrRequest;
+    }
+
+    public long getSubmitTimeNanos() {
+        return submitTimeNanos;
+    }
+
+    public void setSubmitTimeNanos(final long submitTimeNanos) {
+        this.submitTimeNanos = submitTimeNanos;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (!(o instanceof MirroredSolrRequest)) return false;
+
+        final MirroredSolrRequest that = (MirroredSolrRequest)o;
+
+        return Objects.equals(solrRequest, that.solrRequest);
+    }
+
+    @Override
+    public int hashCode() { // consistent with equals(): derived from solrRequest only
+        return Objects.hashCode(solrRequest); // null-safe: the (attempt, submitTimeNanos) constructor leaves solrRequest null
+    }
+
+    @Override
+    public String toString() {
+        return "MirroredSolrRequest{" +
+               "solrRequest=" + solrRequest +
+               ", attempt=" + attempt +
+               ", submitTimeNanos=" + submitTimeNanos +
+               '}';
+    }
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
new file mode 100644
index 0000000..592a36c
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.common;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrRequest> {
+
+    private boolean isKey;
+    /**
+     * Configure this class.
+     *
+     * @param configs configs in key/value pairs
+     * @param isKey   whether is for key or value
+     */
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        this.isKey = isKey;
+    }
+
+    /**
+     * Convert {@code data} into a byte array.
+     *
+     * @param topic topic associated with data
+     * @param request  MirroredSolrRequest that needs to be serialized
+     * @return serialized bytes
+     */
+    @Override
+    public byte[] serialize(String topic, MirroredSolrRequest request) {
+        // TODO: add checks
+        SolrRequest solrRequest = request.getSolrRequest();
+        UpdateRequest updateRequest = (UpdateRequest)solrRequest;
+        JavaBinUpdateRequestCodec codec = new JavaBinUpdateRequestCodec();
+        ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
+        try {
+            codec.marshal(updateRequest, baos);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return baos.byteArray();
+    }
+
+    /**
+     * Close this serializer.
+     * <p>
+     * This method must be idempotent as it may be called multiple times.
+     */
+    @Override
+    public void close() {
+        Serializer.super.close();
+    }
+
+    private static final class ExposedByteArrayOutputStream extends ByteArrayOutputStream { // stream whose contents serialize() returns
+        ExposedByteArrayOutputStream() {
+            super();
+        }
+
+        byte[] byteArray() { // returns exactly the bytes written so far
+            return toByteArray(); // not buf: the backing array is usually longer than count and would add trailing bytes
+        }
+    }
+}
diff --git a/crossdc-consumer/build.gradle b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/SolrExceptionUtil.java
similarity index 62%
copy from crossdc-consumer/build.gradle
copy to crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/SolrExceptionUtil.java
index 8c44542..47dced2 100644
--- a/crossdc-consumer/build.gradle
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/common/SolrExceptionUtil.java
@@ -14,17 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-plugins {
-    id 'java'
-}
-
-description = 'Cross-DC Consumer package'
+package org.apache.solr.crossdc.common;
 
-version '1.0-SNAPSHOT'
-
-repositories {
-    mavenCentral()
-}
+import org.apache.solr.common.SolrException;
 
-dependencies {
+public class SolrExceptionUtil {
+    public static SolrException asSolrException(final Exception e) {
+        SolrException solrException = null;
+        if (e.getCause() instanceof SolrException) {
+            solrException = (SolrException) e.getCause();
+        } else if (e instanceof SolrException) {
+            solrException = (SolrException) e;
+        }
+        return solrException;
+    }
 }
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
new file mode 100644
index 0000000..3bb62b3
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.crossdc.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.solr.crossdc.KafkaMirroringSink;
+import org.apache.solr.crossdc.MirroringException;
+import org.apache.solr.crossdc.common.IQueueHandler;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.eclipse.jetty.server.Connector;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+// Cross-DC Consumer main class
+public class Consumer {
+    // Not referenced anywhere else in this class.
+    private static boolean enabled = true;
+
+    /**
+     * ExecutorService to manage the cross-dc consumer threads.
+     */
+    private ExecutorService consumerThreadExecutor;
+
+    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
+
+    // Jetty server created in start(); start() gives it a connector on port 8090 but does not start it.
+    private Server server;
+    // The consumer that start() submits to consumerThreadExecutor.
+    CrossDcConsumer crossDcConsumer;
+
+    /**
+     * Creates the Jetty server and the cross-dc consumer, runs the consumer on a dedicated
+     * single-thread executor, and registers a JVM shutdown hook that stops the consumer.
+     *
+     * @param args command line arguments; currently unused
+     */
+    public void start(String[] args) {
+
+        // TODO: use args for config
+        server = new Server();
+        ServerConnector connector = new ServerConnector(server);
+        connector.setPort(8090);
+        server.setConnectors(new Connector[] {connector});
+        // NOTE(review): the server is configured here but never started - confirm whether that is intended.
+        crossDcConsumer = getCrossDcConsumer();
+
+        // Start consumer thread
+        consumerThreadExecutor = Executors.newSingleThreadExecutor();
+        consumerThreadExecutor.submit(crossDcConsumer);
+
+        // Register shutdown hook
+        Thread shutdownHook = new Thread(() -> {
+            System.out.println("Shutting down consumers!");
+            // Wake the Kafka consumer so its poll loop returns and the client gets closed.
+            if (crossDcConsumer instanceof KafkaCrossDcConsumer) {
+                ((KafkaCrossDcConsumer) crossDcConsumer).shutdown();
+            }
+            // Accept no further tasks and give the consumer thread a bounded time to finish.
+            consumerThreadExecutor.shutdown();
+            try {
+                if (!consumerThreadExecutor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)) {
+                    System.out.println("Timed out waiting for the consumer thread to stop");
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        });
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+    }
+
+    /**
+     * Creates the Kafka-backed consumer from a hard-coded configuration.
+     *
+     * @return a new {@link KafkaCrossDcConsumer}
+     */
+    private CrossDcConsumer getCrossDcConsumer() {
+        // nocommit - hardcoded conf
+        return new KafkaCrossDcConsumer(new KafkaCrossDcConf("test-topic", true, "localhost:2181"));
+    }
+
+    /**
+     * Command line entry point: creates a {@link Consumer} and starts it.
+     *
+     * @param args passed through to {@link #start(String[])}
+     */
+    public static void main(String[] args) {
+        new Consumer().start(args);
+    }
+
+    /**
+     * Abstract base class for cross-dc consumers. Implementations provide the
+     * {@link Runnable#run()} body that is executed on the consumer thread.
+     */
+    public abstract static class CrossDcConsumer implements Runnable {
+        // Message processor available to subclasses; this class never assigns it.
+        SolrMessageProcessor messageProcessor;
+
+    }
+
+    /**
+     * Factory for {@link CrossDcConsumer} instances. Currently a stub: its only method returns
+     * {@code null}.
+     */
+    public static class CrossDcConsumerFactory {
+        // Not used within this class.
+        private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+        /**
+         * @return always {@code null} in the current implementation
+         */
+        CrossDcConsumer getCrossDcConsumer(){
+            return null;
+        }
+
+    }
+
+    /**
+     * Class to run the consumer thread for Kafka. This also contains the implementation for retries and
+     * resubmitting to the queue in case of temporary failures.
+     */
+    public static class KafkaCrossDcConsumer extends CrossDcConsumer {
+        private static final Logger logger = LoggerFactory.getLogger(KafkaCrossDcConsumer.class);
+
+        // Kafka client that run() subscribes and polls.
+        private final KafkaConsumer<String, MirroredSolrRequest> consumer;
+        // Sink used to put a request back on the queue when processing returns FAILED_RESUBMIT.
+        private final KafkaMirroringSink kafkaMirroringSink;
+
+        // Timeout, in milliseconds, passed to each consumer.poll() call.
+        private final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 100;
+        // NOTE(review): hides the field of the same name declared in CrossDcConsumer and is never
+        // assigned in this class, yet pollAndProcessRequests() dereferences it - confirm where it is set.
+        SolrMessageProcessor messageProcessor;
+
+        /**
+         * Creates the Kafka consumer and the sink used for resubmitting failed requests.
+         *
+         * @param conf The Kafka consumer configuration
+         */
+        public KafkaCrossDcConsumer(KafkaCrossDcConf conf) {
+            // NOTE(review): these properties are still empty when handed to createConsumer(); conf is
+            // only used for the resubmit sink below - confirm how the consumer is meant to be configured.
+            final Properties kafkaConsumerProp = new Properties();
+            logger.info("Creating Kafka consumer with configuration {}", kafkaConsumerProp);
+            consumer = createConsumer(kafkaConsumerProp);
+
+            // Create producer for resubmitting failed requests
+            logger.info("Creating Kafka resubmit producer");
+            this.kafkaMirroringSink = new KafkaMirroringSink(conf);
+            logger.info("Created Kafka resubmit producer");
+
+        }
+
+        /**
+         * Creates the Kafka consumer from the given client properties.
+         *
+         * @param properties Kafka consumer configuration
+         * @return a new consumer; subscription happens later in {@link #run()}
+         */
+        private KafkaConsumer<String, MirroredSolrRequest> createConsumer(Properties properties) {
+            // Diamond instead of the raw type, so the key/value types are checked by the compiler.
+            return new KafkaConsumer<>(properties);
+        }
+
+        /**
+         * Consumer thread body.
+         * 1. Subscribes to the topic.
+         * 2. Repeatedly polls and processes records via {@link #pollAndProcessRequests()} until
+         *    that method returns false.
+         * 3. Closes the Kafka consumer, also when the loop ends with an exception.
+         */
+        @Override
+        public void run() {
+            logger.info("About to start Kafka consumer thread ");
+            // NOTE(review): hard-coded, and different from the "test-topic" configured in
+            // Consumer.getCrossDcConsumer() - confirm which topic is intended.
+            String topic="topic";
+
+            try {
+                logger.info("Kafka consumer subscribing to topic topic={}", topic);
+                consumer.subscribe(Collections.singleton(topic));
+
+                while (pollAndProcessRequests()) {
+                    //no-op within this loop: everything is done in the pollAndProcessRequests method defined below.
+                }
+            } finally {
+                // Always release the client, and only report it closed once close() has returned.
+                consumer.close();
+                logger.info("Closed kafka consumer. Exiting now.");
+            }
+
+        }
+
+        /**
+         * Polls Kafka once and processes the returned records partition by partition. After a
+         * partition's records have been handled its offset is committed; if handling throws, the
+         * partition is rewound to the first record of the batch and the remaining partitions of the
+         * same poll are still processed.
+         *
+         * @return false when the consumer needs to be shut down (wakeup, thread interrupt, or a
+         *         failure while resubmitting to Kafka); true to keep polling
+         */
+        boolean pollAndProcessRequests() {
+            try {
+                ConsumerRecords<String, MirroredSolrRequest> records = consumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
+                for (TopicPartition partition : records.partitions()) {
+                    List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords = records.records(partition);
+                    try {
+                        for (ConsumerRecord<String, MirroredSolrRequest> record : partitionRecords) {
+                            logger.trace("Fetched record from topic={} partition={} key={} value={}",
+                                    record.topic(), record.partition(), record.key(), record.value());
+                            IQueueHandler.Result result = messageProcessor.handleItem(record.value());
+                            switch (result.status()) {
+                                case FAILED_RESUBMIT:
+                                    kafkaMirroringSink.submit(record.value());
+                                    break;
+                                case HANDLED:
+                                    // no-op
+                                    break;
+                                case NOT_HANDLED_SHUTDOWN:
+                                case FAILED_RETRY:
+                                    logger.error("Unexpected response while processing request. We never expect {}.",
+                                            result.status().toString());
+                                    break;
+                                default:
+                                    // no-op
+                            }
+                        }
+                        updateOffset(partition, partitionRecords);
+
+                        // handleItem sets the thread interrupt, let's exit if there has been an interrupt set
+                        if(Thread.currentThread().isInterrupted()) {
+                            logger.info("Kafka Consumer thread interrupted, shutting down Kafka consumer.");
+                            return false;
+                        }
+                    } catch (MirroringException e) {
+                        // We don't really know what to do here, so it's wiser to just break out.
+                        logger.error("Mirroring exception occured while resubmitting to Kafka. We are going to stop the consumer thread now.", e);
+                        return false;
+                    } catch (WakeupException e) {
+                        logger.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer.");
+                        return false;
+                    } catch (Exception e) {
+                        // If there is any exception returned by handleItem, then reset the offset.
+                        logger.warn("Exception occurred in Kafka consumer thread, but we will continue.", e);
+                        resetOffsetForPartition(partition, partitionRecords);
+                        // An interrupted sleep inside the message processor surfaces here as a
+                        // RuntimeException with the interrupt flag set; stop instead of polling again.
+                        if (Thread.currentThread().isInterrupted()) {
+                            logger.info("Kafka Consumer thread interrupted, shutting down Kafka consumer.");
+                            return false;
+                        }
+                        // Deliberately no break: the other partitions of this poll were already fetched
+                        // and their positions advanced, so skipping them here would lose those records.
+                    }
+                }
+            } catch (WakeupException e) {
+                logger.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer");
+                return false;
+            } catch (Exception e) {
+                logger.error("Exception occurred in Kafka consumer thread, but we will continue.", e);
+                // Do not keep looping on a thread that has been asked to stop.
+                if (Thread.currentThread().isInterrupted()) {
+                    logger.info("Kafka Consumer thread interrupted, shutting down Kafka consumer.");
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        /**
+         * Rewinds the consumer's position for a partition to the first record of the given batch,
+         * so the same records are returned by a later poll.
+         * @param partition The TopicPartition to reset the offset for
+         * @param partitionRecords The batch that was fetched for that partition
+         */
+        private void resetOffsetForPartition(TopicPartition partition, List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+            final long firstOffset = partitionRecords.get(0).offset();
+            logger.debug("Resetting offset to: {}", firstOffset);
+            consumer.seek(partition, firstOffset);
+        }
+
+        /**
+         * Synchronously commits the offset following the last record of the batch for the given
+         * partition and logs the new commit point.
+         * @param partition The TopicPartition to update the offset for
+         * @param partitionRecords The batch that has been processed for that partition
+         */
+        private void updateOffset(TopicPartition partition, List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+            final ConsumerRecord<String, MirroredSolrRequest> lastRecord = partitionRecords.get(partitionRecords.size() - 1);
+            // The committed offset is that of the next record to read, hence the +1.
+            final long nextOffset = lastRecord.offset() + 1;
+            final OffsetAndMetadata commitPoint = new OffsetAndMetadata(nextOffset);
+            consumer.commitSync(Collections.singletonMap(partition, commitPoint));
+
+            logger.trace("Updated offset for topic={} partition={} to offset={}",
+                    partition.topic(), partition.partition(), nextOffset);
+        }
+
+        /**
+         * Shutdown the Kafka consumer by calling wakeup. The poll loop treats the resulting
+         * WakeupException as the signal to stop (see pollAndProcessRequests).
+         */
+        void shutdown() {
+            consumer.wakeup();
+        }
+
+
+    }
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java
new file mode 100644
index 0000000..a066741
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/helpers/SendDummyUpdates.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.helpers;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
+
+import java.util.Properties;
+
+/**
+ * Manual helper that sends a single dummy update request to a local Kafka broker.
+ */
+public class SendDummyUpdates {
+    /**
+     * Builds a one-document {@link UpdateRequest}, wraps it in a {@link MirroredSolrRequest} and
+     * publishes it to the "Trial" topic on localhost:9092.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) {
+        String topic = "Trial";
+        Properties properties = new Properties();
+        properties.put("bootstrap.servers", "localhost:9092");
+        properties.put("acks", "all");
+        properties.put("retries", 0);
+        properties.put("batch.size", 16384);
+        properties.put("buffer.memory", 33554432);
+        properties.put("linger.ms", 1);
+        properties.put("key.serializer", StringSerializer.class.getName());
+        properties.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
+
+        UpdateRequest updateRequest = new UpdateRequest();
+        updateRequest.add("id", String.valueOf(System.currentTimeMillis()));
+        MirroredSolrRequest mirroredSolrRequest = new MirroredSolrRequest(updateRequest);
+
+        // try-with-resources closes the producer even when send() throws; the diamond replaces
+        // the raw KafkaProducer/ProducerRecord types.
+        try (Producer<String, MirroredSolrRequest> producer = new KafkaProducer<>(properties)) {
+            System.out.println("About to send producer record");
+            producer.send(new ProducerRecord<>(topic, mirroredSolrRequest));
+            System.out.println("Sent producer record");
+        }
+        System.out.println("Closed producer");
+    }
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
new file mode 100644
index 0000000..210f4ca
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/MessageProcessor.java
@@ -0,0 +1,17 @@
+package org.apache.solr.crossdc.messageprocessor;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.crossdc.ResubmitBackoffPolicy;
+
+/**
+ * Base class for message processors; holds the {@link ResubmitBackoffPolicy} supplied at
+ * construction time.
+ */
+public abstract class MessageProcessor {
+
+  // Policy handed in by the subclass; exposed through getResubmitBackoffPolicy().
+  private final ResubmitBackoffPolicy resubmitBackoffPolicy;
+
+  /**
+   * @param resubmitBackoffPolicy the backoff policy to expose to subclasses
+   */
+  public MessageProcessor(ResubmitBackoffPolicy resubmitBackoffPolicy) {
+    this.resubmitBackoffPolicy = resubmitBackoffPolicy;
+  }
+
+  /**
+   * @return the policy passed to the constructor
+   */
+  public ResubmitBackoffPolicy getResubmitBackoffPolicy() {
+    return resubmitBackoffPolicy;
+  }
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
new file mode 100644
index 0000000..650fc83
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.crossdc.messageprocessor;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.SolrResponseBase;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.crossdc.ResubmitBackoffPolicy;
+import org.apache.solr.crossdc.common.CrossDcConstants;
+import org.apache.solr.crossdc.common.IQueueHandler;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.SolrExceptionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Message processor implements all the logic to process a MirroredSolrRequest.
+ * It handles:
+ *  1. Sending the update request to Solr
+ *  2. Discarding or retrying failed requests
+ *  3. Flagging requests for resubmission by the underlying consumer implementation.
+ */
+public class SolrMessageProcessor extends MessageProcessor implements IQueueHandler<MirroredSolrRequest>  {
+    private static final Logger logger = LoggerFactory.getLogger(SolrMessageProcessor.class);
+    // Client used to connect to Solr and to execute every mirrored request.
+    final CloudSolrClient client;
+
+    // Name of the document field that sanitizeDocument() strips before submitting.
+    private static final String VERSION_FIELD = "_version_";
+
+    /**
+     * @param client the Solr client requests are sent through
+     * @param resubmitBackoffPolicy policy consulted before a request is handed back for resubmission
+     */
+    public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy resubmitBackoffPolicy) {
+        super(resubmitBackoffPolicy);
+        this.client = client;
+    }
+
+    /**
+     * Processes one mirrored request: waits until Solr is reachable, marks the request so it is
+     * not mirrored again, then submits it.
+     *
+     * @param mirroredSolrRequest the request taken from the queue
+     * @return the processing result, whose status tells the caller what to do with the request
+     */
+    @Override
+    public Result<MirroredSolrRequest> handleItem(MirroredSolrRequest mirroredSolrRequest) {
+        connectToSolrIfNeeded();
+        // NOTE(review): this sets CrossDcConstants.SHOULD_MIRROR=false when the param is absent, and
+        // handleSolrRequest() skips requests whose "shouldMirror" param parses to false. If the
+        // constant's value is "shouldMirror", requests arriving without the param get skipped - confirm.
+        preventCircularMirroring(mirroredSolrRequest);
+        return processMirroredRequest(mirroredSolrRequest);
+    }
+
+    /**
+     * Submits the request and, when the outcome asks for a resubmit, applies the configured
+     * backoff before handing the outcome back.
+     */
+    private Result<MirroredSolrRequest> processMirroredRequest(MirroredSolrRequest request) {
+        final Result<MirroredSolrRequest> outcome = handleSolrRequest(request);
+        // Sleeps only for FAILED_RESUBMIT outcomes; see backoffIfNeeded.
+        backoffIfNeeded(outcome);
+        return outcome;
+    }
+
+    /**
+     * Submits the wrapped Solr request unless its "shouldMirror" param is explicitly false.
+     *
+     * @param mirroredSolrRequest the request to submit
+     * @return FAILED_NO_RETRY when the request is skipped, otherwise the result of submitting it
+     *         (failures are translated by {@link #handleException})
+     */
+    private Result<MirroredSolrRequest> handleSolrRequest(MirroredSolrRequest mirroredSolrRequest) {
+        logger.debug("Handling Solr request");
+        SolrRequest request = mirroredSolrRequest.getSolrRequest();
+        final SolrParams requestParams = request.getParams();
+
+        // A request may carry no params at all (preventCircularMirroring allows for that too);
+        // treat that the same as "param not set" instead of failing with a NullPointerException.
+        final String shouldMirror = requestParams == null ? null : requestParams.get("shouldMirror");
+        if (shouldMirror != null && !Boolean.parseBoolean(shouldMirror)) {
+            logger.warn("Skipping mirrored request because shouldMirror is set to false. request={}", request);
+            return new Result<>(ResultStatus.FAILED_NO_RETRY);
+        }
+        logFirstAttemptLatency(mirroredSolrRequest);
+
+        Result<MirroredSolrRequest> result;
+        try {
+            prepareIfUpdateRequest(request);
+            logRequest(request);
+            logger.debug("About to submit Solr request {}", request);
+            result = processMirroredSolrRequest(request);
+        } catch (Exception e) {
+            result = handleException(mirroredSolrRequest, e);
+        }
+
+        return result;
+    }
+
+    /**
+     * Translates a submission failure into a result: retryable failures bump the attempt counter,
+     * back off and ask for a resubmit; all others are reported as FAILED_NO_RETRY.
+     */
+    private Result<MirroredSolrRequest> handleException(MirroredSolrRequest mirroredSolrRequest, Exception e) {
+        final SolrException solrException = SolrExceptionUtil.asSolrException(e);
+        logIf4xxException(solrException);
+
+        if (isRetryable(e)) {
+            logFailure(mirroredSolrRequest, e, solrException, true);
+            final int nextAttempt = mirroredSolrRequest.getAttempt() + 1;
+            mirroredSolrRequest.setAttempt(nextAttempt);
+            maybeBackoff(solrException);
+            return new Result<>(ResultStatus.FAILED_RESUBMIT, e, mirroredSolrRequest);
+        }
+
+        logFailure(mirroredSolrRequest, e, solrException, false);
+        return new Result<>(ResultStatus.FAILED_NO_RETRY, e);
+    }
+
+    /**
+     * Sleeps before a retry when the failure was a {@link SolrException}. The sleep time is the
+     * "backoffTime-ms" metadata of the exception when present, non-zero and numeric (minimum
+     * 1 ms); otherwise one second.
+     *
+     * @param solrException the Solr failure, or null, in which case no sleep takes place
+     */
+    private void maybeBackoff(SolrException solrException) {
+        if (solrException == null) {
+            return;
+        }
+        long sleepTimeMs = 1000;
+        String backoffTimeSuggested = solrException.getMetadata("backoffTime-ms");
+        if (backoffTimeSuggested != null && !"0".equals(backoffTimeSuggested)) {
+            // If backoff policy is not configured (returns "0" by default), then sleep 1 second. If configured, do as it says.
+            try {
+                sleepTimeMs = Math.max(1, Long.parseLong(backoffTimeSuggested));
+            } catch (NumberFormatException nfe) {
+                // Malformed metadata must not abort the error handling; keep the default.
+                logger.warn("Ignoring unparseable backoffTime-ms={}, using sleepTimeMs={}", backoffTimeSuggested, sleepTimeMs);
+            }
+        }
+        logger.info("Consumer backoff. sleepTimeMs={}", sleepTimeMs);
+        uncheckedSleep(sleepTimeMs);
+    }
+
+    /**
+     * Decides whether a failed request goes back to the queue.
+     *
+     * @return false only for a Solr version conflict (CONFLICT code); true for everything else
+     */
+    private boolean isRetryable(Exception e) {
+        final SolrException solrException = SolrExceptionUtil.asSolrException(e);
+        final boolean versionConflict =
+                solrException != null && solrException.code() == SolrException.ErrorCode.CONFLICT.code;
+        if (versionConflict) {
+            return false;
+        }
+        // Anything that is not a version conflict gets retried.
+        logger.warn("Unexpected exception, will resubmit the request to the queue", e);
+        return true;
+    }
+
+    /**
+     * Logs an error when the Solr failure carries a 4xx code. This shouldn't really happen; if it
+     * does, the return code from Solr most likely needs fixing.
+     */
+    private void logIf4xxException(SolrException solrException) {
+        if (solrException == null) {
+            return;
+        }
+        final int code = solrException.code();
+        if (code >= 400 && code < 500) {
+            logger.error("Exception occurred with 4xx response. {}", code, solrException);
+        }
+    }
+
+    /**
+     * Logs a failed submission: an error for 4xx Solr failures, otherwise a warning carrying the
+     * error code (-1 when there is no SolrException) and the attempt count. The
+     * {@code retryable} argument is currently not consulted.
+     */
+    private void logFailure(MirroredSolrRequest mirroredSolrRequest, Exception e, SolrException solrException, boolean retryable) {
+        final boolean hasSolrException = solrException != null;
+
+        // This shouldn't really happen.
+        if (hasSolrException && solrException.code() >= 400 && solrException.code() < 500) {
+            logger.error("Exception occurred with 4xx response. {}", solrException.code(), solrException);
+            return;
+        }
+
+        final int errorCode = hasSolrException ? solrException.code() : -1;
+        logger.warn("Resubmitting mirrored solr request after failure errorCode={} retryCount={}", errorCode, mirroredSolrRequest.getAttempt(), e);
+    }
+
+    /**
+     * Executes the request against Solr.
+     *
+     * @return a HANDLED result when the response status is 0
+     * @throws Exception whatever the client throws, or a SolrException built from the response
+     *         status when that status is non-zero
+     */
+    private Result<MirroredSolrRequest> processMirroredSolrRequest(SolrRequest request) throws Exception {
+        final SolrResponseBase response = (SolrResponseBase) request.process(client);
+        final int status = response.getStatus();
+
+        if (status == 0) {
+            return new Result<>(ResultStatus.HANDLED);
+        }
+        throw new SolrException(SolrException.ErrorCode.getErrorCode(status), "response=" + response);
+    }
+
+    /**
+     * For update requests, logs how many delete-by-id, document and delete-by-query entries are
+     * about to be submitted. Other request types are not logged.
+     */
+    private void logRequest(SolrRequest request) {
+        if (!(request instanceof UpdateRequest)) {
+            return;
+        }
+        final UpdateRequest update = (UpdateRequest) request;
+        final StringBuilder summary = new StringBuilder(64).append("Submitting update request");
+
+        final List<?> deleteByIds = update.getDeleteById();
+        if (deleteByIds != null) {
+            summary.append(" numDeleteByIds=").append(deleteByIds.size());
+        }
+        final List<?> documents = update.getDocuments();
+        if (documents != null) {
+            summary.append(" numUpdates=").append(documents.size());
+        }
+        final List<?> deleteQueries = update.getDeleteQuery();
+        if (deleteQueries != null) {
+            summary.append(" numDeleteByQuery=").append(deleteQueries.size());
+        }
+        logger.info(summary.toString());
+    }
+
+    /**
+     * Prepares an update request for local submission by stripping version information from its
+     * documents and delete-by-id entries. Requests of any other type are left untouched.
+     * @param request The SolrRequest to be cleaned up for submitting locally.
+     */
+    private void prepareIfUpdateRequest(SolrRequest request) {
+        if (!(request instanceof UpdateRequest)) {
+            return;
+        }
+        final UpdateRequest update = (UpdateRequest) request;
+
+        // Remove versions from add requests
+        final List<SolrInputDocument> docs = update.getDocuments();
+        if (docs != null) {
+            docs.forEach(this::sanitizeDocument);
+        }
+        removeVersionFromDeleteByIds(update);
+    }
+
+    /**
+     * Strips fields that are problematic for replication; currently the {@code _version_} field.
+     * Documents that do not carry the field are left unchanged.
+     *
+     * @param doc the document to clean up in place
+     */
+    private void sanitizeDocument(SolrInputDocument doc) {
+        // getField returns null for an absent field, so only read the value when it is there.
+        if (doc.getField(VERSION_FIELD) != null) {
+            logger.info("Removing {}", VERSION_FIELD + " : " + doc.getField(VERSION_FIELD).getValue());
+        }
+        doc.remove(VERSION_FIELD);
+    }
+
+    /**
+     * Sets the version entry of every delete-by-id in the request to null.
+     */
+    private void removeVersionFromDeleteByIds(UpdateRequest updateRequest) {
+        final Map<String, Map<String, Object>> deleteIds = updateRequest.getDeleteByIdMap();
+        if (deleteIds == null) {
+            return;
+        }
+        for (final Map<String, Object> idParams : deleteIds.values()) {
+            if (idParams == null) {
+                continue;
+            }
+            idParams.put(UpdateRequest.VER, null);
+        }
+    }
+
+    /**
+     * Logs, for first attempts only, the time between submission on the originating side and the
+     * request becoming eligible for consumption here.
+     */
+    private void logFirstAttemptLatency(MirroredSolrRequest mirroredSolrRequest) {
+        // Retries are ignored so the figure reflects queue latency, not processing failures.
+        if (mirroredSolrRequest.getAttempt() != 1) {
+            return;
+        }
+        // NOTE(review): subtracts a nanosecond-derived value from wall-clock millis; that is only
+        // meaningful if getSubmitTimeNanos() is epoch-based - confirm.
+        final long nowMs = System.currentTimeMillis();
+        final long submittedMs = TimeUnit.NANOSECONDS.toMillis(mirroredSolrRequest.getSubmitTimeNanos());
+        logger.debug("First attempt latency = {}", nowMs - submittedMs);
+    }
+
+    /**
+     * Makes sure the request carries {@link CrossDcConstants#SHOULD_MIRROR}=false so it is not
+     * mirrored back. The param is added when missing (for non-update requests only if the params
+     * are modifiable); a warning is logged when it is present with any value other than false,
+     * because a circular mirror may then occur.
+     *
+     * @param mirroredSolrRequest MirroredSolrRequest object that is being processed.
+     */
+    private void preventCircularMirroring(MirroredSolrRequest mirroredSolrRequest) {
+        if (mirroredSolrRequest.getSolrRequest() instanceof UpdateRequest) {
+            final UpdateRequest updateRequest = (UpdateRequest) mirroredSolrRequest.getSolrRequest();
+            final ModifiableSolrParams updateParams = updateRequest.getParams();
+            final String updateFlag = updateParams != null ? updateParams.get(CrossDcConstants.SHOULD_MIRROR) : null;
+            if (updateFlag == null) {
+                logger.warn(CrossDcConstants.SHOULD_MIRROR + " param is missing - setting to false. Request={}", mirroredSolrRequest);
+                updateRequest.setParam(CrossDcConstants.SHOULD_MIRROR, "false");
+            } else if (!"false".equalsIgnoreCase(updateFlag)) {
+                logger.warn(CrossDcConstants.SHOULD_MIRROR + " param equal to " + updateFlag);
+            }
+            return;
+        }
+
+        // Any other request type: the params can only be amended when they are modifiable.
+        final SolrParams params = mirroredSolrRequest.getSolrRequest().getParams();
+        final String flag = params != null ? params.get(CrossDcConstants.SHOULD_MIRROR) : null;
+        if (flag != null) {
+            if (!"false".equalsIgnoreCase(flag)) {
+                logger.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is present and set to " + flag);
+            }
+            return;
+        }
+        if (params instanceof ModifiableSolrParams) {
+            logger.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing - setting to false");
+            ((ModifiableSolrParams) params).set(CrossDcConstants.SHOULD_MIRROR, "false");
+        } else {
+            logger.warn("{} {}", CrossDcConstants.SHOULD_MIRROR, "param is missing and params are not modifiable");
+        }
+    }
+
+    /**
+     * Blocks until the client has connected to Solr, retrying every five seconds. Nothing is
+     * consumed while the Solr server cannot be reached.
+     */
+    private void connectToSolrIfNeeded() {
+        while (true) {
+            try {
+                client.connect(); // volatile null-check if already connected
+                return;
+            } catch (Exception e) {
+                logger.error("Unable to connect to solr server. Not consuming.", e);
+                uncheckedSleep(5000);
+            }
+        }
+    }
+
+    /**
+     * Sleeps for the given time without exposing the checked {@link InterruptedException}.
+     *
+     * @param millis how long to sleep, in milliseconds
+     * @throws RuntimeException wrapping the InterruptedException when the sleep is interrupted;
+     *         the thread's interrupt flag is set again before throwing
+     */
+    public void uncheckedSleep(long millis) {
+        try {
+            Thread.sleep(millis);
+        } catch (InterruptedException e) {
+            // Restore the flag so callers further up can still see the interrupt.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * For FAILED_RESUBMIT results, sleeps for the time given by the resubmit backoff policy.
+     * An interrupt during the sleep is logged and the interrupt flag restored; the method then
+     * returns normally and leaves the handling to the caller.
+     */
+    private void backoffIfNeeded(Result<MirroredSolrRequest> result) {
+        if (!result.status().equals(ResultStatus.FAILED_RESUBMIT)) {
+            return;
+        }
+        final long backoffMs = getResubmitBackoffPolicy().getBackoffTimeMs(result.newItem());
+        if (backoffMs <= 0L) {
+            return;
+        }
+        try {
+            Thread.sleep(backoffMs);
+        } catch (final InterruptedException ex) {
+            // We are about to return anyway, so just record the interrupt for the caller.
+            logger.warn("Thread interrupted while backing off before retry");
+            Thread.currentThread().interrupt();
+        }
+    }
+
+}
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
new file mode 100644
index 0000000..34f44f9
--- /dev/null
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.crossdc;
+
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.crossdc.common.IQueueHandler;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.*;
+
+/** Unit tests for {@link SolrMessageProcessor}, run against a Mockito-mocked {@link CloudSolrClient}. */
+public class TestMessageProcessor {
+    static final String VERSION_FIELD = "_version_";
+
+    private static class NoOpResubmitBackoffPolicy implements ResubmitBackoffPolicy {
+        @Override
+        public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
+            return 0;
+        }
+    }
+
+    @Mock
+    private CloudSolrClient solrClient;
+    private SolrMessageProcessor processor;
+
+    // Spied so tests can verify whether the processor consulted the policy at all.
+    private final ResubmitBackoffPolicy backoffPolicy = spy(new NoOpResubmitBackoffPolicy());
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        processor = Mockito.spy(new SolrMessageProcessor(solrClient, backoffPolicy));
+        // Replace the processor's uncheckedSleep with a no-op so the tests never wait in it.
+        Mockito.doNothing().when(processor).uncheckedSleep(anyLong());
+    }
+
+    /** Version info on added docs and on delete-by-id entries must be gone after handleItem. */
+    @Test
+    public void testDocumentSanitization() {
+        UpdateRequest request = spy(new UpdateRequest());
+
+        // Add docs with and without version
+        SolrInputDocument versionedDoc = new SolrInputDocument();
+        versionedDoc.setField("id", 1);
+        versionedDoc.setField(VERSION_FIELD, 1);
+        request.add(versionedDoc);
+        SolrInputDocument unversionedDoc = new SolrInputDocument();
+        unversionedDoc.setField("id", 2);
+        request.add(unversionedDoc);
+
+        // Delete by id with and without version
+        request.deleteById("1");
+        request.deleteById("2", 10L);
+
+        request.setParam("shouldMirror", "true");
+        // The response is irrelevant, but it will fail because mocked server returns null when processing
+        processor.handleItem(new MirroredSolrRequest(request));
+
+        // After processing, check that all version fields are stripped
+        for (SolrInputDocument doc : request.getDocuments()) {
+            assertNull("Doc still has version", doc.getField(VERSION_FIELD));
+        }
+
+        // Check versions in delete by id
+        for (Map<String, Object> idParams : request.getDeleteByIdMap().values()) {
+            if (idParams != null) {
+                // Inspect the entry exactly as the processor left it; never modify it before asserting.
+                assertNull("Delete still has version", idParams.get(UpdateRequest.VER));
+            }
+        }
+    }
+
+    /** A request the client answers normally must never consult the backoff policy. */
+    @Test
+    public void testSuccessNoBackoff() throws Exception {
+        final UpdateRequest request = spy(new UpdateRequest());
+        when(solrClient.request(eq(request), anyString())).thenReturn(new NamedList<>());
+
+        processor.handleItem(new MirroredSolrRequest(request));
+
+        verify(backoffPolicy, never()).getBackoffTimeMs(any());
+    }
+
+    /** A BAD_REQUEST {@link SolrException} thrown by the client is reported as FAILED_RESUBMIT. */
+    @Test
+    public void testClientErrorNoRetries() throws Exception {
+        final UpdateRequest request = new UpdateRequest();
+        request.setParam("shouldMirror", "true");
+        when(solrClient.request(eq(request), anyString()))
+                .thenThrow(new SolrException(SolrException.ErrorCode.BAD_REQUEST, "err msg"));
+
+        // NOTE(review): the method name says "no retries" but a resubmit status is expected — confirm intent.
+        IQueueHandler.Result<MirroredSolrRequest> result = processor.handleItem(new MirroredSolrRequest(request));
+        assertEquals(IQueueHandler.ResultStatus.FAILED_RESUBMIT, result.status());
+    }
+}
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 4d9ca16..8cf6eb5 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.7.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.3-all.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
diff --git a/settings.gradle b/settings.gradle
index 825c91f..3e6558d 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -7,4 +7,6 @@
  * in the user manual at https://docs.gradle.org/6.7.1/userguide/multi_project_builds.html
  */
 
-rootProject.name = 'lucene-solr-sandbox'
+rootProject.name = 'solr-sandbox'
+
+include 'crossdc-consumer'
\ No newline at end of file
diff --git a/version.props b/version.props
new file mode 100644
index 0000000..e69de29