You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2022/08/17 19:49:39 UTC

[solr-sandbox] branch crossdc-wip updated: Config improvements and queue bug fixes. (#34)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new 1b325fd  Config improvements and queue bug fixes. (#34)
1b325fd is described below

commit 1b325fdf55817284be5e9874e28f140d2360c9f5
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Wed Aug 17 14:49:34 2022 -0500

    Config improvements and queue bug fixes. (#34)
---
 CROSSDC.md                                         |  8 ++
 .../solr/crossdc/common/KafkaCrossDcConf.java      | 61 +++++++++++++++-
 .../solr/crossdc/common/KafkaMirroringSink.java    |  9 ++-
 .../org/apache/solr/crossdc/consumer/Consumer.java | 85 +++++++++++++++++++---
 .../crossdc/consumer/KafkaCrossDcConsumer.java     |  5 ++
 .../MirroringUpdateRequestProcessorFactory.java    | 51 ++++++++++++-
 .../apache/solr/crossdc/DeleteByQueryToIdTest.java |  3 +-
 .../solr/crossdc/RetryQueueIntegrationTest.java    |  5 +-
 .../solr/crossdc/SolrAndKafkaIntegrationTest.java  |  5 +-
 .../solr/crossdc/SolrAndKafkaReindexTest.java      |  9 ++-
 .../solr/crossdc/ZkConfigIntegrationTest.java      |  3 +-
 11 files changed, 213 insertions(+), 31 deletions(-)

diff --git a/CROSSDC.md b/CROSSDC.md
index 228ef31..5c528f4 100644
--- a/CROSSDC.md
+++ b/CROSSDC.md
@@ -117,6 +117,14 @@ Additional configuration properties:
 
    *groupId* - the group id to give Kafka for the consumer, default to the empty string if not specified.
 
+The following additional configuration properties should be specified either for both the producer and the consumer, or in the shared Zookeeper
+central config properties file. This is because the Consumer uses a Producer of its own for retries.
+
+   *batchSizeBytes* - the maximum batch size in bytes for the queue
+   *bufferMemoryBytes* - the amount of memory in bytes allocated by the Producer in total for buffering 
+   *lingerMs* - the amount of time that the Producer will wait to add to a batch
+   *requestTimeout* - request timeout for the Producer. When used for the Consumer's retry Producer, this should be less than the timeout that would cause the Consumer to be removed from the group for taking too long.
+
 #### Central Configuration Option
 
 You can optionally manage the configuration centrally in Solr's Zookeeper cluster by placing a properties file called *crossdc.properties* in the root Solr Zookeeper znode, eg, */solr/crossdc.properties*.  This allows you to update the configuration in a central location rather than at each solrconfig.xml in each Solr node and also automatically deals with new Solr nodes or Consumers to come up without requiring additional configuration.
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index a273b43..c1d6149 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -16,25 +16,49 @@
  */
 package org.apache.solr.crossdc.common;
 
+import org.apache.kafka.clients.producer.ProducerConfig;
+
 public class KafkaCrossDcConf extends CrossDcConf {
+
+    public static final String DEFAULT_BATCH_SIZE_BYTES = "512000";
+    public static final String DEFAULT_BUFFER_MEMORY_BYTES = "268435456";
+    public static final String DEFAULT_LINGER_MS = "30";
+    public static final String DEFAULT_REQUEST_TIMEOUT = "60000";
+
     private final String topicName;
 
     private final String groupId;
     private final boolean enableDataEncryption;
     private final String bootstrapServers;
-    private long slowSubmitThresholdInMillis;
+    private long slowSubmitThresholdInMillis = 1000;
     private int numOfRetries = 5;
     private final String solrZkConnectString;
 
     private final int maxPollRecords;
 
-    public KafkaCrossDcConf(String bootstrapServers, String topicName, String groupId, int maxPollRecords, boolean enableDataEncryption, String solrZkConnectString) {
+    private final int batchSizeBytes;
+    private final int bufferMemoryBytes;
+    private final int lingerMs;
+    private final int requestTimeout;
+
+  private final int fetchMinBytes;
+
+  private final int fetchMaxWaitMS;
+
+  public KafkaCrossDcConf(String bootstrapServers, String topicName, String groupId, int maxPollRecords, int batchSizeBytes, int bufferMemoryBytes, int lingerMs, int requestTimeout,
+      int fetchMinBytes, int fetchMaxWaitMS, boolean enableDataEncryption, String solrZkConnectString) {
         this.bootstrapServers = bootstrapServers;
         this.topicName = topicName;
         this.enableDataEncryption = enableDataEncryption;
         this.solrZkConnectString = solrZkConnectString;
         this.groupId = groupId;
         this.maxPollRecords = maxPollRecords;
+        this.batchSizeBytes = batchSizeBytes;
+        this.bufferMemoryBytes = bufferMemoryBytes;
+        this.lingerMs = lingerMs;
+        this.requestTimeout = requestTimeout;
+        this.fetchMinBytes = fetchMinBytes;
+        this.fetchMaxWaitMS = fetchMaxWaitMS;
     }
     public String getTopicName() {
         return topicName;
@@ -75,6 +99,30 @@ public class KafkaCrossDcConf extends CrossDcConf {
         return bootstrapServers;
     }
 
+    public int getBatchSizeBytes() {
+        return batchSizeBytes;
+    }
+
+    public int getBufferMemoryBytes() {
+        return bufferMemoryBytes;
+    }
+
+    public int getLingerMs() {
+        return lingerMs;
+    }
+
+    public int getRequestTimeout() {
+        return requestTimeout;
+    }
+
+    public int getFetchMinBytes() {
+      return fetchMinBytes;
+    }
+
+    public int getFetchMaxWaitMS() {
+      return fetchMaxWaitMS;
+    }
+
     @Override
     public String toString() {
         return String.format("KafkaCrossDcConf{" +
@@ -84,8 +132,15 @@ public class KafkaCrossDcConf extends CrossDcConf {
                 "bootstrapServers='%s', " +
                 "slowSubmitThresholdInMillis='%d', " +
                 "numOfRetries='%d', " +
+                "batchSizeBytes='%d', " +
+                "bufferMemoryBytes='%d', " +
+                "lingerMs='%d', " +
+                "requestTimeout='%d', " +
+                "fetchMinBytes='%d', " +
+                "fetchMaxWaitMS='%d', " +
                 "solrZkConnectString='%s'}",
                 topicName, groupId, enableDataEncryption, bootstrapServers,
-                slowSubmitThresholdInMillis, numOfRetries, solrZkConnectString);
+                slowSubmitThresholdInMillis, numOfRetries, batchSizeBytes,
+                bufferMemoryBytes, lingerMs, requestTimeout, fetchMinBytes, fetchMaxWaitMS, solrZkConnectString);
     }
 }
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index 592d34d..787e6af 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -18,6 +18,7 @@ package org.apache.solr.crossdc.common;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
@@ -61,7 +62,6 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
                         exception);
                 }
             });
-            producer.flush(); // TODO: remove
 
             lastSuccessfulEnqueueNanos = System.nanoTime();
             // Record time since last successful enqueue as 0
@@ -98,9 +98,10 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
 
         props.put("acks", "all");
         props.put("retries", 3);
-        props.put("batch.size", 15);
-        props.put("buffer.memory", 33554432);
-        props.put("linger.ms", 1);
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getBatchSizeBytes());
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, conf.getBufferMemoryBytes());
+        props.put(ProducerConfig.LINGER_MS_CONFIG, conf.getLingerMs());
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getRequestTimeout()); // should be less than time that causes consumer to be kicked out
 
         props.put("key.serializer", StringSerializer.class.getName());
         props.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index d6472fb..a19da19 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -33,6 +33,8 @@ import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.*;
+
 // Cross-DC Consumer main class
 public class Consumer {
     public static final String DEFAULT_PORT = "8090";
@@ -44,6 +46,9 @@ public class Consumer {
     private static final String MAX_POLL_RECORDS = "maxPollRecords";
     public static final String DEFAULT_MAX_POLL_RECORDS = "500";
 
+    private static final String DEFAULT_FETCH_MIN_BYTES = "512000";
+    private static final String DEFAULT_FETCH_MAX_WAIT_MS = "1000";
+
     private static boolean enabled = true;
 
     private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -55,10 +60,9 @@ public class Consumer {
 
     private Server server;
     CrossDcConsumer crossDcConsumer;
-    private String topicName;
-    private int maxPollRecords;
 
-    public void start(String bootstrapServers, String zkConnectString, String topicName, String groupId, int maxPollRecords, boolean enableDataEncryption, int port) {
+    public void start(String bootstrapServers, String zkConnectString, String topicName, String groupId, int maxPollRecords, int batchSizeBytes, int bufferMemoryBytes, int lingerMs, int requestTimeout,
+        int fetchMinBytes, int fetchMaxWaitMS, boolean enableDataEncryption, int port) {
         if (bootstrapServers == null) {
             throw new IllegalArgumentException("bootstrapServers config was not passed at startup");
         }
@@ -73,15 +77,37 @@ public class Consumer {
             maxPollRecords = Integer.parseInt(DEFAULT_MAX_POLL_RECORDS);
         }
 
-        this.topicName = topicName;
-        this.maxPollRecords = maxPollRecords;
+        if (batchSizeBytes == -1) {
+            batchSizeBytes = Integer.parseInt(DEFAULT_BATCH_SIZE_BYTES);
+        }
+
+        if (bufferMemoryBytes == -1) {
+            bufferMemoryBytes = Integer.parseInt(DEFAULT_BUFFER_MEMORY_BYTES);
+        }        
+        
+        if (lingerMs == -1) {
+            lingerMs = Integer.parseInt(DEFAULT_LINGER_MS);
+        }
+
+        if (requestTimeout == -1) {
+            requestTimeout = Integer.parseInt(DEFAULT_REQUEST_TIMEOUT);
+        }
+
+        if (fetchMinBytes == -1) {
+            fetchMinBytes = Integer.parseInt(DEFAULT_FETCH_MIN_BYTES);
+        }
+
+        if (fetchMaxWaitMS == -1) {
+            fetchMaxWaitMS = Integer.parseInt(DEFAULT_FETCH_MAX_WAIT_MS);
+        }
 
         //server = new Server();
         //ServerConnector connector = new ServerConnector(server);
         //connector.setPort(port);
         //server.setConnectors(new Connector[] {connector})
 
-        crossDcConsumer = getCrossDcConsumer(bootstrapServers, zkConnectString, topicName, groupId, maxPollRecords, enableDataEncryption);
+        crossDcConsumer = getCrossDcConsumer(bootstrapServers, zkConnectString, topicName, groupId, maxPollRecords, batchSizeBytes, bufferMemoryBytes, lingerMs,
+            requestTimeout, fetchMinBytes, fetchMaxWaitMS, enableDataEncryption);
 
         // Start consumer thread
 
@@ -96,9 +122,10 @@ public class Consumer {
     }
 
     private CrossDcConsumer getCrossDcConsumer(String bootstrapServers, String zkConnectString, String topicName, String groupId, int maxPollRecords,
-        boolean enableDataEncryption) {
+        int batchSizeBytes, int bufferMemoryBytes, int lingerMs, int requestTimeout, int fetchMinBytes, int fetchMaxWaitMS, boolean enableDataEncryption) {
 
-        KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, groupId, maxPollRecords, enableDataEncryption, zkConnectString);
+        KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, groupId, maxPollRecords, batchSizeBytes, bufferMemoryBytes, lingerMs,
+            requestTimeout, fetchMinBytes, fetchMaxWaitMS, enableDataEncryption, zkConnectString);
         return new KafkaCrossDcConsumer(conf);
     }
 
@@ -115,14 +142,21 @@ public class Consumer {
         String port = System.getProperty(PORT);
         String groupId = System.getProperty(GROUP_ID, "");
         String maxPollRecords = System.getProperty("maxPollRecords");
-
+        String batchSizeBytes = System.getProperty("batchSizeBytes");
+        String bufferMemoryBytes = System.getProperty("bufferMemoryBytes");
+        String lingerMs = System.getProperty("lingerMs");
+        String requestTimeout = System.getProperty("requestTimeout");
+        String fetchMinBytes = System.getProperty("fetchMinBytes");
+        String fetchMaxWaitMS = System.getProperty("fetchMaxWaitMS");
 
         try (SolrZkClient client = new SolrZkClient(zkConnectString, 15000)) {
 
             try {
                 if ((topicName == null || topicName.isBlank()) || (groupId == null || groupId.isBlank())
-                    || (bootstrapServers == null || bootstrapServers.isBlank()) || (port == null || port.isBlank()) || (maxPollRecords == null || maxPollRecords.isBlank()) && client
-                    .exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
+                    || (bootstrapServers == null || bootstrapServers.isBlank()) || (port == null || port.isBlank()) || (maxPollRecords == null || maxPollRecords.isBlank())
+                    || (batchSizeBytes == null || batchSizeBytes.isBlank()) || (bufferMemoryBytes == null || bufferMemoryBytes.isBlank()) || (lingerMs == null || lingerMs.isBlank())
+                    || (requestTimeout == null || requestTimeout.isBlank())  || (fetchMinBytes == null || fetchMinBytes.isBlank())  || (fetchMaxWaitMS == null || fetchMaxWaitMS.isBlank())
+                    && client.exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
                     byte[] data = client.getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), null, null, true);
                     Properties props = new Properties();
                     props.load(new ByteArrayInputStream(data));
@@ -132,6 +166,13 @@ public class Consumer {
                     port = getConfig(PORT, port, props);
                     groupId = getConfig(GROUP_ID, groupId, props);
                     maxPollRecords = getConfig(MAX_POLL_RECORDS, maxPollRecords, props);
+                    batchSizeBytes = getConfig("batchSizeBytes", batchSizeBytes, props);
+                    bufferMemoryBytes = getConfig("bufferMemoryBytes", bufferMemoryBytes, props);
+                    lingerMs = getConfig("lingerMs", lingerMs, props);
+                    requestTimeout = getConfig("requestTimeout", requestTimeout, props);
+                    fetchMinBytes = getConfig("fetchMinBytes", fetchMinBytes, props);
+                    fetchMaxWaitMS = getConfig("fetchMaxWaitMS", fetchMaxWaitMS, props);
+
                 }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -159,9 +200,29 @@ public class Consumer {
         if (maxPollRecords == null || maxPollRecords.isBlank()) {
             maxPollRecords = DEFAULT_MAX_POLL_RECORDS;
         }
+        if (batchSizeBytes == null || batchSizeBytes.isBlank()) {
+            batchSizeBytes = DEFAULT_BATCH_SIZE_BYTES;
+        }
+        if (bufferMemoryBytes == null || bufferMemoryBytes.isBlank()) {
+            bufferMemoryBytes = DEFAULT_BUFFER_MEMORY_BYTES;
+        }
+        if (lingerMs == null || lingerMs.isBlank()) {
+            lingerMs = DEFAULT_LINGER_MS;
+        }
+        if (requestTimeout == null || requestTimeout.isBlank()) {
+            requestTimeout = DEFAULT_REQUEST_TIMEOUT;
+        }
+        if (fetchMinBytes == null || fetchMinBytes.isBlank()) {
+            fetchMinBytes = DEFAULT_FETCH_MIN_BYTES;
+        }
+        if (fetchMaxWaitMS == null || fetchMaxWaitMS.isBlank()) {
+            fetchMaxWaitMS = DEFAULT_FETCH_MAX_WAIT_MS;
+        }
 
         Consumer consumer = new Consumer();
-        consumer.start(bootstrapServers, zkConnectString, topicName, groupId, Integer.parseInt(maxPollRecords), false, Integer.parseInt(port));
+        consumer.start(bootstrapServers, zkConnectString, topicName, groupId, Integer.parseInt(maxPollRecords),
+            Integer.parseInt(batchSizeBytes), Integer.parseInt(bufferMemoryBytes), Integer.parseInt(lingerMs),
+            Integer.parseInt(requestTimeout), Integer.parseInt(fetchMinBytes), Integer.parseInt(fetchMaxWaitMS), false, Integer.parseInt(port));
     }
 
     private static String getConfig(String configName, String configValue, Properties props) {
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 927a3df..9d7fcc9 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -55,6 +55,11 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
 
     kafkaConsumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
+    kafkaConsumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+    kafkaConsumerProp.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getFetchMinBytes());
+    kafkaConsumerProp.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getFetchMaxWaitMS());
+
     solrClient =
         new CloudSolrClient.Builder(Collections.singletonList(conf.getSolrZkConnectString()),
             Optional.empty()).build();
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 2757ee3..562b6b0 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -41,6 +41,8 @@ import java.util.Properties;
 import static org.apache.solr.update.processor.DistributedUpdateProcessor.*;
 import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
 
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.*;
+
 /**
  * An update processor that works with the {@link UpdateRequestProcessorFactory} to mirror update requests by
  * submitting them to a sink that implements a queue producer.
@@ -69,6 +71,11 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
     private String topicName;
     private String bootstrapServers;
 
+    private Integer batchSizeBytes;
+    private Integer bufferMemoryBytes;
+    private Integer lingerMs;
+    private Integer requestTimeout;
+
     private boolean enabled = true;
 
     @Override
@@ -82,6 +89,11 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
 
         topicName = args._getStr("topicName", null);
         bootstrapServers = args._getStr("bootstrapServers", null);
+
+        batchSizeBytes = (Integer) args.get("batchSizeBytes");
+        bufferMemoryBytes= (Integer) args.get("bufferMemoryBytes");;
+        lingerMs = (Integer) args.get("lingerMs");;
+        requestTimeout = (Integer) args.get("requestTimeout");;
     }
 
     private class Closer {
@@ -110,7 +122,9 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
         }
 
         try {
-            if (((topicName == null || topicName.isBlank()) || (bootstrapServers == null || bootstrapServers.isBlank())) && core.getCoreContainer().getZkController()
+            if (((topicName == null || topicName.isBlank()) || (bootstrapServers == null || bootstrapServers.isBlank()
+                || batchSizeBytes == null || bufferMemoryBytes == null
+                || lingerMs == null || requestTimeout == null)) && core.getCoreContainer().getZkController()
                 .getZkClient().exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
                 byte[] data = core.getCoreContainer().getZkController().getZkClient().getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), null, null, true);
 
@@ -128,6 +142,18 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
                 if (bootstrapServers == null || bootstrapServers.isBlank()) {
                     bootstrapServers = props.getProperty("bootstrapServers");
                 }
+                if (batchSizeBytes == null) {
+                    batchSizeBytes = getIntegerPropValue("batchSizeBytes", props);
+                }
+                if (bufferMemoryBytes == null) {
+                    bufferMemoryBytes = getIntegerPropValue("bufferMemoryBytes", props);
+                }
+                if (lingerMs == null) {
+                    lingerMs = getIntegerPropValue("lingerMs", props);
+                }
+                if (requestTimeout == null) {
+                    requestTimeout = getIntegerPropValue("requestTimeout", props);
+                }
              }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -148,12 +174,25 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
             throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "topicName not specified for producer");
         }
 
+        if (batchSizeBytes == null) {
+            batchSizeBytes = Integer.valueOf(DEFAULT_BATCH_SIZE_BYTES);
+        }
+        if (bufferMemoryBytes == null) {
+            bufferMemoryBytes = Integer.valueOf(DEFAULT_BUFFER_MEMORY_BYTES);
+        }
+        if (lingerMs == null) {
+            lingerMs = Integer.valueOf(DEFAULT_LINGER_MS);
+        }
+        if (requestTimeout == null) {
+            requestTimeout = Integer.valueOf(DEFAULT_REQUEST_TIMEOUT);
+        }
+
         log.info("bootstrapServers={} topicName={}", bootstrapServers, topicName);
 
         // load the request mirroring sink class and instantiate.
        // mirroringHandler = core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(), KafkaRequestMirroringHandler.class);
 
-        KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, "",  -1, false,  null);
+        KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, "",  -1, batchSizeBytes, bufferMemoryBytes, lingerMs, requestTimeout,-1, -1,false,  null);
         KafkaMirroringSink sink = new KafkaMirroringSink(conf);
 
         Closer closer = new Closer(sink);
@@ -170,6 +209,14 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
         mirroringHandler = new KafkaRequestMirroringHandler(sink);
     }
 
+    private Integer getIntegerPropValue(String name, Properties props) {
+        String value = props.getProperty(name);
+        if (value == null) {
+            return null;
+        }
+        return Integer.parseInt(value);
+    }
+
     @Override
     public UpdateRequestProcessor getInstance(final SolrQueryRequest req, final SolrQueryResponse rsp,
                                                 final UpdateRequestProcessor next) {
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
index 9a5ba80..7817efd 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
@@ -104,7 +104,8 @@ import java.util.Properties;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1,false, 0);
+    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1,-1,-1,-1,
+        -1,-1, -1, false, 0);
 
   }
 
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
index ec727ea..1a12a87 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
@@ -106,7 +106,8 @@ import java.util.Properties;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC,  "group1", -1,false, 0);
+    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC,  "group1", -1,-1,-1,
+        -1,-1,-1, -1, false, 0);
   }
 
   private static MiniSolrCloudCluster startCluster(MiniSolrCloudCluster solrCluster, ZkTestServer zkTestServer, Path baseDir) throws Exception {
@@ -200,7 +201,7 @@ import java.util.Properties;
 
     QueryResponse results = null;
     boolean foundUpdates = false;
-    for (int i = 0; i < 50; i++) {
+    for (int i = 0; i < 200; i++) {
       solrCluster2.getSolrClient().commit(COLLECTION);
       solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
       results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index fb0bb48..268be5c 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -99,7 +99,8 @@ import static org.mockito.Mockito.spy;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, false, 0);
+    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, -1,
+        -1,-1,-1,-1, -1,false, 0);
 
   }
 
@@ -148,7 +149,7 @@ import static org.mockito.Mockito.spy;
 
     QueryResponse results = null;
     boolean foundUpdates = false;
-    for (int i = 0; i < 50; i++) {
+    for (int i = 0; i < 100; i++) {
       solrCluster2.getSolrClient().commit(COLLECTION);
       solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
       results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
index cd2c0f2..9ac0071 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
@@ -91,7 +91,8 @@ import java.util.Properties;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1,false, 0);
+    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, -1,-1,
+        -1,-1, -1, -1, false, 0);
 
   }
 
@@ -135,7 +136,7 @@ import java.util.Properties;
 
     QueryResponse results = null;
     boolean foundUpdates = false;
-    for (int i = 0; i < 50; i++) {
+    for (int i = 0; i < 100; i++) {
       solrCluster2.getSolrClient().commit(COLLECTION);
       solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
       results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
@@ -157,7 +158,7 @@ import java.util.Properties;
     addDocs(client, "second");
 
     foundUpdates = false;
-    for (int i = 0; i < 50; i++) {
+    for (int i = 0; i < 100; i++) {
       solrCluster2.getSolrClient().commit(COLLECTION);
       solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
       results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
@@ -182,7 +183,7 @@ import java.util.Properties;
     addDocs(client, "third");
 
     foundUpdates = false;
-    for (int i = 0; i < 50; i++) {
+    for (int i = 0; i < 100; i++) {
       solrCluster2.getSolrClient().commit(COLLECTION);
       solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
       results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index 8d4cd13..94ad5c5 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -108,7 +108,8 @@ import java.util.Properties;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, false, 0);
+    consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, -1,-1,
+        -1,-1,-1, -1, false, 0);
 
   }