You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2023/08/11 19:17:48 UTC

[solr-sandbox] branch main updated: Update some logging, add a couple kafka consumer config options.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new 881bf2d  Update some logging, add a couple kafka consumer config options.
881bf2d is described below

commit 881bf2dad29791bb3aeb6c92e6bd8db89d5c914f
Author: markrmiller <ma...@apache.org>
AuthorDate: Fri Aug 11 14:17:18 2023 -0500

    Update some logging, add a couple kafka consumer config options.
---
 CROSSDC.md                                            |  1 +
 .../apache/solr/crossdc/common/KafkaCrossDcConf.java  | 19 +++++++++++++++++++
 crossdc-consumer/machinet.conf                        |  3 ---
 .../solr/crossdc/consumer/KafkaCrossDcConsumer.java   |  3 +++
 .../messageprocessor/SolrMessageProcessor.java        | 18 +++++++++++-------
 .../org/apache/solr/crossdc/TestMessageProcessor.java |  7 ++++++-
 6 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/CROSSDC.md b/CROSSDC.md
index d44bec8..2b03953 100644
--- a/CROSSDC.md
+++ b/CROSSDC.md
@@ -85,6 +85,7 @@ Optional configuration properties used when the consumer must retry by putting u
 - `bufferMemoryBytes`: memory allocated by the Producer in total for buffering 
 - `lingerMs`: amount of time that the Producer will wait to add to a batch
 - `requestTimeout`: request timeout for the Producer 
+- `maxPollIntervalMs`: maximum delay, in milliseconds, between invocations of `poll()` when using consumer group management
 
 #### Central Configuration Option
 
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index 1901de3..92d8c7b 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -51,6 +51,10 @@ public class KafkaCrossDcConf extends CrossDcConf {
 
   public static final String DEFAULT_MAX_PARTITION_FETCH_BYTES = "33554432";
 
+  public static final String DEFAULT_MAX_POLL_INTERVAL_MS = "90000";
+
+  public static final String DEFAULT_SESSION_TIMEOUT_MS = "10000";
+
   public static final String DEFAULT_PORT = "8090";
 
   private static final String DEFAULT_GROUP_ID = "SolrCrossDCConsumer";
@@ -92,6 +96,18 @@ public class KafkaCrossDcConf extends CrossDcConf {
 
   public static final String FETCH_MAX_BYTES = "fetchMaxBytes";
 
+  // The maximum delay between invocations of poll() when using consumer group management. This places
+  // an upper bound on the amount of time that the consumer can be idle before fetching more records.
+  // If poll() is not called before expiration of this timeout, then the consumer is considered failed
+  // and the group will rebalance in order to reassign the partitions to another member. For consumers
+  // using a non-null group.instance.id that reach this timeout, partitions will not be
+  // immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be
+  // reassigned after expiration of session.timeout.ms. This mirrors the behavior of a
+  // static consumer that has shut down.
+  public static final String MAX_POLL_INTERVAL_MS = "maxPollIntervalMs";
+
+  public static final String SESSION_TIMEOUT_MS = "sessionTimeoutMs";
+
   public static final String MAX_PARTITION_FETCH_BYTES = "maxPartitionFetchBytes";
 
   public static final String ZK_CONNECT_STRING = "zkConnectString";
@@ -127,6 +143,9 @@ public class KafkaCrossDcConf extends CrossDcConf {
             new ConfigProperty(FETCH_MAX_BYTES, DEFAULT_FETCH_MAX_BYTES),
             new ConfigProperty(FETCH_MAX_WAIT_MS, DEFAULT_FETCH_MAX_WAIT_MS),
             new ConfigProperty(CONSUMER_PROCESSING_THREADS, DEFAULT_CONSUMER_PROCESSING_THREADS),
+            new ConfigProperty(MAX_POLL_INTERVAL_MS, DEFAULT_MAX_POLL_INTERVAL_MS),
+            new ConfigProperty(SESSION_TIMEOUT_MS, DEFAULT_SESSION_TIMEOUT_MS),
+
 
             new ConfigProperty(MAX_PARTITION_FETCH_BYTES, DEFAULT_MAX_PARTITION_FETCH_BYTES),
             new ConfigProperty(MAX_POLL_RECORDS, DEFAULT_MAX_POLL_RECORDS),
diff --git a/crossdc-consumer/machinet.conf b/crossdc-consumer/machinet.conf
deleted file mode 100644
index 1c152ae..0000000
--- a/crossdc-consumer/machinet.conf
+++ /dev/null
@@ -1,3 +0,0 @@
-### Please DO NOT modify the contents of this file. For internal purpose only
-root=7badab89-0174-350c-a9c1-20ef5b1bf5a5
-rootId=1af1047d-7c48-30ff-a67a-1bfedfa05bc4
\ No newline at end of file
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index e8195ac..b57d071 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -73,6 +73,9 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
 
     kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
 
+    kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
+
+    kafkaConsumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
     kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
     kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
index dcea1d0..aad1b77 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -82,8 +82,8 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
         SolrRequest request = mirroredSolrRequest.getSolrRequest();
         final SolrParams requestParams = request.getParams();
 
-        if (log.isTraceEnabled()) {
-            log.trace("handleSolrRequest params={}", requestParams);
+        if (log.isDebugEnabled()) {
+            log.debug("handleSolrRequest start params={}", requestParams);
         }
 
         // TODO: isn't this handled by the mirroring handler?
@@ -103,7 +103,9 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
         } catch (Exception e) {
             result = handleException(mirroredSolrRequest, e);
         }
-
+        if (log.isDebugEnabled()) {
+            log.debug("handleSolrRequest end params={} result={}", requestParams, result);
+        }
         return result;
     }
 
@@ -111,7 +113,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
         final SolrException solrException = SolrExceptionUtil.asSolrException(e);
         logIf4xxException(solrException);
         if (!isRetryable(e)) {
-            logFailure(mirroredSolrRequest, e, solrException, false);
+            log.error("Non retryable exception processing Solr update", e);
             return new Result<>(ResultStatus.FAILED_NO_RETRY, e);
         } else {
             logFailure(mirroredSolrRequest, e, solrException, true);
@@ -171,8 +173,8 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
      * Process the SolrRequest. If not, this method throws an exception.
      */
     private Result<MirroredSolrRequest> processMirroredSolrRequest(SolrRequest request) throws Exception {
-        if (log.isTraceEnabled()) {
-            log.trace("Sending request to Solr at ZK address={} with params {}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams());
+        if (log.isDebugEnabled()) {
+            log.debug("Sending request to Solr at ZK address={} with params {}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams());
         }
         Result<MirroredSolrRequest> result;
 
@@ -190,7 +192,9 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
         }
 
         metrics.counter("processed").inc();
-
+        if (log.isDebugEnabled()) {
+            log.debug("Finished sending request to Solr at ZK address={} with params {} status_code={}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams(), status);
+        }
         result = new Result<>(ResultStatus.HANDLED);
         return result;
     }
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
index 5058c54..a9dde63 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
@@ -47,7 +47,12 @@ public class TestMessageProcessor {
     private CloudSolrClient solrClient;
     private SolrMessageProcessor processor;
 
-    private ResubmitBackoffPolicy backoffPolicy = spy(new NoOpResubmitBackoffPolicy());
+    private ResubmitBackoffPolicy backoffPolicy = spy(new ResubmitBackoffPolicy() {
+        @Override
+        public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
+            return 0;
+        }
+    });
 
     @Before
     public void setUp() {