Posted to commits@solr.apache.org by ma...@apache.org on 2022/08/17 19:49:39 UTC
[solr-sandbox] branch crossdc-wip updated: Config improvements and queue bug fixes. (#34)
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new 1b325fd Config improvements and queue bug fixes. (#34)
1b325fd is described below
commit 1b325fdf55817284be5e9874e28f140d2360c9f5
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Wed Aug 17 14:49:34 2022 -0500
Config improvements and queue bug fixes. (#34)
---
CROSSDC.md | 8 ++
.../solr/crossdc/common/KafkaCrossDcConf.java | 61 +++++++++++++++-
.../solr/crossdc/common/KafkaMirroringSink.java | 9 ++-
.../org/apache/solr/crossdc/consumer/Consumer.java | 85 +++++++++++++++++++---
.../crossdc/consumer/KafkaCrossDcConsumer.java | 5 ++
.../MirroringUpdateRequestProcessorFactory.java | 51 ++++++++++++-
.../apache/solr/crossdc/DeleteByQueryToIdTest.java | 3 +-
.../solr/crossdc/RetryQueueIntegrationTest.java | 5 +-
.../solr/crossdc/SolrAndKafkaIntegrationTest.java | 5 +-
.../solr/crossdc/SolrAndKafkaReindexTest.java | 9 ++-
.../solr/crossdc/ZkConfigIntegrationTest.java | 3 +-
11 files changed, 213 insertions(+), 31 deletions(-)
diff --git a/CROSSDC.md b/CROSSDC.md
index 228ef31..5c528f4 100644
--- a/CROSSDC.md
+++ b/CROSSDC.md
@@ -117,6 +117,14 @@ Additional configuration properties:
*groupId* - the group id to give Kafka for the consumer; defaults to the empty string if not specified.
+The following additional configuration properties should be specified either for both the producer and the consumer, or in the shared ZooKeeper
+central configuration properties file, because the Consumer will use a Producer for retries.
+
+ *batchSizeBytes* - the maximum batch size in bytes that the Producer will send to the queue
+ *bufferMemoryBytes* - the total amount of memory in bytes that the Producer allocates for buffering
+ *lingerMs* - the amount of time in milliseconds that the Producer will wait to fill a batch before sending
+ *requestTimeout* - request timeout in milliseconds for the Producer; when used for the Consumer's retry Producer, this should be less than the timeout that causes the Consumer to be removed from the consumer group for taking too long.
+
#### Central Configuration Option
You can optionally manage the configuration centrally in Solr's ZooKeeper cluster by placing a properties file called *crossdc.properties* in the root Solr ZooKeeper znode, e.g., */solr/crossdc.properties*. This lets you update the configuration in a single central location rather than in each solrconfig.xml on every Solr node, and it allows new Solr nodes or Consumers to come up without requiring additional configuration.
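
For reference, a hypothetical *crossdc.properties* for this central option, using the default values introduced in this commit (host names, topic, and group id are placeholders):

    bootstrapServers=kafka1:9092,kafka2:9092
    topicName=solr-crossdc
    groupId=crossdc-consumer
    maxPollRecords=500
    batchSizeBytes=512000
    bufferMemoryBytes=268435456
    lingerMs=30
    requestTimeout=60000
    fetchMinBytes=512000
    fetchMaxWaitMS=1000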
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index a273b43..c1d6149 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -16,25 +16,49 @@
*/
package org.apache.solr.crossdc.common;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
public class KafkaCrossDcConf extends CrossDcConf {
+
+ public static final String DEFAULT_BATCH_SIZE_BYTES = "512000";
+ public static final String DEFAULT_BUFFER_MEMORY_BYTES = "268435456";
+ public static final String DEFAULT_LINGER_MS = "30";
+ public static final String DEFAULT_REQUEST_TIMEOUT = "60000";
+
private final String topicName;
private final String groupId;
private final boolean enableDataEncryption;
private final String bootstrapServers;
- private long slowSubmitThresholdInMillis;
+ private long slowSubmitThresholdInMillis = 1000;
private int numOfRetries = 5;
private final String solrZkConnectString;
private final int maxPollRecords;
- public KafkaCrossDcConf(String bootstrapServers, String topicName, String groupId, int maxPollRecords, boolean enableDataEncryption, String solrZkConnectString) {
+ private final int batchSizeBytes;
+ private final int bufferMemoryBytes;
+ private final int lingerMs;
+ private final int requestTimeout;
+
+ private final int fetchMinBytes;
+
+ private final int fetchMaxWaitMS;
+
+ public KafkaCrossDcConf(String bootstrapServers, String topicName, String groupId, int maxPollRecords, int batchSizeBytes, int bufferMemoryBytes, int lingerMs, int requestTimeout,
+ int fetchMinBytes, int fetchMaxWaitMS, boolean enableDataEncryption, String solrZkConnectString) {
this.bootstrapServers = bootstrapServers;
this.topicName = topicName;
this.enableDataEncryption = enableDataEncryption;
this.solrZkConnectString = solrZkConnectString;
this.groupId = groupId;
this.maxPollRecords = maxPollRecords;
+ this.batchSizeBytes = batchSizeBytes;
+ this.bufferMemoryBytes = bufferMemoryBytes;
+ this.lingerMs = lingerMs;
+ this.requestTimeout = requestTimeout;
+ this.fetchMinBytes = fetchMinBytes;
+ this.fetchMaxWaitMS = fetchMaxWaitMS;
}
public String getTopicName() {
return topicName;
@@ -75,6 +99,30 @@ public class KafkaCrossDcConf extends CrossDcConf {
return bootstrapServers;
}
+ public int getBatchSizeBytes() {
+ return batchSizeBytes;
+ }
+
+ public int getBufferMemoryBytes() {
+ return bufferMemoryBytes;
+ }
+
+ public int getLingerMs() {
+ return lingerMs;
+ }
+
+ public int getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ public int getFetchMinBytes() {
+ return fetchMinBytes;
+ }
+
+ public int getFetchMaxWaitMS() {
+ return fetchMaxWaitMS;
+ }
+
@Override
public String toString() {
return String.format("KafkaCrossDcConf{" +
@@ -84,8 +132,15 @@ public class KafkaCrossDcConf extends CrossDcConf {
"bootstrapServers='%s', " +
"slowSubmitThresholdInMillis='%d', " +
"numOfRetries='%d', " +
+ "batchSizeBytes='%d', " +
+ "bufferMemoryBytes='%d', " +
+ "lingerMs='%d', " +
+ "requestTimeout='%d', " +
+ "fetchMinBytes='%d', " +
+ "fetchMaxWaitMS='%d', " +
"solrZkConnectString='%s'}",
topicName, groupId, enableDataEncryption, bootstrapServers,
- slowSubmitThresholdInMillis, numOfRetries, solrZkConnectString);
+ slowSubmitThresholdInMillis, numOfRetries, batchSizeBytes,
+ bufferMemoryBytes, lingerMs, requestTimeout, fetchMinBytes, fetchMaxWaitMS, solrZkConnectString);
}
}
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index 592d34d..787e6af 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -18,6 +18,7 @@ package org.apache.solr.crossdc.common;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
@@ -61,7 +62,6 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
exception);
}
});
- producer.flush(); // TODO: remove
lastSuccessfulEnqueueNanos = System.nanoTime();
// Record time since last successful enqueue as 0
@@ -98,9 +98,10 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
props.put("acks", "all");
props.put("retries", 3);
- props.put("batch.size", 15);
- props.put("buffer.memory", 33554432);
- props.put("linger.ms", 1);
+ props.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getBatchSizeBytes());
+ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, conf.getBufferMemoryBytes());
+ props.put(ProducerConfig.LINGER_MS_CONFIG, conf.getLingerMs());
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getRequestTimeout()); // should be less than the timeout that causes the consumer to be removed from its group
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index d6472fb..a19da19 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -33,6 +33,8 @@ import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.*;
+
// Cross-DC Consumer main class
public class Consumer {
public static final String DEFAULT_PORT = "8090";
@@ -44,6 +46,9 @@ public class Consumer {
private static final String MAX_POLL_RECORDS = "maxPollRecords";
public static final String DEFAULT_MAX_POLL_RECORDS = "500";
+ private static final String DEFAULT_FETCH_MIN_BYTES = "512000";
+ private static final String DEFAULT_FETCH_MAX_WAIT_MS = "1000";
+
private static boolean enabled = true;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -55,10 +60,9 @@ public class Consumer {
private Server server;
CrossDcConsumer crossDcConsumer;
- private String topicName;
- private int maxPollRecords;
- public void start(String bootstrapServers, String zkConnectString, String topicName, String groupId, int maxPollRecords, boolean enableDataEncryption, int port) {
+ public void start(String bootstrapServers, String zkConnectString, String topicName, String groupId, int maxPollRecords, int batchSizeBytes, int bufferMemoryBytes, int lingerMs, int requestTimeout,
+ int fetchMinBytes, int fetchMaxWaitMS, boolean enableDataEncryption, int port) {
if (bootstrapServers == null) {
throw new IllegalArgumentException("bootstrapServers config was not passed at startup");
}
@@ -73,15 +77,37 @@ public class Consumer {
maxPollRecords = Integer.parseInt(DEFAULT_MAX_POLL_RECORDS);
}
- this.topicName = topicName;
- this.maxPollRecords = maxPollRecords;
+ if (batchSizeBytes == -1) {
+ batchSizeBytes = Integer.parseInt(DEFAULT_BATCH_SIZE_BYTES);
+ }
+
+ if (bufferMemoryBytes == -1) {
+ bufferMemoryBytes = Integer.parseInt(DEFAULT_BUFFER_MEMORY_BYTES);
+ }
+
+ if (lingerMs == -1) {
+ lingerMs = Integer.parseInt(DEFAULT_LINGER_MS);
+ }
+
+ if (requestTimeout == -1) {
+ requestTimeout = Integer.parseInt(DEFAULT_REQUEST_TIMEOUT);
+ }
+
+ if (fetchMinBytes == -1) {
+ fetchMinBytes = Integer.parseInt(DEFAULT_FETCH_MIN_BYTES);
+ }
+
+ if (fetchMaxWaitMS == -1) {
+ fetchMaxWaitMS = Integer.parseInt(DEFAULT_FETCH_MAX_WAIT_MS);
+ }
//server = new Server();
//ServerConnector connector = new ServerConnector(server);
//connector.setPort(port);
//server.setConnectors(new Connector[] {connector})
- crossDcConsumer = getCrossDcConsumer(bootstrapServers, zkConnectString, topicName, groupId, maxPollRecords, enableDataEncryption);
+ crossDcConsumer = getCrossDcConsumer(bootstrapServers, zkConnectString, topicName, groupId, maxPollRecords, batchSizeBytes, bufferMemoryBytes, lingerMs,
+ requestTimeout, fetchMinBytes, fetchMaxWaitMS, enableDataEncryption);
// Start consumer thread
@@ -96,9 +122,10 @@ public class Consumer {
}
private CrossDcConsumer getCrossDcConsumer(String bootstrapServers, String zkConnectString, String topicName, String groupId, int maxPollRecords,
- boolean enableDataEncryption) {
+ int batchSizeBytes, int bufferMemoryBytes, int lingerMs, int requestTimeout, int fetchMinBytes, int fetchMaxWaitMS, boolean enableDataEncryption) {
- KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, groupId, maxPollRecords, enableDataEncryption, zkConnectString);
+ KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, groupId, maxPollRecords, batchSizeBytes, bufferMemoryBytes, lingerMs,
+ requestTimeout, fetchMinBytes, fetchMaxWaitMS, enableDataEncryption, zkConnectString);
return new KafkaCrossDcConsumer(conf);
}
@@ -115,14 +142,21 @@ public class Consumer {
String port = System.getProperty(PORT);
String groupId = System.getProperty(GROUP_ID, "");
String maxPollRecords = System.getProperty("maxPollRecords");
-
+ String batchSizeBytes = System.getProperty("batchSizeBytes");
+ String bufferMemoryBytes = System.getProperty("bufferMemoryBytes");
+ String lingerMs = System.getProperty("lingerMs");
+ String requestTimeout = System.getProperty("requestTimeout");
+ String fetchMinBytes = System.getProperty("fetchMinBytes");
+ String fetchMaxWaitMS = System.getProperty("fetchMaxWaitMS");
try (SolrZkClient client = new SolrZkClient(zkConnectString, 15000)) {
try {
- if ((topicName == null || topicName.isBlank()) || (groupId == null || groupId.isBlank())
- || (bootstrapServers == null || bootstrapServers.isBlank()) || (port == null || port.isBlank()) || (maxPollRecords == null || maxPollRecords.isBlank()) && client
- .exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
+ if (((topicName == null || topicName.isBlank()) || (groupId == null || groupId.isBlank())
+ || (bootstrapServers == null || bootstrapServers.isBlank()) || (port == null || port.isBlank()) || (maxPollRecords == null || maxPollRecords.isBlank())
+ || (batchSizeBytes == null || batchSizeBytes.isBlank()) || (bufferMemoryBytes == null || bufferMemoryBytes.isBlank()) || (lingerMs == null || lingerMs.isBlank())
+ || (requestTimeout == null || requestTimeout.isBlank()) || (fetchMinBytes == null || fetchMinBytes.isBlank()) || (fetchMaxWaitMS == null || fetchMaxWaitMS.isBlank()))
+ && client.exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
byte[] data = client.getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), null, null, true);
Properties props = new Properties();
props.load(new ByteArrayInputStream(data));
@@ -132,6 +166,13 @@ public class Consumer {
port = getConfig(PORT, port, props);
groupId = getConfig(GROUP_ID, groupId, props);
maxPollRecords = getConfig(MAX_POLL_RECORDS, maxPollRecords, props);
+ batchSizeBytes = getConfig("batchSizeBytes", batchSizeBytes, props);
+ bufferMemoryBytes = getConfig("bufferMemoryBytes", bufferMemoryBytes, props);
+ lingerMs = getConfig("lingerMs", lingerMs, props);
+ requestTimeout = getConfig("requestTimeout", requestTimeout, props);
+ fetchMinBytes = getConfig("fetchMinBytes", fetchMinBytes, props);
+ fetchMaxWaitMS = getConfig("fetchMaxWaitMS", fetchMaxWaitMS, props);
+
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -159,9 +200,29 @@ public class Consumer {
if (maxPollRecords == null || maxPollRecords.isBlank()) {
maxPollRecords = DEFAULT_MAX_POLL_RECORDS;
}
+ if (batchSizeBytes == null || batchSizeBytes.isBlank()) {
+ batchSizeBytes = DEFAULT_BATCH_SIZE_BYTES;
+ }
+ if (bufferMemoryBytes == null || bufferMemoryBytes.isBlank()) {
+ bufferMemoryBytes = DEFAULT_BUFFER_MEMORY_BYTES;
+ }
+ if (lingerMs == null || lingerMs.isBlank()) {
+ lingerMs = DEFAULT_LINGER_MS;
+ }
+ if (requestTimeout == null || requestTimeout.isBlank()) {
+ requestTimeout = DEFAULT_REQUEST_TIMEOUT;
+ }
+ if (fetchMinBytes == null || fetchMinBytes.isBlank()) {
+ fetchMinBytes = DEFAULT_FETCH_MIN_BYTES;
+ }
+ if (fetchMaxWaitMS == null || fetchMaxWaitMS.isBlank()) {
+ fetchMaxWaitMS = DEFAULT_FETCH_MAX_WAIT_MS;
+ }
Consumer consumer = new Consumer();
- consumer.start(bootstrapServers, zkConnectString, topicName, groupId, Integer.parseInt(maxPollRecords), false, Integer.parseInt(port));
+ consumer.start(bootstrapServers, zkConnectString, topicName, groupId, Integer.parseInt(maxPollRecords),
+ Integer.parseInt(batchSizeBytes), Integer.parseInt(bufferMemoryBytes), Integer.parseInt(lingerMs),
+ Integer.parseInt(requestTimeout), Integer.parseInt(fetchMinBytes), Integer.parseInt(fetchMaxWaitMS), false, Integer.parseInt(port));
}
private static String getConfig(String configName, String configValue, Properties props) {
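
Since main() above pulls every setting from system properties (falling back to crossdc.properties in ZooKeeper and then to the compiled-in defaults), a consumer launch might look like the following sketch. The classpath placeholder and all values are illustrative, and the property keys for bootstrapServers, zkConnectString, and topicName are assumed to follow the same camelCase names used elsewhere in this diff:

    java -DbootstrapServers=kafka1:9092 \
         -DzkConnectString=zk1:2181/solr \
         -DtopicName=solr-crossdc \
         -DgroupId=group1 \
         -DmaxPollRecords=500 \
         -DbatchSizeBytes=512000 -DlingerMs=30 -DrequestTimeout=60000 \
         -cp <crossdc-consumer classpath> org.apache.solr.crossdc.consumer.Consumer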
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 927a3df..9d7fcc9 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -55,6 +55,11 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
kafkaConsumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ kafkaConsumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+ kafkaConsumerProp.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getFetchMinBytes());
+ kafkaConsumerProp.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getFetchMaxWaitMS());
+
solrClient =
new CloudSolrClient.Builder(Collections.singletonList(conf.getSolrZkConnectString()),
Optional.empty()).build();
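
Setting ENABLE_AUTO_COMMIT_CONFIG to false means the consumer must commit offsets itself. As a rough illustration of the pattern this enables, here is a generic kafka-clients sketch, not the project's actual KafkaCrossDcConsumer loop; servers, group, and topic are placeholders:

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.*;

    public class ManualCommitSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");                  // placeholder
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // as in the diff
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(List.of("solr-crossdc")); // placeholder topic
          while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
              // the mirrored request would be applied to Solr here
            }
            // commit only after the whole batch is handled, so a crash replays
            // unprocessed records rather than silently losing them
            consumer.commitSync();
          }
        }
      }
    }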
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 2757ee3..562b6b0 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -41,6 +41,8 @@ import java.util.Properties;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.*;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.*;
+
/**
* An update processor that works with the {@link UpdateRequestProcessorFactory} to mirror update requests by
* submitting them to a sink that implements a queue producer.
@@ -69,6 +71,11 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
private String topicName;
private String bootstrapServers;
+ private Integer batchSizeBytes;
+ private Integer bufferMemoryBytes;
+ private Integer lingerMs;
+ private Integer requestTimeout;
+
private boolean enabled = true;
@Override
@@ -82,6 +89,11 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
topicName = args._getStr("topicName", null);
bootstrapServers = args._getStr("bootstrapServers", null);
+
+ batchSizeBytes = (Integer) args.get("batchSizeBytes");
+ bufferMemoryBytes = (Integer) args.get("bufferMemoryBytes");
+ lingerMs = (Integer) args.get("lingerMs");
+ requestTimeout = (Integer) args.get("requestTimeout");
}
private class Closer {
@@ -110,7 +122,9 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
}
try {
- if (((topicName == null || topicName.isBlank()) || (bootstrapServers == null || bootstrapServers.isBlank())) && core.getCoreContainer().getZkController()
+ if (((topicName == null || topicName.isBlank()) || (bootstrapServers == null || bootstrapServers.isBlank()
+ || batchSizeBytes == null || bufferMemoryBytes == null
+ || lingerMs == null || requestTimeout == null)) && core.getCoreContainer().getZkController()
.getZkClient().exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
byte[] data = core.getCoreContainer().getZkController().getZkClient().getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), null, null, true);
@@ -128,6 +142,18 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
if (bootstrapServers == null || bootstrapServers.isBlank()) {
bootstrapServers = props.getProperty("bootstrapServers");
}
+ if (batchSizeBytes == null) {
+ batchSizeBytes = getIntegerPropValue("batchSizeBytes", props);
+ }
+ if (bufferMemoryBytes == null) {
+ bufferMemoryBytes = getIntegerPropValue("bufferMemoryBytes", props);
+ }
+ if (lingerMs == null) {
+ lingerMs = getIntegerPropValue("lingerMs", props);
+ }
+ if (requestTimeout == null) {
+ requestTimeout = getIntegerPropValue("requestTimeout", props);
+ }
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -148,12 +174,25 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "topicName not specified for producer");
}
+ if (batchSizeBytes == null) {
+ batchSizeBytes = Integer.valueOf(DEFAULT_BATCH_SIZE_BYTES);
+ }
+ if (bufferMemoryBytes == null) {
+ bufferMemoryBytes = Integer.valueOf(DEFAULT_BUFFER_MEMORY_BYTES);
+ }
+ if (lingerMs == null) {
+ lingerMs = Integer.valueOf(DEFAULT_LINGER_MS);
+ }
+ if (requestTimeout == null) {
+ requestTimeout = Integer.valueOf(DEFAULT_REQUEST_TIMEOUT);
+ }
+
log.info("bootstrapServers={} topicName={}", bootstrapServers, topicName);
// load the request mirroring sink class and instantiate.
// mirroringHandler = core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(), KafkaRequestMirroringHandler.class);
- KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, "", -1, false, null);
+ KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, topicName, "", -1, batchSizeBytes, bufferMemoryBytes, lingerMs, requestTimeout, -1, -1, false, null);
KafkaMirroringSink sink = new KafkaMirroringSink(conf);
Closer closer = new Closer(sink);
@@ -170,6 +209,14 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
mirroringHandler = new KafkaRequestMirroringHandler(sink);
}
+ private Integer getIntegerPropValue(String name, Properties props) {
+ String value = props.getProperty(name);
+ if (value == null) {
+ return null;
+ }
+ return Integer.parseInt(value);
+ }
+
@Override
public UpdateRequestProcessor getInstance(final SolrQueryRequest req, final SolrQueryResponse rsp,
final UpdateRequestProcessor next) {
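
For context, the init args read above (topicName, bootstrapServers, batchSizeBytes, and so on) come from the factory's registration in solrconfig.xml. A hypothetical snippet, with the chain layout and all values purely illustrative:

    <updateRequestProcessorChain name="mirrorUpdateChain" default="true">
      <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
        <str name="bootstrapServers">kafka1:9092</str>
        <str name="topicName">solr-crossdc</str>
        <int name="batchSizeBytes">512000</int>
        <int name="bufferMemoryBytes">268435456</int>
        <int name="lingerMs">30</int>
        <int name="requestTimeout">60000</int>
      </processor>
      <processor class="solr.LogUpdateProcessorFactory"/>
      <processor class="solr.RunUpdateProcessorFactory"/>
    </updateRequestProcessorChain>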
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
index 9a5ba80..7817efd 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
@@ -104,7 +104,8 @@ import java.util.Properties;
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
- consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1,false, 0);
+ consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, -1, -1, -1,
+ -1, -1, -1, false, 0);
}
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
index ec727ea..1a12a87 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
@@ -106,7 +106,8 @@ import java.util.Properties;
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
- consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1,false, 0);
+ consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, -1, -1,
+ -1, -1, -1, -1, false, 0);
}
private static MiniSolrCloudCluster startCluster(MiniSolrCloudCluster solrCluster, ZkTestServer zkTestServer, Path baseDir) throws Exception {
@@ -200,7 +201,7 @@ import java.util.Properties;
QueryResponse results = null;
boolean foundUpdates = false;
- for (int i = 0; i < 50; i++) {
+ for (int i = 0; i < 200; i++) {
solrCluster2.getSolrClient().commit(COLLECTION);
solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index fb0bb48..268be5c 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -99,7 +99,8 @@ import static org.mockito.Mockito.spy;
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
- consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, false, 0);
+ consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, -1,
+ -1, -1, -1, -1, -1, false, 0);
}
@@ -148,7 +149,7 @@ import static org.mockito.Mockito.spy;
QueryResponse results = null;
boolean foundUpdates = false;
- for (int i = 0; i < 50; i++) {
+ for (int i = 0; i < 100; i++) {
solrCluster2.getSolrClient().commit(COLLECTION);
solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
index cd2c0f2..9ac0071 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
@@ -91,7 +91,8 @@ import java.util.Properties;
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
- consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1,false, 0);
+ consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, -1, -1,
+ -1, -1, -1, -1, false, 0);
}
@@ -135,7 +136,7 @@ import java.util.Properties;
QueryResponse results = null;
boolean foundUpdates = false;
- for (int i = 0; i < 50; i++) {
+ for (int i = 0; i < 100; i++) {
solrCluster2.getSolrClient().commit(COLLECTION);
solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
@@ -157,7 +158,7 @@ import java.util.Properties;
addDocs(client, "second");
foundUpdates = false;
- for (int i = 0; i < 50; i++) {
+ for (int i = 0; i < 100; i++) {
solrCluster2.getSolrClient().commit(COLLECTION);
solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
@@ -182,7 +183,7 @@ import java.util.Properties;
addDocs(client, "third");
foundUpdates = false;
- for (int i = 0; i < 50; i++) {
+ for (int i = 0; i < 100; i++) {
solrCluster2.getSolrClient().commit(COLLECTION);
solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index 8d4cd13..94ad5c5 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -108,7 +108,8 @@ import java.util.Properties;
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
- consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, false, 0);
+ consumer.start(bootstrapServers, solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, -1, -1,
+ -1, -1, -1, -1, false, 0);
}