You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/17 22:27:00 UTC

[pinot] branch master updated: Add decoder initialization error to the server's error cache (#10773)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ab0c27e947 Add decoder initialization error to the server's error cache (#10773)
ab0c27e947 is described below

commit ab0c27e947cbc280d3c55217a7a136259044de01
Author: Navina Ramesh <na...@apache.org>
AuthorDate: Wed May 17 15:26:51 2023 -0700

    Add decoder initialization error to the server's error cache (#10773)
    
    * Add decoder initialization error to the server's error cache.
    * Adding constructor failure to error cache
---
 .../data/manager/realtime/LLRealtimeSegmentDataManager.java | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 4f3428a5fe..7f96359ce0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1419,8 +1419,15 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
     // Create message decoder
     Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
-    StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
-    _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
+    try {
+      StreamMessageDecoder streamMessageDecoder =
+          StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
+      _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
+    } catch (Exception e) {
+      _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+          new SegmentErrorInfo(now(), "Failed to initialize the StreamMessageDecoder", e));
+      throw e;
+    }
     _transformPipeline = new TransformPipeline(tableConfig, schema);
     // Acquire semaphore to create stream consumers
     try {
@@ -1458,6 +1465,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released.
       // Hence releasing the semaphore here to unblock reset operation via Helix Admin.
       _partitionGroupConsumerSemaphore.release();
+      _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
+          "Failed to initialize segment data manager", e));
       throw e;
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org