You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original archive page and is not preserved in this copy.
Posted to commits@solr.apache.org by ma...@apache.org on 2023/05/22 23:34:58 UTC

[solr-sandbox] branch crossdc-wip updated: Update some logging, add a couple kafka consumer config options. (#59)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new 2a2f4f4  Update some logging, add a couple kafka consumer config options. (#59)
2a2f4f4 is described below

commit 2a2f4f4b94d851439ed0e66fa41fc5aff2089a1b
Author: Mark Robert Miller <ma...@apache.org>
AuthorDate: Mon May 22 18:34:53 2023 -0500

    Update some logging, add a couple kafka consumer config options. (#59)
---
 CROSSDC.md                                         |  2 +-
 .../solr/crossdc/common/KafkaCrossDcConf.java      | 19 +++++++++++++++
 .../crossdc/consumer/KafkaCrossDcConsumer.java     | 27 +++++++++++++++-------
 .../messageprocessor/SolrMessageProcessor.java     | 18 +++++++++------
 .../apache/solr/crossdc/TestMessageProcessor.java  | 14 +++++------
 5 files changed, 56 insertions(+), 24 deletions(-)

diff --git a/CROSSDC.md b/CROSSDC.md
index 5c528f4..cf8deaf 100644
--- a/CROSSDC.md
+++ b/CROSSDC.md
@@ -124,7 +124,7 @@ central config properties file. This is because the Consumer will use a Producer
    *bufferMemoryBytes* - the amount of memory in bytes allocated by the Producer in total for buffering 
    *lingerMs* - the amount of time that the Producer will wait to add to a batch
    *requestTimeout* - request timeout for the Producer - when used for the Consumers retry Producer, this should be less than the timeout that will cause the Consumer to be removed from the group for taking too long.
-
+   *maxPollIntervalMs* - The maximum delay between invocations of poll() when using consumer group management.
 #### Central Configuration Option
 
 You can optionally manage the configuration centrally in Solr's Zookeeper cluster by placing a properties file called *crossdc.properties* in the root Solr Zookeeper znode, eg, */solr/crossdc.properties*.  This allows you to update the configuration in a central location rather than at each solrconfig.xml in each Solr node and also automatically deals with new Solr nodes or Consumers to come up without requiring additional configuration.
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index 0b45bbb..251754d 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -49,6 +49,10 @@ public class KafkaCrossDcConf extends CrossDcConf {
 
   public static final String DEFAULT_MAX_PARTITION_FETCH_BYTES = "33554432";
 
+  public static final String DEFAULT_MAX_POLL_INTERVAL_MS = "90000";
+
+  public static final String DEFAULT_SESSION_TIMEOUT_MS = "10000";
+
   public static final String DEFAULT_PORT = "8090";
 
   private static final String DEFAULT_GROUP_ID = "SolrCrossDCConsumer";
@@ -86,6 +90,18 @@ public class KafkaCrossDcConf extends CrossDcConf {
 
   public static final String FETCH_MAX_BYTES = "fetchMaxBytes";
 
+  // The maximum delay between invocations of poll() when using consumer group management. This places
+  // an upper bound on the amount of time that the consumer can be idle before fetching more records.
+  // If poll() is not called before expiration of this timeout, then the consumer is considered failed
+  // and the group will rebalance in order to reassign the partitions to another member. For consumers
+  // using a non-null <code>group.instance.id</code> which reach this timeout, partitions will not be
+  // immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be
+  // reassigned after expiration of <code>session.timeout.ms</code>. This mirrors the behavior of a
+  // static consumer which has shutdown.
+  public static final String MAX_POLL_INTERVAL_MS = "maxPollIntervalMs";
+
+  public static final String SESSION_TIMEOUT_MS = "sessionTimeoutMs";
+
   public static final String MAX_PARTITION_FETCH_BYTES = "maxPartitionFetchBytes";
 
   public static final String ZK_CONNECT_STRING = "zkConnectString";
@@ -121,6 +137,9 @@ public class KafkaCrossDcConf extends CrossDcConf {
             new ConfigProperty(FETCH_MIN_BYTES, DEFAULT_FETCH_MIN_BYTES),
             new ConfigProperty(FETCH_MAX_BYTES, DEFAULT_FETCH_MAX_BYTES),
             new ConfigProperty(FETCH_MAX_WAIT_MS, DEFAULT_FETCH_MAX_WAIT_MS),
+            new ConfigProperty(MAX_POLL_INTERVAL_MS, DEFAULT_MAX_POLL_INTERVAL_MS),
+            new ConfigProperty(SESSION_TIMEOUT_MS, DEFAULT_SESSION_TIMEOUT_MS),
+
 
             new ConfigProperty(MAX_PARTITION_FETCH_BYTES, DEFAULT_MAX_PARTITION_FETCH_BYTES),
             new ConfigProperty(MAX_POLL_RECORDS, DEFAULT_MAX_POLL_RECORDS),
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 451253b..2f66d0f 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -53,6 +53,9 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
 
     kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
 
+    kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
+
+    kafkaConsumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
     kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
     kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
@@ -142,14 +145,19 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
                   record.partition(), record.key(), record.value());
             }
             IQueueHandler.Result<MirroredSolrRequest> result = messageProcessor.handleItem(record.value());
+            if (log.isTraceEnabled()) {
+              log.trace("Finished processing record from topic={} partition={} key={} value={} result={}",
+                  record.topic(), record.partition(), record.key(), record.value(), result);
+            }
+
             switch (result.status()) {
               case FAILED_RESUBMIT:
                 // currently, we use a strategy taken from an earlier working implementation
                 // of just resubmitting back to the queue - note that in rare cases, this could
                 // allow for incorrect update reorders
-                if (log.isTraceEnabled()) {
-                  log.trace("result=failed-resubmit");
-                }
+
+                log.info("Resubmitting failed Solr update to Kafka queue");
+
                 metrics.counter("failed-resubmit").inc();
                 kafkaMirroringSink.submit(record.value());
                 break;
@@ -228,9 +236,7 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
    */
   private void resetOffsetForPartition(TopicPartition partition,
       List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
-    if (log.isTraceEnabled()) {
-      log.trace("Resetting offset to: {}", partitionRecords.get(0).offset());
-    }
+    log.info("Resetting offset to: {}", partitionRecords.get(0).offset());
     long resetOffset = partitionRecords.get(0).offset();
     consumer.seek(partition, resetOffset);
   }
@@ -245,12 +251,17 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
       List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
     long nextOffset = partitionRecords.get(partitionRecords.size() - 1).offset() + 1;
 
-    if (log.isTraceEnabled()) {
-      log.trace("Updated offset for topic={} partition={} to offset={}", partition.topic(),
+    if (log.isDebugEnabled()) {
+      log.trace("Updating offset for topic={} partition={} to offset={}", partition.topic(),
           partition.partition(), nextOffset);
     }
 
     consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(nextOffset)));
+
+    if (log.isDebugEnabled()) {
+      log.trace("Finished updating offset for topic={} partition={} to offset={}", partition.topic(),
+          partition.partition(), nextOffset);
+    }
   }
 
   /**
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
index b1a428e..f906ed1 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/messageprocessor/SolrMessageProcessor.java
@@ -82,8 +82,8 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
         SolrRequest request = mirroredSolrRequest.getSolrRequest();
         final SolrParams requestParams = request.getParams();
 
-        if (log.isTraceEnabled()) {
-            log.trace("handleSolrRequest params={}", requestParams);
+        if (log.isDebugEnabled()) {
+            log.debug("handleSolrRequest start params={}", requestParams);
         }
 
         // TODO: isn't this handled by the mirroring handler?
@@ -103,7 +103,9 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
         } catch (Exception e) {
             result = handleException(mirroredSolrRequest, e);
         }
-
+        if (log.isDebugEnabled()) {
+            log.debug("handleSolrRequest end params={} result={}", requestParams, result);
+        }
         return result;
     }
 
@@ -111,7 +113,7 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
         final SolrException solrException = SolrExceptionUtil.asSolrException(e);
         logIf4xxException(solrException);
         if (!isRetryable(e)) {
-            logFailure(mirroredSolrRequest, e, solrException, false);
+            log.error("Non retryable exception processing Solr update", e);
             return new Result<>(ResultStatus.FAILED_NO_RETRY, e);
         } else {
             logFailure(mirroredSolrRequest, e, solrException, true);
@@ -171,8 +173,8 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
      * Process the SolrRequest. If not, this method throws an exception.
      */
     private Result<MirroredSolrRequest> processMirroredSolrRequest(SolrRequest request) throws Exception {
-        if (log.isTraceEnabled()) {
-            log.trace("Sending request to Solr at ZK address={} with params {}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams());
+        if (log.isDebugEnabled()) {
+            log.debug("Sending request to Solr at ZK address={} with params {}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams());
         }
         Result<MirroredSolrRequest> result;
 
@@ -190,7 +192,9 @@ public class SolrMessageProcessor extends MessageProcessor implements IQueueHand
         }
 
         metrics.counter("processed").inc();
-
+        if (log.isDebugEnabled()) {
+            log.debug("Finished sending request to Solr at ZK address={} with params {} status_code={}", client.getZkStateReader().getZkClient().getZkServerAddress(), request.getParams(), status);
+        }
         result = new Result<>(ResultStatus.HANDLED);
         return result;
     }
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
index 9dc7073..a9dde63 100644
--- a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/TestMessageProcessor.java
@@ -43,18 +43,16 @@ import static org.mockito.Mockito.*;
 public class TestMessageProcessor {
     static final String VERSION_FIELD = "_version_";
 
-    static class NoOpResubmitBackoffPolicy implements ResubmitBackoffPolicy {
-        @Override
-        public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
-            return 0;
-        }
-    }
-
     @Mock
     private CloudSolrClient solrClient;
     private SolrMessageProcessor processor;
 
-    private ResubmitBackoffPolicy backoffPolicy = spy(new NoOpResubmitBackoffPolicy());
+    private ResubmitBackoffPolicy backoffPolicy = spy(new ResubmitBackoffPolicy() {
+        @Override
+        public long getBackoffTimeMs(MirroredSolrRequest resubmitRequest) {
+            return 0;
+        }
+    });
 
     @Before
     public void setUp() {