You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2022/05/31 22:09:19 UTC
[solr-sandbox] branch crossdc-wip updated: Work out SolrAndKafkaIntegrationTest (#16)
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new 9b0cced Work out SolrAndKafkaIntegrationTest (#16)
9b0cced is described below
commit 9b0cced1b6d6eff642de8120374eb7ae76e1f3c0
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Tue May 31 17:09:14 2022 -0500
Work out SolrAndKafkaIntegrationTest (#16)
Each of the 2 tests still needs to be run separately - need to fix up running both in same run.
---
.../solr/crossdc/common/KafkaMirroringSink.java | 17 ++
.../common/MirroredSolrRequestSerializer.java | 2 +-
.../org/apache/solr/crossdc/consumer/Consumer.java | 225 +-----------------
.../crossdc/consumer/KafkaCrossDcConsumer.java | 252 +++++++++++++++++++++
.../messageprocessor/SolrMessageProcessor.java | 26 ++-
.../processor/KafkaRequestMirroringHandler.java | 12 +-
.../update/processor/MirroringUpdateProcessor.java | 143 ++++++++++++
.../MirroringUpdateRequestProcessorFactory.java | 118 ----------
.../solr/crossdc/SolrAndKafkaIntegrationTest.java | 21 +-
.../SolrKafkaTestsIgnoredThreadsFilter.java | 10 +-
crossdc-producer/src/test/resources/log4j2.xml | 11 +-
11 files changed, 474 insertions(+), 363 deletions(-)
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index b062392..3b245f9 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -18,6 +18,7 @@ package org.apache.solr.crossdc.common;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +54,12 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
// Create Producer record
try {
+
+ producer.send(new ProducerRecord(conf.getTopicName(), request), (metadata, exception) -> {
+ log.info("Producer finished sending metadata={}, exception={}", metadata, exception);
+ });
+ producer.flush();
+
lastSuccessfulEnqueueNanos = System.nanoTime();
// Record time since last successful enqueue as 0
long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos);
@@ -86,9 +93,19 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
props.put("bootstrap.servers", conf.getBootStrapServers());
+ props.put("acks", "all");
+ props.put("retries", 99999);
+ props.put("batch.size", 15);
+ props.put("buffer.memory", 33554432);
+ props.put("linger.ms", 1);
+
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
+ if (log.isDebugEnabled()) {
+ log.debug("Kafka Producer props={}", props);
+ }
+
ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(null);
Producer<String, MirroredSolrRequest> producer;
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
index 221f12f..1007c0b 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
@@ -107,7 +107,7 @@ public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrReq
ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
Map map = new HashMap();
- map.put("params", solrRequest.getParams().getMap());
+ map.put("params", solrRequest.getParams());
map.put("docs", solrRequest.getDocuments());
// TODO
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index 5b3d01c..5bc7f6b 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -16,20 +16,7 @@
*/
package org.apache.solr.crossdc.consumer;
-import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.crossdc.common.KafkaMirroringSink;
-import org.apache.solr.crossdc.common.MirroringException;
-import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
-import org.apache.solr.crossdc.common.IQueueHandler;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
-import org.apache.solr.crossdc.common.MirroredSolrRequest;
-import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
@@ -38,11 +25,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -81,6 +63,9 @@ public class Consumer {
crossDcConsumer = getCrossDcConsumer(bootstrapServers, zkConnectString, topicName, enableDataEncryption);
// Start consumer thread
+
+ log.info("Starting CrossDC Consumer bootstrapServers={}, zkConnectString={}, topicName={}, enableDataEncryption={}", bootstrapServers, zkConnectString, topicName, enableDataEncryption);
+
consumerThreadExecutor = Executors.newSingleThreadExecutor();
consumerThreadExecutor.submit(crossDcConsumer);
@@ -122,208 +107,4 @@ public class Consumer {
}
- /**
- * Class to run the consumer thread for Kafka. This also contains the implementation for retries and
- * resubmitting to the queue in case of temporary failures.
- */
- public static class KafkaCrossDcConsumer extends CrossDcConsumer {
- private static final Logger log = LoggerFactory.getLogger(KafkaCrossDcConsumer.class);
-
- private final KafkaConsumer<String, MirroredSolrRequest> consumer;
- private final KafkaMirroringSink kafkaMirroringSink;
-
- private final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 100;
- private final String topicName;
- SolrMessageProcessor messageProcessor;
-
- CloudSolrClient solrClient;
-
- /**
- * @param conf The Kafka consumer configuration
- */
- public KafkaCrossDcConsumer(KafkaCrossDcConf conf) {
- this.topicName = conf.getTopicName();
-
- final Properties kafkaConsumerProp = new Properties();
-
- kafkaConsumerProp.put("bootstrap.servers", conf.getBootStrapServers());
-
- kafkaConsumerProp.put("group.id", "group_1"); // TODO
-
- solrClient = new CloudSolrClient.Builder(Collections.singletonList(conf.getSolrZkConnectString()), Optional.empty()).build();
-
- messageProcessor = new SolrMessageProcessor(solrClient, new ResubmitBackoffPolicy() {
- @Override public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
- return 0;
- }
- });
-
- log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProp);
- consumer = createConsumer(kafkaConsumerProp);
-
- // Create producer for resubmitting failed requests
- log.info("Creating Kafka resubmit producer");
- this.kafkaMirroringSink = new KafkaMirroringSink(conf);
- log.info("Created Kafka resubmit producer");
-
- }
-
- private KafkaConsumer<String, MirroredSolrRequest> createConsumer(Properties properties) {
- KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, new StringDeserializer(), new MirroredSolrRequestSerializer());
- return kafkaConsumer;
- }
-
- /**
- * This is where the magic happens.
- * 1. Polls and gets the packets from the queue
- * 2. Extract the MirroredSolrRequest objects
- * 3. Send the request to the MirroredSolrRequestHandler that has the processing, retry, error handling logic.
- */
- @Override
- public void run() {
- log.info("About to start Kafka consumer thread...");
-
- log.info("Kafka consumer subscribing to topic topic={}", topicName);
-
- try {
-
- consumer.subscribe(Collections.singleton(topicName));
-
- while (pollAndProcessRequests()) {
- //no-op within this loop: everything is done in pollAndProcessRequests method defined above.
- }
-
- log.info("Closed kafka consumer. Exiting now.");
- try {
- consumer.close();
- } catch (Exception e) {
- log.warn("Failed to close kafka consumer", e);
- }
-
- try {
- kafkaMirroringSink.close();
- } catch (Exception e) {
- log.warn("Failed to close kafka mirroring sink", e);
- }
- } finally {
- IOUtils.closeQuietly(solrClient);
- }
-
- }
-
- /**
- * Polls and processes the requests from Kafka. This method returns false when the consumer needs to be
- * shutdown i.e. when there's a wakeup exception.
- */
- boolean pollAndProcessRequests() {
- try {
- ConsumerRecords<String, MirroredSolrRequest> records = consumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
- for (TopicPartition partition : records.partitions()) {
- List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords = records.records(partition);
- try {
- for (ConsumerRecord<String, MirroredSolrRequest> record : partitionRecords) {
- log.info("Fetched record from topic={} partition={} key={} value={}",
- record.topic(), record.partition(), record.key(), record.value());
- IQueueHandler.Result result = messageProcessor.handleItem(record.value());
- switch (result.status()) {
- case FAILED_RESUBMIT:
- kafkaMirroringSink.submit(record.value());
- break;
- case HANDLED:
- // no-op
- break;
- case NOT_HANDLED_SHUTDOWN:
- case FAILED_RETRY:
- log.error("Unexpected response while processing request. We never expect {}.",
- result.status().toString());
- break;
- default:
- // no-op
- }
- }
- updateOffset(partition, partitionRecords);
-
- // handleItem sets the thread interrupt, let's exit if there has been an interrupt set
- if(Thread.currentThread().isInterrupted()) {
- log.info("Kafka Consumer thread interrupted, shutting down Kafka consumer.");
- return false;
- }
- } catch (MirroringException e) {
- // We don't really know what to do here, so it's wiser to just break out.
- log.error("Mirroring exception occured while resubmitting to Kafka. We are going to stop the consumer thread now.", e);
- return false;
- } catch (WakeupException e) {
- log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer.");
- return false;
- } catch (Exception e) {
- // If there is any exception returned by handleItem, then reset the offset.
-
- if (e instanceof ClassCastException || e instanceof ClassNotFoundException || e instanceof SerializationException) {
- log.error("Non retryable error", e);
- break;
- }
- log.warn("Exception occurred in Kafka consumer thread, but we will continue.", e);
- resetOffsetForPartition(partition, partitionRecords);
- break;
- }
- }
- } catch (WakeupException e) {
- log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer");
- return false;
- } catch (Exception e) {
-
- e.printStackTrace();
- if (e instanceof ClassCastException || e instanceof ClassNotFoundException || e instanceof SerializationException) {
- log.error("Non retryable error", e);
- return false;
- }
-
- log.error("Exception occurred in Kafka consumer thread, but we will continue.", e);
- }
- return true;
- }
-
- /**
- * Reset the local offset so that the consumer reads the records from Kafka again.
- * @param partition The TopicPartition to reset the offset for
- * @param partitionRecords PartitionRecords for the specified partition
- */
- private void resetOffsetForPartition(TopicPartition partition, List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
- if (log.isDebugEnabled()) {
- log.debug("Resetting offset to: {}", partitionRecords.get(0).offset());
- }
- long resetOffset = partitionRecords.get(0).offset();
- consumer.seek(partition, resetOffset);
- }
-
- /**
- * Logs and updates the commit point for the partition that has been processed.
- * @param partition The TopicPartition to update the offset for
- * @param partitionRecords PartitionRecords for the specified partition
- */
- private void updateOffset(TopicPartition partition, List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
- long nextOffset = partitionRecords.get(partitionRecords.size() - 1).offset() + 1;
- consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(nextOffset)));
-
- if (log.isTraceEnabled()) {
- log.trace("Updated offset for topic={} partition={} to offset={}",
- partition.topic(), partition.partition(), nextOffset);
- }
- }
-
- /**
- * Shutdown the Kafka consumer by calling wakeup.
- */
- public void shutdown() {
- log.info("Shutdown called on KafkaCrossDcConsumer");
- try {
- solrClient.close();
- } catch (Exception e) {
- log.warn("Exception closing Solr client on shutdown");
- }
- consumer.wakeup();
- }
-
-
- }
}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
new file mode 100644
index 0000000..5a7e1aa
--- /dev/null
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -0,0 +1,252 @@
+package org.apache.solr.crossdc.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.crossdc.common.*;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Class to run the consumer thread for Kafka. This also contains the implementation for retries and
+ * resubmitting to the queue in case of temporary failures.
+ */
+public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final KafkaConsumer<String, MirroredSolrRequest> consumer;
+ private final KafkaMirroringSink kafkaMirroringSink;
+
+ private final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
+ private final String topicName;
+ SolrMessageProcessor messageProcessor;
+
+ CloudSolrClient solrClient;
+
+ /** Builds the Solr client, message processor, Kafka consumer and resubmit sink; subscribing and polling are left to run().
+ * @param conf The Kafka consumer configuration; supplies the topic name, bootstrap servers and Solr ZK connect string
+ */
+ public KafkaCrossDcConsumer(KafkaCrossDcConf conf) {
+ this.topicName = conf.getTopicName();
+
+ final Properties kafkaConsumerProp = new Properties();
+
+ kafkaConsumerProp.put("bootstrap.servers", conf.getBootStrapServers());
+
+ kafkaConsumerProp.put("group.id", "group_1"); // TODO: the consumer group id is hard-coded
+
+ solrClient =
+ new CloudSolrClient.Builder(Collections.singletonList(conf.getSolrZkConnectString()),
+ Optional.empty()).build();
+
+ messageProcessor = new SolrMessageProcessor(solrClient, new ResubmitBackoffPolicy() {
+ @Override public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
+ return 0; // this policy always reports a zero backoff
+ }
+ });
+
+ log.info("Creating Kafka consumer with configuration {}", kafkaConsumerProp);
+ consumer = createConsumer(kafkaConsumerProp);
+
+ // Create producer for resubmitting failed requests
+ log.info("Creating Kafka resubmit producer");
+ this.kafkaMirroringSink = new KafkaMirroringSink(conf);
+ log.info("Created Kafka resubmit producer");
+
+ }
+
+ private KafkaConsumer<String, MirroredSolrRequest> createConsumer(Properties properties) {
+ // Builds the consumer with String keys and MirroredSolrRequestSerializer as the value deserializer.
+ // The diamond replaces the raw KafkaConsumer type so the key/value types are checked by the compiler.
+ return new KafkaConsumer<>(properties, new StringDeserializer(), new MirroredSolrRequestSerializer());
+ }
+
+ /**
+ * Entry point of the consumer thread.
+ * 1. Subscribes to the configured topic
+ * 2. Calls {@link #pollAndProcessRequests()} until it returns false
+ * 3. Closes the Kafka consumer, the resubmit sink and the Solr client, even if step 1 or 2 throws
+ */
+ @Override public void run() {
+ log.info("About to start Kafka consumer thread, topic={}", topicName);
+
+ try {
+
+ consumer.subscribe(Collections.singleton(topicName));
+
+ while (pollAndProcessRequests()) {
+ //no-op within this loop: everything is done in the pollAndProcessRequests method.
+ }
+
+ } finally {
+ // All cleanup lives in finally so a failure in subscribe() or in the loop cannot leak the clients.
+ log.info("Closing kafka consumer. Exiting now.");
+ try {
+ consumer.close();
+ } catch (Exception e) {
+ log.warn("Failed to close kafka consumer", e);
+ }
+
+ try {
+ kafkaMirroringSink.close();
+ } catch (Exception e) {
+ log.warn("Failed to close kafka mirroring sink", e);
+ }
+ IOUtils.closeQuietly(solrClient);
+ }
+ }
+
+ /**
+ * Polls Kafka once and processes the fetched records partition by partition. Returns false when the consumer
+ * must stop (wakeup, thread interrupt, MirroringException, or a non-retryable error in the outer handler); else true.
+ */
+ boolean pollAndProcessRequests() {
+ log.trace("Entered pollAndProcessRequests loop");
+ try {
+ ConsumerRecords<String, MirroredSolrRequest> records =
+ consumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
+ for (TopicPartition partition : records.partitions()) {
+ List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords =
+ records.records(partition);
+ try {
+ for (ConsumerRecord<String, MirroredSolrRequest> record : partitionRecords) {
+ if (log.isTraceEnabled()) {
+ log.trace("Fetched record from topic={} partition={} key={} value={}", record.topic(),
+ record.partition(), record.key(), record.value());
+ }
+ IQueueHandler.Result result = messageProcessor.handleItem(record.value());
+ switch (result.status()) {
+ case FAILED_RESUBMIT:
+ if (log.isTraceEnabled()) {
+ log.trace("result=failed-resubmit");
+ }
+ kafkaMirroringSink.submit(record.value()); // put the same request back on the topic
+ break;
+ case HANDLED:
+ // no-op
+ if (log.isTraceEnabled()) {
+ log.trace("result=handled");
+ }
+ break;
+ case NOT_HANDLED_SHUTDOWN:
+ if (log.isTraceEnabled()) {
+ log.trace("result=nothandled_shutdown");
+ } // no break: falls through to FAILED_RETRY, which shared this error log before the trace call was added
+ case FAILED_RETRY:
+ log.error("Unexpected response while processing request. We never expect {}.",
+ result.status().toString());
+ break;
+ default:
+ if (log.isTraceEnabled()) {
+ log.trace("result=no matching case");
+ }
+ // no-op
+ }
+ }
+ updateOffset(partition, partitionRecords); // commit only after every record of this partition's batch was handled
+
+ // handleItem sets the thread interrupt, let's exit if there has been an interrupt set
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Kafka Consumer thread interrupted, shutting down Kafka consumer.");
+ return false;
+ }
+ } catch (MirroringException e) {
+ // We don't really know what to do here, so it's wiser to just break out.
+ log.error(
+ "Mirroring exception occurred while resubmitting to Kafka. We are going to stop the consumer thread now.",
+ e);
+ return false;
+ } catch (WakeupException e) {
+ log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer.");
+ return false;
+ } catch (Exception e) {
+ // If there is any exception returned by handleItem, then reset the offset.
+
+ if (e instanceof ClassCastException || e instanceof ClassNotFoundException
+ || e instanceof SerializationException) { // TODO: optional
+ log.error("Non retryable error", e);
+ break; // leaves the partition loop without committing or rewinding; the method then returns true
+ }
+ log.warn("Exception occurred in Kafka consumer thread, but we will continue.", e);
+ resetOffsetForPartition(partition, partitionRecords);
+ break; // remaining partitions of this poll are skipped; the method then returns true
+ }
+ }
+ } catch (WakeupException e) {
+ log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer");
+ return false;
+ } catch (Exception e) {
+
+ if (e instanceof ClassCastException || e instanceof ClassNotFoundException
+ || e instanceof SerializationException) { // TODO: optional
+ log.error("Non retryable error", e);
+ return false;
+ }
+
+ log.error("Exception occurred in Kafka consumer thread, but we will continue.", e);
+ }
+ return true;
+ }
+
+ /**
+ * Rewinds the consumer to the first record of the given batch so that the batch is fetched again.
+ *
+ * @param partition the partition whose fetch position is moved back
+ * @param partitionRecords the batch polled for that partition; its first record supplies the offset
+ */
+ private void resetOffsetForPartition(TopicPartition partition,
+ List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+ final long firstOffset = partitionRecords.get(0).offset();
+ if (log.isTraceEnabled()) {
+ log.trace("Resetting offset to: {}", firstOffset);
+ }
+ consumer.seek(partition, firstOffset);
+ }
+
+ /**
+ * Trace-logs and then synchronously commits the offset that follows the last record of the batch.
+ *
+ * @param partition the partition whose commit point is advanced
+ * @param partitionRecords the records just processed for that partition; must not be empty
+ */
+ private void updateOffset(TopicPartition partition,
+ List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+ final ConsumerRecord<String, MirroredSolrRequest> lastRecord =
+ partitionRecords.get(partitionRecords.size() - 1);
+ final long offsetToCommit = lastRecord.offset() + 1;
+ if (log.isTraceEnabled()) {
+ log.trace("Updated offset for topic={} partition={} to offset={}", partition.topic(),
+ partition.partition(), offsetToCommit);
+ }
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offsetToCommit)));
+ }
+
+ /**
+ * Shuts down this consumer: closes the Solr client, then calls wakeup() on the Kafka consumer.
+ */
+ public void shutdown() {
+ log.info("Shutdown called on KafkaCrossDcConsumer");
+ try {
+ solrClient.close();
+ } catch (Exception e) {
+ log.warn("Exception closing Solr client on shutdown", e); // pass the exception so the cause is logged
+ }
+ consumer.wakeup();
+ }
+
+}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
index 140d0b9..d15e01d 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -71,12 +71,16 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
}
private Result<MirroredSolrRequest> handleSolrRequest(MirroredSolrRequest mirroredSolrRequest) {
- log.debug("Handling Solr request");
+
SolrRequest request = mirroredSolrRequest.getSolrRequest();
final SolrParams requestParams = request.getParams();
+ if (log.isTraceEnabled()) {
+ log.trace("handleSolrRequest params={}", requestParams);
+ }
+
final String shouldMirror = requestParams.get("shouldMirror");
- log.info("shouldMirror={}", shouldMirror);
+
if ("false".equalsIgnoreCase(shouldMirror)) {
log.warn("Skipping mirrored request because shouldMirror is set to false. request={}", requestParams);
return new Result<>(ResultStatus.FAILED_NO_RETRY);
@@ -87,7 +91,6 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
try {
prepareIfUpdateRequest(request);
logRequest(request);
- log.debug("About to submit Solr request {}", request);
result = processMirroredSolrRequest(request);
} catch (Exception e) {
result = handleException(mirroredSolrRequest, e);
@@ -160,12 +163,19 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
* Process the SolrRequest. If not, this method throws an exception.
*/
private Result<MirroredSolrRequest> processMirroredSolrRequest(SolrRequest request) throws Exception {
- log.info("Sending request to Solr at {} with params {}", client.getZkHost(), request.getParams());
+ if (log.isTraceEnabled()) {
+ log.trace("Sending request to Solr at ZK address={} with params {}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams());
+ }
Result<MirroredSolrRequest> result;
SolrResponseBase response = (SolrResponseBase) request.process(client);
int status = response.getStatus();
+
+ if (log.isTraceEnabled()) {
+ log.trace("result status={}", status);
+ }
+
if (status != 0) {
throw new SolrException(SolrException.ErrorCode.getErrorCode(status), "response=" + response);
}
@@ -215,11 +225,17 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
*/
private void sanitizeDocument(SolrInputDocument doc) {
SolrInputField field = doc.getField(VERSION_FIELD);
- log.info("Removing {} value={}", VERSION_FIELD, field == null ? "null" : field.getValue());
+ if (log.isTraceEnabled()) {
+ log.trace("Removing {} value={}", VERSION_FIELD,
+ field == null ? "null" : field.getValue());
+ }
doc.remove(VERSION_FIELD);
}
private void removeVersionFromDeleteByIds(UpdateRequest updateRequest) {
+ if (log.isTraceEnabled()) {
+ log.trace("remove versions from deletebyids");
+ }
Map<String, Map<String, Object>> deleteIds = updateRequest.getDeleteByIdMap();
if (deleteIds != null) {
for (Map<String, Object> idParams : deleteIds.values()) {
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
index bcd620f..937571b 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/KafkaRequestMirroringHandler.java
@@ -45,9 +45,13 @@ public class KafkaRequestMirroringHandler implements RequestMirroringHandler {
*/
@Override
public void mirror(UpdateRequest request) throws MirroringException {
- log.info("submit update to sink {}", request.getDocuments());
- // TODO: Enforce external version constraint for consistent update replication (cross-cluster)
- sink.submit(new MirroredSolrRequest(1, request, TimeUnit.MILLISECONDS.toNanos(
- System.currentTimeMillis())));
+ if (log.isTraceEnabled()) {
+ log.trace("submit update to sink docs={}, deletes={}, params={}", request.getDocuments(), request.getDeleteById(), request.getParams());
+ }
+ // TODO: Enforce external version constraint for consistent update replication (cross-cluster)
+ //if (request.getParams().get("shouldMirror") == null) { // TODO: work out proper shouldMirror semantics
+ request.getParams().set("shouldMirror", "true");
+ //}
+ sink.submit(new MirroredSolrRequest(1, request, TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis())));
}
}
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
new file mode 100644
index 0000000..e2e09f2
--- /dev/null
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -0,0 +1,143 @@
+package org.apache.solr.update.processor;
+
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.RollbackUpdateCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+
+import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
+
+public class MirroringUpdateProcessor extends UpdateRequestProcessor {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ /**
+ * Flag indicating whether this instance creates and submits a mirrored request. This override is
+ * necessary to prevent circular mirroring between coupled clusters running this processor.
+ */
+ private final boolean doMirroring;
+ private final RequestMirroringHandler requestMirroringHandler;
+
+ /**
+ * The mirrored request starts as null, gets created and appended to at each process() call,
+ * then submitted on finish().
+ */
+ private UpdateRequest mirrorRequest;
+ private final SolrParams mirrorParams;
+
+ /**
+ * The distributed processor downstream from us so we can establish if we're running on a leader shard
+ */
+ private DistributedUpdateProcessor distProc;
+
+ /**
+ * Distribution phase of the incoming requests
+ */
+ private DistributedUpdateProcessor.DistribPhase distribPhase;
+
+ public MirroringUpdateProcessor(final UpdateRequestProcessor next, boolean doMirroring,
+ final SolrParams mirroredReqParams,
+ final DistributedUpdateProcessor.DistribPhase distribPhase,
+ final RequestMirroringHandler requestMirroringHandler) {
+ super(next);
+ this.requestMirroringHandler = requestMirroringHandler;
+ this.distribPhase = distribPhase;
+ this.mirrorParams = mirroredReqParams;
+ this.doMirroring = doMirroring;
+
+ // Walk the rest of the chain up to the first DistributedUpdateProcessor; isLeader() is asked of it later.
+ UpdateRequestProcessor candidate = next;
+ while (candidate != null && !(candidate instanceof DistributedUpdateProcessor)) {
+ candidate = candidate.next;
+ }
+
+ if (candidate == null) {
+ final String processorName = MirroringUpdateProcessor.class.getSimpleName();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "DistributedUpdateProcessor must follow " + processorName);
+ }
+ distProc = (DistributedUpdateProcessor) candidate;
+ }
+
+ private UpdateRequest createAndOrGetMirrorRequest() { // creates the pending mirror request on first use
+ if (mirrorRequest == null) {
+ mirrorRequest = new UpdateRequest();
+ mirrorRequest.setParams(new ModifiableSolrParams(mirrorParams));
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("createOrGetMirrorRequest={}", mirrorRequest);
+ }
+ return mirrorRequest;
+ }
+
+ @Override public void processAdd(final AddUpdateCommand cmd) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("processAdd isLeader={} cmd={}", distProc.isLeader(), cmd);
+ }
+ // Local processing runs first: if it throws, nothing from this command is queued for mirroring.
+ super.processAdd(cmd);
+ if (doMirroring && distProc.isLeader()) { // leader only, so each doc is mirrored once
+ final SolrInputDocument mirroredDoc = cmd.getSolrInputDocument().deepCopy();
+ mirroredDoc.removeField(CommonParams.VERSION_FIELD); // drop the internal doc version from the copy
+ createAndOrGetMirrorRequest().add(mirroredDoc, cmd.commitWithin, cmd.overwrite);
+ }
+ }
+
+ @Override public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("processDelete doMirroring={} isLeader={} cmd={}", doMirroring, distProc.isLeader(), cmd);
+ }
+ super.processDelete(cmd); // a failing local delete throws here and is therefore never mirrored
+ if (!doMirroring) {
+ return;
+ }
+ if (cmd.isDeleteById()) {
+ // A deleteById request runs once per leader, so only the leader queues it; only the id is forwarded.
+ if (distProc.isLeader()) {
+ createAndOrGetMirrorRequest().deleteById(cmd.getId());
+ }
+ return;
+ }
+ // DBQs are sent to each shard leader, so mirror only from the node that received the original request
+ // (distribPhase NONE). Identical execution on the mirror cannot be guaranteed without external doc versions.
+ // TODO: Can we actually support this considering DBQs aren't versioned.
+ if (distribPhase == DistributedUpdateProcessor.DistribPhase.NONE) {
+ createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
+ }
+ }
+
+ @Override public void processRollback(final RollbackUpdateCommand cmd) throws IOException {
+ super.processRollback(cmd); // delegated to super only; the pending mirror request is left untouched
+ // TODO: decide whether rollback can or should be mirrored at all.
+ }
+
+ @Override public void processCommit(CommitUpdateCommand cmd) throws IOException {
+ log.info("process commit cmd={}", cmd);
+ super.processCommit(cmd); // forwarded down the chain only; commits are not added to the mirror request
+ }
+
+ @Override public void finish() throws IOException {
+ super.finish();
+ if (!doMirroring || mirrorRequest == null) {
+ return; // mirroring is off or no mirror request was created
+ }
+ try {
+ requestMirroringHandler.mirror(mirrorRequest);
+ } catch (Exception e) {
+ log.error("mirror submit failed", e);
+ throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
+ }
+ mirrorRequest = null; // cleared only after a successful submit, so it is not submitted again
+ }
+}
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index cabdda8..d66dba7 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -16,12 +16,9 @@
*/
package org.apache.solr.update.processor;
-import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.CoreDescriptor;
@@ -30,10 +27,6 @@ import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.KafkaMirroringSink;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
-import org.apache.solr.update.AddUpdateCommand;
-import org.apache.solr.update.CommitUpdateCommand;
-import org.apache.solr.update.DeleteUpdateCommand;
-import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +34,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.*;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
@@ -166,114 +158,4 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)), doMirroring ? mirroringHandler : null);
}
- public static class MirroringUpdateProcessor extends UpdateRequestProcessor {
- /** Flag indicating whether this instance creates and submits a mirrored request. This override is
- * necessary to prevent circular mirroring between coupled cluster running this processor. */
- private final boolean doMirroring;
- private final RequestMirroringHandler requestMirroringHandler;
-
- /** The mirrored request starts as null, gets created and appended to at each process() call,
- * then submitted on finish(). */
- private UpdateRequest mirrorRequest;
- private final SolrParams mirrorParams;
-
- /** The distributed processor downstream from us so we can establish if we're running on a leader shard */
- private DistributedUpdateProcessor distProc;
-
- /** Distribution phase of the incoming requests */
- private DistribPhase distribPhase;
-
- public MirroringUpdateProcessor(final UpdateRequestProcessor next, boolean doMirroring,
- final SolrParams mirroredReqParams, final DistribPhase distribPhase,
- final RequestMirroringHandler requestMirroringHandler) {
- super(next);
- this.doMirroring = doMirroring;
- this.mirrorParams = mirroredReqParams;
- this.distribPhase = distribPhase;
- this.requestMirroringHandler = requestMirroringHandler;
-
- // Find the downstream distributed update processor
- for (UpdateRequestProcessor proc = next; proc != null; proc = proc.next) {
- if (proc instanceof DistributedUpdateProcessor) {
- distProc = (DistributedUpdateProcessor) proc;
- break;
- }
- }
- if (distProc == null) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "DistributedUpdateProcessor must follow "
- + MirroringUpdateProcessor.class.getSimpleName());
- }
- }
-
- private UpdateRequest createAndOrGetMirrorRequest() {
- if (mirrorRequest == null) {
- mirrorRequest = new UpdateRequest();
- mirrorRequest.setParams(new ModifiableSolrParams(mirrorParams));
- }
- if (log.isDebugEnabled()) log.debug("createOrGetMirrorRequest={}", mirrorRequest);
- return mirrorRequest;
- }
-
- @Override
- public void processAdd(final AddUpdateCommand cmd) throws IOException {
- if (log.isDebugEnabled()) log.debug("processAdd isLeader={} cmd={}", distProc.isLeader(), cmd);
- super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
-
- // submit only from the leader shards so we mirror each doc once
- if (doMirroring && distProc.isLeader()) {
- SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
- doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
- createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, cmd.overwrite);
- }
- }
-
- @Override
- public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
- if (log.isDebugEnabled()) log.debug("processDelete doMirroring={} isLeader={} cmd={}", doMirroring, distProc.isLeader(), cmd);
- super.processDelete(cmd); // let this throw to prevent mirroring invalid requests
-
- if (doMirroring) {
- if (cmd.isDeleteById()) {
- // deleteById requests runs once per leader, so we just submit the request from the leader shard
- if (distProc.isLeader()) {
- createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip versions from deletes
- }
- } else {
- // DBQs are sent to each shard leader, so we mirror from the original node to only mirror once
- // In general there's no way to guarantee that these run identically on the mirror since there are no
- // external doc versions.
- // TODO: Can we actually support this considering DBQs aren't versioned.
- if (distribPhase == DistribPhase.NONE) {
- createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
- }
- }
- }
- }
-
- @Override
- public void processRollback(final RollbackUpdateCommand cmd) throws IOException {
- super.processRollback(cmd);
- // TODO: We can't/shouldn't support this ?
- }
-
- public void processCommit(CommitUpdateCommand cmd) throws IOException {
- log.info("process commit cmd={}", cmd);
- if (next != null) next.processCommit(cmd);
- }
-
- @Override
- public void finish() throws IOException {
- super.finish();
-
- if (doMirroring && mirrorRequest != null) {
- try {
- requestMirroringHandler.mirror(mirrorRequest);
- mirrorRequest = null; // so we don't accidentally submit it again
- } catch (Exception e) {
- log.error("mirror submit failed", e);
- throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
- }
- }
- }
- }
}
\ No newline at end of file
diff --git a/crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
similarity index 93%
rename from crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
rename to crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 7df4438..d811bd0 100644
--- a/crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -54,11 +54,12 @@ import static org.mockito.Mockito.spy;
private static String COLLECTION = "collection1";
- @BeforeClass public static void setupIntegrationTest() throws Exception {
+ @BeforeClass
+ public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
Properties config = new Properties();
- //config.put("unclean.leader.election.enable", "true");
- //config.put("enable.partition.eof", "false");
+ config.put("unclean.leader.election.enable", "true");
+ config.put("enable.partition.eof", "false");
kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
public String bootstrapServers() {
@@ -99,7 +100,8 @@ import static org.mockito.Mockito.spy;
}
- @AfterClass public static void tearDownIntegrationTest() throws Exception {
+ @AfterClass
+ public static void afterSolrAndKafkaIntegrationTest() throws Exception {
ObjectReleaseTracker.clear();
consumer.shutdown();
@@ -151,7 +153,6 @@ import static org.mockito.Mockito.spy;
}
}
- //producer.close();
System.out.println("Closed producer");
assertTrue("results=" + results, foundUpdates);
@@ -160,7 +161,7 @@ import static org.mockito.Mockito.spy;
}
public void testProducerToCloud() throws Exception {
- Thread.sleep(10000);
+ Thread.sleep(10000); // TODO: why?
Properties properties = new Properties();
properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
properties.put("acks", "all");
@@ -185,20 +186,20 @@ import static org.mockito.Mockito.spy;
System.out.println("Sent producer record");
+ solrCluster2.getSolrClient().commit(COLLECTION);
+
QueryResponse results = null;
boolean foundUpdates = false;
for (int i = 0; i < 50; i++) {
- // solrCluster1.getSolrClient().commit(COLLECTION);
+ solrCluster2.getSolrClient().commit(COLLECTION);
results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
- if (results.getResults().getNumFound() == 1) {
+ if (results.getResults().getNumFound() == 2) {
foundUpdates = true;
} else {
Thread.sleep(500);
}
}
- System.out.println("Closed producer");
-
assertTrue("results=" + results, foundUpdates);
System.out.println("Rest: " + results);
diff --git a/crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
similarity index 90%
rename from crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
rename to crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
index ca6bbb1..747b06f 100644
--- a/crossdc-producer/src/test/Java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrKafkaTestsIgnoredThreadsFilter.java
@@ -34,7 +34,15 @@ public class SolrKafkaTestsIgnoredThreadsFilter implements ThreadFilter {
if (threadName.startsWith("metrics-meter-tick-thread")) {
return true;
}
-
+
+ if (threadName.startsWith("pool-")) {
+ return true;
+ }
+
+ if (threadName.startsWith("kafka-")) { // TODO
+ return true;
+ }
+
return false;
}
diff --git a/crossdc-producer/src/test/resources/log4j2.xml b/crossdc-producer/src/test/resources/log4j2.xml
index c63e736..d3705d4 100644
--- a/crossdc-producer/src/test/resources/log4j2.xml
+++ b/crossdc-producer/src/test/resources/log4j2.xml
@@ -29,8 +29,8 @@
<RollingRandomAccessFile
name="MainLogFile"
- fileName="${sys:log.dir:-logs}/${sys:log.name:-solr}.log"
- filePattern="${sys:log.dir:-logs}/${sys:log.name:-solr}.log.%i">
+ fileName="${sys:log.dir:-logs}/${sys:log.name:-crossdc}.log"
+ filePattern="${sys:log.dir:-logs}/${sys:log.name:-crossdc}.log.%i">
<PatternLayout>
<Pattern>
%maxLen{%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p (%t) [%notEmpty{c:%X{collection}}%notEmpty{ s:%X{shard}}%notEmpty{ r:%X{replica}}%notEmpty{ x:%X{core}}] %c{1.}
@@ -59,6 +59,13 @@
<Logger name="org.apache.solr.hadoop" level="INFO"/>
<Logger name="org.eclipse.jetty" level="INFO"/>
+ <Logger name="org.apache.solr.crossdc.consumer.KafkaCrossDcConsumer" level="TRACE"/>
+ <Logger name="org.apache.solr.update.processor.MirroringUpdateProcessor" level="TRACE"/>
+ <Logger name="org.apache.solr.update.processor.KafkaRequestMirroringHandler" level="TRACE"/>
+ <Logger name="org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor" level="TRACE"/>
+ <Logger name="org.apache.solr.crossdc.common.KafkaMirroringSink" level="TRACE"/>
+
+
<Root level="INFO">
<AppenderRef ref="MainLogFile"/>
<AppenderRef ref="STDERR"/>