You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2023/05/22 23:34:58 UTC
[solr-sandbox] branch crossdc-wip updated: Update some logging, add a couple kafka consumer config options. (#59)
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new 2a2f4f4 Update some logging, add a couple kafka consumer config options. (#59)
2a2f4f4 is described below
commit 2a2f4f4b94d851439ed0e66fa41fc5aff2089a1b
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Mon May 22 18:34:53 2023 -0500
Update some logging, add a couple kafka consumer config options. (#59)
---
CROSSDC.md | 2 +-
.../solr/crossdc/common/KafkaCrossDcConf.java | 19 +++++++++++++++
.../crossdc/consumer/KafkaCrossDcConsumer.java | 27 +++++++++++++++-------
.../messageprocessor/SolrMessageProcessor.java | 18 +++++++++------
.../apache/solr/crossdc/TestMessageProcessor.java | 14 +++++------
5 files changed, 56 insertions(+), 24 deletions(-)
diff --git a/CROSSDC.md b/CROSSDC.md
index 5c528f4..cf8deaf 100644
--- a/CROSSDC.md
+++ b/CROSSDC.md
@@ -124,7 +124,7 @@ central config properties file. This is because the Consumer will use a Producer
*bufferMemoryBytes* - the amount of memory in bytes allocated by the Producer in total for buffering
*lingerMs* - the amount of time that the Producer will wait to add to a batch
*requestTimeout* - request timeout for the Producer - when used for the Consumer's retry Producer, this should be less than the timeout that will cause the Consumer to be removed from the group for taking too long.
-
+ *maxPollIntervalMs* - The maximum delay between invocations of poll() when using consumer group management.
#### Central Configuration Option
You can optionally manage the configuration centrally in Solr's Zookeeper cluster by placing a properties file called *crossdc.properties* in the root Solr Zookeeper znode, e.g., */solr/crossdc.properties*. This allows you to update the configuration in a central location rather than in the solrconfig.xml of each Solr node, and it also lets new Solr nodes or Consumers come up without requiring additional configuration.
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index 0b45bbb..251754d 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -49,6 +49,10 @@ public class KafkaCrossDcConf extends CrossDcConf {
public static final String DEFAULT_MAX_PARTITION_FETCH_BYTES = "33554432";
+ public static final String DEFAULT_MAX_POLL_INTERVAL_MS = "90000";
+
+ public static final String DEFAULT_SESSION_TIMEOUT_MS = "10000";
+
public static final String DEFAULT_PORT = "8090";
private static final String DEFAULT_GROUP_ID = "SolrCrossDCConsumer";
@@ -86,6 +90,18 @@ public class KafkaCrossDcConf extends CrossDcConf {
public static final String FETCH_MAX_BYTES = "fetchMaxBytes";
+ // The maximum delay between invocations of poll() when using consumer group management. This places
+ // an upper bound on the amount of time that the consumer can be idle before fetching more records.
+ // If poll() is not called before expiration of this timeout, then the consumer is considered failed
+ // and the group will rebalance in order to reassign the partitions to another member. For consumers
+ // using a non-null <code>group.instance.id</code> which reach this timeout, partitions will not be
+ // immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be
+ // reassigned after expiration of <code>session.timeout.ms</code>. This mirrors the behavior of a
+ // static consumer which has shutdown.
+ public static final String MAX_POLL_INTERVAL_MS = "maxPollIntervalMs";
+
+ public static final String SESSION_TIMEOUT_MS = "sessionTimeoutMs";
+
public static final String MAX_PARTITION_FETCH_BYTES = "maxPartitionFetchBytes";
public static final String ZK_CONNECT_STRING = "zkConnectString";
@@ -121,6 +137,9 @@ public class KafkaCrossDcConf extends CrossDcConf {
new ConfigProperty(FETCH_MIN_BYTES, DEFAULT_FETCH_MIN_BYTES),
new ConfigProperty(FETCH_MAX_BYTES, DEFAULT_FETCH_MAX_BYTES),
new ConfigProperty(FETCH_MAX_WAIT_MS, DEFAULT_FETCH_MAX_WAIT_MS),
+ new ConfigProperty(MAX_POLL_INTERVAL_MS, DEFAULT_MAX_POLL_INTERVAL_MS),
+ new ConfigProperty(SESSION_TIMEOUT_MS, DEFAULT_SESSION_TIMEOUT_MS),
+
new ConfigProperty(MAX_PARTITION_FETCH_BYTES, DEFAULT_MAX_PARTITION_FETCH_BYTES),
new ConfigProperty(MAX_POLL_RECORDS, DEFAULT_MAX_POLL_RECORDS),
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 451253b..2f66d0f 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -53,6 +53,9 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
+ kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
+
+ kafkaConsumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
@@ -142,14 +145,19 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
record.partition(), record.key(), record.value());
}
IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(record.value());
+ if (log.isTraceEnabled()) {
+ log.trace("Finished processing record from topic={} partition={} key={} value={} result={}",
+ record.topic(), record.partition(), record.key(), record.value(), result);
+ }
+
switch (result.status()) {
case FAILED_RESUBMIT:
// currently, we use a strategy taken from an earlier working implementation
// of just resubmitting back to the queue - note that in rare cases, this could
// allow for incorrect update reorders
- if (log.isTraceEnabled()) {
- log.trace("result=failed-resubmit");
- }
+
+ log.info("Resubmitting failed Solr update to Kafka queue");
+
metrics.counter("failed-resubmit").inc();
kafkaMirroringSink.submit(record.value());
break;
@@ -228,9 +236,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
*/
private void resetOffsetForPartition(TopicPartition partition,
List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
- if (log.isTraceEnabled()) {
- log.trace("Resetting offset to: {}", partitionRecords.get(0).offset());
- }
+ log.info("Resetting offset to: {}", partitionRecords.get(0).offset());
long resetOffset = partitionRecords.get(0).offset();
consumer.seek(partition, resetOffset);
}
@@ -245,12 +251,17 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
long nextOffset = partitionRecords.get(partitionRecords.size() - 1).offset() + 1;
- if (log.isTraceEnabled()) {
- log.trace("Updated offset for topic={} partition={} to offset={}", partition.topic(),
+ if (log.isDebugEnabled()) {
+ log.trace("Updating offset for topic={} partition={} to offset={}", partition.topic(),
partition.partition(), nextOffset);
}
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(nextOffset)));
+
+ if (log.isDebugEnabled()) {
+ log.trace("Finished updating offset for topic={} partition={} to offset={}", partition.topic(),
+ partition.partition(), nextOffset);
+ }
}
/**
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
index b1a428e..f906ed1 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -82,8 +82,8 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
SolrRequest request = mirroredSolrRequest.getSolrRequest();
final SolrParams requestParams = request.getParams();
- if (log.isTraceEnabled()) {
- log.trace("handleSolrRequest params={}", requestParams);
+ if (log.isDebugEnabled()) {
+ log.debug("handleSolrRequest start params={}", requestParams);
}
// TODO: isn't this handled by the mirroring handler?
@@ -103,7 +103,9 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
} catch (Exception e) {
result = handleException(mirroredSolrRequest, e);
}
-
+ if (log.isDebugEnabled()) {
+ log.debug("handleSolrRequest end params={} result={}", requestParams, result);
+ }
return result;
}
@@ -111,7 +113,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
final SolrException solrException = SolrExceptionUtil.asSolrException(e);
logIf4xxException(solrException);
if (!isRetryable(e)) {
- logFailure(mirroredSolrRequest, e, solrException, false);
+ log.error("Non retryable exception processing Solr update", e);
return new Result<>(ResultStatus.FAILED_NO_RETRY, e);
} else {
logFailure(mirroredSolrRequest, e, solrException, true);
@@ -171,8 +173,8 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
* Process the SolrRequest. If not, this method throws an exception.
*/
private Result<MirroredSolrRequest> processMirroredSolrRequest(SolrRequest request) throws Exception {
- if (log.isTraceEnabled()) {
- log.trace("Sending request to Solr at ZK address={} with params {}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams());
+ if (log.isDebugEnabled()) {
+ log.debug("Sending request to Solr at ZK address={} with params {}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams());
}
Result<MirroredSolrRequest> result;
@@ -190,7 +192,9 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
}
metrics.counter("processed").inc();
-
+ if (log.isDebugEnabled()) {
+ log.debug("Finished sending request to Solr at ZK address={} with params {} status_code={}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams(), status);
+ }
result = new Result<>(ResultStatus.HANDLED);
return result;
}
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
index 9dc7073..a9dde63 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
@@ -43,18 +43,16 @@ import static org.mockito.Mockito.*;
public class TestMessageProcessor {
static final String VERSION_FIELD = "_version_";
- static class NoOpResubmitBackoffPolicy implements ResubmitBackoffPolicy {
- @Override
- public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
- return 0;
- }
- }
-
@Mock
private CloudSolrClient solrClient;
private SolrMessageProcessor processor;
- private ResubmitBackoffPolicy backoffPolicy = spy(new NoOpResubmitBackoffPolicy());
+ private ResubmitBackoffPolicy backoffPolicy = spy(new ResubmitBackoffPolicy() {
+ @Override
+ public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
+ return 0;
+ }
+ });
@Before
public void setUp() {