You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2023/08/11 19:17:48 UTC
[solr-sandbox] branch main updated: Update some logging, add a couple kafka consumer config options.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new 881bf2d Update some logging, add a couple kafka consumer config options.
881bf2d is described below
commit 881bf2dad29791bb3aeb6c92e6bd8db89d5c914f
Author: markrmiller <ma...@apache.org>
AuthorDate: Fri Aug 11 14:17:18 2023 -0500
Update some logging, add a couple kafka consumer config options.
---
CROSSDC.md | 1 +
.../apache/solr/crossdc/common/KafkaCrossDcConf.java | 19 +++++++++++++++++++
crossdc-consumer/machinet.conf | 3 ---
.../solr/crossdc/consumer/KafkaCrossDcConsumer.java | 3 +++
.../messageprocessor/SolrMessageProcessor.java | 18 +++++++++++-------
.../org/apache/solr/crossdc/TestMessageProcessor.java | 7 ++++++-
6 files changed, 40 insertions(+), 11 deletions(-)
diff --git a/CROSSDC.md b/CROSSDC.md
index d44bec8..2b03953 100644
--- a/CROSSDC.md
+++ b/CROSSDC.md
@@ -85,6 +85,7 @@ Optional configuration properties used when the consumer must retry by putting u
- `bufferMemoryBytes`: memory allocated by the Producer in total for buffering
- `lingerMs`: amount of time that the Producer will wait to add to a batch
- `requestTimeout`: request timeout for the Producer
+- `maxPollIntervalMs`: the maximum delay between invocations of poll() when using consumer group management.
#### Central Configuration Option
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index 1901de3..92d8c7b 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -51,6 +51,10 @@ public class KafkaCrossDcConf extends CrossDcConf {
public static final String DEFAULT_MAX_PARTITION_FETCH_BYTES = "33554432";
+ public static final String DEFAULT_MAX_POLL_INTERVAL_MS = "90000";
+
+ public static final String DEFAULT_SESSION_TIMEOUT_MS = "10000";
+
public static final String DEFAULT_PORT = "8090";
private static final String DEFAULT_GROUP_ID = "SolrCrossDCConsumer";
@@ -92,6 +96,18 @@ public class KafkaCrossDcConf extends CrossDcConf {
public static final String FETCH_MAX_BYTES = "fetchMaxBytes";
+ // The maximum delay between invocations of poll() when using consumer group management. This places
+ // an upper bound on the amount of time that the consumer can be idle before fetching more records.
+ // If poll() is not called before expiration of this timeout, then the consumer is considered failed
+ // and the group will rebalance in order to reassign the partitions to another member. For consumers
+ // using a non-null <code>group.instance.id</code> which reach this timeout, partitions will not be
+ // immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be
+ // reassigned after expiration of <code>session.timeout.ms</code>. This mirrors the behavior of a
+ // static consumer which has shutdown.
+ public static final String MAX_POLL_INTERVAL_MS = "maxPollIntervalMs";
+
+ public static final String SESSION_TIMEOUT_MS = "sessionTimeoutMs";
+
public static final String MAX_PARTITION_FETCH_BYTES = "maxPartitionFetchBytes";
public static final String ZK_CONNECT_STRING = "zkConnectString";
@@ -127,6 +143,9 @@ public class KafkaCrossDcConf extends CrossDcConf {
new ConfigProperty(FETCH_MAX_BYTES, DEFAULT_FETCH_MAX_BYTES),
new ConfigProperty(FETCH_MAX_WAIT_MS, DEFAULT_FETCH_MAX_WAIT_MS),
new ConfigProperty(CONSUMER_PROCESSING_THREADS, DEFAULT_CONSUMER_PROCESSING_THREADS),
+ new ConfigProperty(MAX_POLL_INTERVAL_MS, DEFAULT_MAX_POLL_INTERVAL_MS),
+ new ConfigProperty(SESSION_TIMEOUT_MS, DEFAULT_SESSION_TIMEOUT_MS),
+
new ConfigProperty(MAX_PARTITION_FETCH_BYTES, DEFAULT_MAX_PARTITION_FETCH_BYTES),
new ConfigProperty(MAX_POLL_RECORDS, DEFAULT_MAX_POLL_RECORDS),
diff --git a/crossdc-consumer/machinet.conf b/crossdc-consumer/machinet.conf
deleted file mode 100644
index 1c152ae..0000000
--- a/crossdc-consumer/machinet.conf
+++ /dev/null
@@ -1,3 +0,0 @@
-### Please DO NOT modify the contents of this file. For internal purpose only
-root=7badab89-0174-350c-a9c1-20ef5b1bf5a5
-rootId=1af1047d-7c48-30ff-a67a-1bfedfa05bc4
\ No newline at end of file
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index e8195ac..b57d071 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -73,6 +73,9 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
+ kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
+
+ kafkaConsumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
index dcea1d0..aad1b77 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -82,8 +82,8 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
SolrRequest request = mirroredSolrRequest.getSolrRequest();
final SolrParams requestParams = request.getParams();
- if (log.isTraceEnabled()) {
- log.trace("handleSolrRequest params={}", requestParams);
+ if (log.isDebugEnabled()) {
+ log.debug("handleSolrRequest start params={}", requestParams);
}
// TODO: isn't this handled by the mirroring handler?
@@ -103,7 +103,9 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
} catch (Exception e) {
result = handleException(mirroredSolrRequest, e);
}
-
+ if (log.isDebugEnabled()) {
+ log.debug("handleSolrRequest end params={} result={}", requestParams, result);
+ }
return result;
}
@@ -111,7 +113,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
final SolrException solrException = SolrExceptionUtil.asSolrException(e);
logIf4xxException(solrException);
if (!isRetryable(e)) {
- logFailure(mirroredSolrRequest, e, solrException, false);
+ log.error("Non retryable exception processing Solr update", e);
return new Result<>(ResultStatus.FAILED_NO_RETRY, e);
} else {
logFailure(mirroredSolrRequest, e, solrException, true);
@@ -171,8 +173,8 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
* Process the SolrRequest. If not, this method throws an exception.
*/
private Result<MirroredSolrRequest> processMirroredSolrRequest(SolrRequest request) throws Exception {
- if (log.isTraceEnabled()) {
- log.trace("Sending request to Solr at ZK address={} with params {}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams());
+ if (log.isDebugEnabled()) {
+ log.debug("Sending request to Solr at ZK address={} with params {}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams());
}
Result<MirroredSolrRequest> result;
@@ -190,7 +192,9 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
}
metrics.counter("processed").inc();
-
+ if (log.isDebugEnabled()) {
+ log.debug("Finished sending request to Solr at ZK address={} with params {} status_code={}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams(), status);
+ }
result = new Result<>(ResultStatus.HANDLED);
return result;
}
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
index 5058c54..a9dde63 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
@@ -47,7 +47,12 @@ public class TestMessageProcessor {
private CloudSolrClient solrClient;
private SolrMessageProcessor processor;
- private ResubmitBackoffPolicy backoffPolicy = spy(new NoOpResubmitBackoffPolicy());
+ private ResubmitBackoffPolicy backoffPolicy = spy(new ResubmitBackoffPolicy() {
+ @Override
+ public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
+ return 0;
+ }
+ });
@Before
public void setUp() {