You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/05/17 22:27:00 UTC
[pinot] branch master updated: Add decoder initialization error to the server's error cache (#10773)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ab0c27e947 Add decoder initialization error to the server's error cache (#10773)
ab0c27e947 is described below
commit ab0c27e947cbc280d3c55217a7a136259044de01
Author: Navina Ramesh <na...@apache.org>
AuthorDate: Wed May 17 15:26:51 2023 -0700
Add decoder initialization error to the server's error cache (#10773)
* Add decoder initialization error to the server's error cache.
* Adding constructor failure to error cache
---
.../data/manager/realtime/LLRealtimeSegmentDataManager.java | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 4f3428a5fe..7f96359ce0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1419,8 +1419,15 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Create message decoder
Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
- StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
- _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
+ try {
+ StreamMessageDecoder streamMessageDecoder =
+ StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
+ _streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
+ } catch (Exception e) {
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+ new SegmentErrorInfo(now(), "Failed to initialize the StreamMessageDecoder", e));
+ throw e;
+ }
_transformPipeline = new TransformPipeline(tableConfig, schema);
// Acquire semaphore to create stream consumers
try {
@@ -1458,6 +1465,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released.
// Hence releasing the semaphore here to unblock reset operation via Helix Admin.
_partitionGroupConsumerSemaphore.release();
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
+ "Failed to initialize segment data manager", e));
throw e;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org